package play.api.libs.iteratee
import java.nio.file.Files
import play.api.libs.iteratee.Execution.Implicits.{ defaultExecutionContext => dec }
import play.api.libs.iteratee.internal.{ eagerFuture, executeFuture }
import scala.concurrent.{ ExecutionContext, Future, Promise, blocking }
import scala.util.{ Try, Success, Failure }
import scala.language.reflectiveCalls
/**
 * A producer of `E` values. Applying an enumerator to an [[Iteratee]] yields, asynchronously,
 * the iteratee that results from that application.
 *
 * @tparam E the type of the elements fed to iteratees
 */
trait Enumerator[E] {
  parent =>

  /**
   * Applies this enumerator to the given iteratee.
   *
   * @param i the iteratee to apply this enumerator to
   * @return a future of the iteratee resulting from the application
   */
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

  /** Symbolic alias for `apply`. */
  def |>>[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = apply(i)

  /** Applies this enumerator to `i`, then calls `run` on the iteratee that comes back. */
  def |>>>[A](i: Iteratee[E, A]): Future[A] = {
    val applied = apply(i)
    applied.flatMap(resulting => resulting.run)(dec)
  }

  /** Named alias for `|>>>`. */
  def run[A](i: Iteratee[E, A]): Future[A] = |>>>(i)

  /** Applies this enumerator to `i`, then calls `unflatten` on the iteratee that comes back. */
  def |>>|[A](i: Iteratee[E, A]): Future[Step[E, A]] = {
    val applied = apply(i)
    applied.flatMap(resulting => resulting.unflatten)(dec)
  }

  /**
   * Sequential composition: the returned enumerator first applies this enumerator and then
   * applies `e` to the iteratee this one produced.
   */
  def andThen(e: Enumerator[E]): Enumerator[E] = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
      parent(i).flatMap(afterFirst => e(afterFirst))(dec)
  }

  /** Delegates to `Enumerator.interleave(this, other)`. */
  def interleave[B >: E](other: Enumerator[B]): Enumerator[B] = Enumerator.interleave(this, other)

  /** Symbolic alias for `interleave`. */
  def >-[B >: E](other: Enumerator[B]): Enumerator[B] = interleave(other)

  /**
   * Composes this enumerator with an enumeratee: the target iteratee is wrapped with
   * `enumeratee.applyOn`, this enumerator is applied to the wrapped iteratee, and `run` is
   * called on the result to get back an iteratee over `To`.
   */
  def &>[To](enumeratee: Enumeratee[E, To]): Enumerator[To] = new Enumerator[To] {
    def apply[A](i: Iteratee[To, A]): Future[Iteratee[To, A]] = {
      val wrapped = enumeratee.applyOn(i)
      val fed = parent |>> wrapped
      fed.flatMap(outer => outer.run)(dec)
    }
  }

  /**
   * Returns an enumerator that evaluates `callback` when the future returned by this
   * enumerator completes. The handler matches every outcome, so the callback is evaluated
   * on success and on failure alike.
   *
   * @param callback by-name side effect, evaluated on the prepared `ec`
   * @param ec the execution context the callback runs on
   */
  def onDoneEnumerating(callback: => Unit)(implicit ec: ExecutionContext): Enumerator[E] = new Enumerator[E] {
    private val pec = ec.prepare()

    def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
      val enumerated = parent.apply(it)
      enumerated.andThen {
        case outcome =>
          callback
          outcome.get
      }(pec)
    }
  }

  /** Named alias for `&>`. */
  def through[To](enumeratee: Enumeratee[E, To]): Enumerator[To] = &>(enumeratee)

  /** Symbolic alias for `andThen`. */
  def >>>(e: Enumerator[E]): Enumerator[E] = andThen(e)

  /** Composes this enumerator with `Enumeratee.map[E](f)`, passing `ec` to it. */
  def map[U](f: E => U)(implicit ec: ExecutionContext): Enumerator[U] = {
    val mapping = Enumeratee.map[E](f)(ec)
    parent &> mapping
  }

  /** Composes this enumerator with `Enumeratee.mapInput[E](f)`, passing `ec` to it. */
  def mapInput[U](f: Input[E] => Input[U])(implicit ec: ExecutionContext): Enumerator[U] = {
    val mapping = Enumeratee.mapInput[E](f)(ec)
    parent &> mapping
  }

  /**
   * For every element produced by this enumerator, builds an enumerator with `f` and applies
   * it to the target iteratee, threading the iteratee through `Iteratee.fold2`. The boolean
   * handed back to `fold2` is the result of `Iteratee.isDoneOrError` on the updated iteratee.
   *
   * @param f builds the enumerator for an element; evaluated on the prepared `ec`
   * @param ec the execution context `f` runs on
   */
  def flatMap[U](f: E => Enumerator[U])(implicit ec: ExecutionContext): Enumerator[U] = {
    val pec = ec.prepare()
    // Rebinds the name `ec` to the default context inside this scope, hiding the parameter,
    // so the implicitly resolved contexts below do not use the caller's context.
    import Execution.Implicits.{ defaultExecutionContext => ec }
    new Enumerator[U] {
      def apply[A](iteratee: Iteratee[U, A]): Future[Iteratee[U, A]] = {
        val folder = Iteratee.fold2[E, Iteratee[U, A]](iteratee) { (current, element) =>
          Future(f(element))(pec).flatMap { inner =>
            inner(current).flatMap { advanced =>
              Iteratee.isDoneOrError(advanced).map(finished => (advanced, finished))
            }
          }
        }(dec)
        parent(folder).flatMap(_.run)
      }
    }
  }
}
object Enumerator {
/**
 * Wraps a future enumerator as an enumerator: applying the result waits for
 * `eventuallyEnum` and then applies the enumerator it yields to the iteratee.
 */
def flatten[E](eventuallyEnum: Future[Enumerator[E]]): Enumerator[E] = new Enumerator[E] {
  def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] =
    eventuallyEnum.flatMap(enumerator => enumerator(it))(dec)
}
/**
 * Builds an enumerator that offers the single input `e` to an iteratee. If the iteratee is
 * in the `Cont` state its continuation is invoked with `e` (through `eagerFuture`);
 * in any other state the iteratee is returned unchanged.
 */
def enumInput[E](e: Input[E]) = new Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
    i.fold { state =>
      state match {
        case Step.Cont(k) => eagerFuture(k(e))
        case _ => Future.successful(i)
      }
    }(dec)
}
/** Varargs convenience: prepends `e1` to `es` and delegates to the `Seq` overload of `interleave`. */
def interleave[E](e1: Enumerator[E], es: Enumerator[E]*): Enumerator[E] = {
  val all: Seq[Enumerator[E]] = e1 +: es
  interleave(all)
}
/**
 * Interleaves a sequence of enumerators into a single enumerator feeding one iteratee.
 *
 * Every source enumerator is applied to its own small adapter iteratee. Each adapter pushes
 * the inputs it receives into the shared target iteratee, whose current state is kept in an
 * STM `Ref`. `Input.EOF` from a source is not forwarded immediately: the source is marked as
 * finished, and `EOF` is fed to the target only once every source has been marked finished.
 *
 * @param es the enumerators to interleave
 * @return an enumerator whose result is redeemed once all sources have been applied, or
 *         failed if the combined future of the sources fails
 */
def interleave[E](es: Seq[Enumerator[E]]): Enumerator[E] = new Enumerator[E] {
  import scala.concurrent.stm._
  def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
    // Current state of the shared target iteratee.
    val iter: Ref[Iteratee[E, A]] = Ref(it)
    // One flag per source, `true` until that source has sent EOF. Set to `None` once the
    // result has been redeemed.
    val attending: Ref[Option[Seq[Boolean]]] = Ref(Some(es.map(_ => true)))
    val result = Promise[Iteratee[E, A]]()

    // Completes `result` at most once: only the call that flips `attending` to None succeeds.
    def redeemResultIfNotYet(r: Iteratee[E, A]): Unit = {
      if (attending.single.transformIfDefined { case Some(_) => None })
        result.success(r)
    }

    // Builds the adapter iteratee for one source; `f` records that source's EOF in the flags.
    def iteratee[EE <: E](f: Seq[Boolean] => Seq[Boolean]): Iteratee[EE, Unit] = {
      def step(in: Input[EE]): Iteratee[EE, Unit] = {
        // Take the current target iteratee and leave a placeholder backed by `p` in its
        // place; `p` is completed below with the target's next state.
        val p = Promise[Iteratee[E, A]]()
        val i = iter.single.swap(Iteratee.flatten(p.future))
        in match {
          case Input.El(_) | Input.Empty =>
            val nextI = i.fold {
              case Step.Cont(k) =>
                val n = k(in)
                n.fold {
                  case Step.Cont(kk) =>
                    // Target still accepts input: keep this adapter going.
                    p.success(Cont(kk))
                    Future.successful(Cont(step))
                  case _ =>
                    // Target finished or failed: stop consuming from this source.
                    p.success(n)
                    Future.successful(Done((), Input.Empty: Input[EE]))
                }(dec)
              case _ =>
                p.success(i)
                Future.successful(Done((), Input.Empty: Input[EE]))
            }(dec)
            Iteratee.flatten(nextI)
          case Input.EOF => {
            // Record this source's EOF; forward EOF only when no source is still attending.
            if (attending.single.transformAndGet { _.map(f) }.forall(_.forall(_ == false))) {
              p.complete(Try(Iteratee.flatten(i.feed(Input.EOF))))
            } else {
              p.success(i)
            }
            Done((), Input.Empty)
          }
        }
      }
      Cont(step)
    }

    // On EOF a source clears its own flag. The flags start as `true`, so the flag must be
    // patched to `false` for the all-finished check above to ever succeed; the two-enumerator
    // overload of `interleave` clears its flags the same way.
    val ps = es.zipWithIndex.map { case (e, index) => e |>> iteratee[E](_.patch(index, Seq(false), 1)) }
      .map(_.flatMap(_.pureFold(any => ())(dec)))

    Future.sequence(ps).onComplete {
      case Success(_) =>
        redeemResultIfNotYet(iter.single())
      case Failure(e) => result.failure(e)
    }
    result.future
  }
}
/**
 * Interleaves two enumerators into a single enumerator feeding one iteratee over `E2`.
 *
 * Each source is applied to its own adapter iteratee, which pushes the inputs it receives
 * into the shared target iteratee held in an STM `Ref`. `Input.EOF` from a source clears that
 * source's flag in `attending`; `EOF` is fed to the target only when both flags are `false`.
 *
 * @param e1 the first enumerator
 * @param e2 the second enumerator
 * @return an enumerator whose result is redeemed with the target's current state once both
 *         applications have completed, or failed if either application fails
 */
def interleave[E1, E2 >: E1](e1: Enumerator[E1], e2: Enumerator[E2]): Enumerator[E2] = new Enumerator[E2] {
import scala.concurrent.stm._
def apply[A](it: Iteratee[E2, A]): Future[Iteratee[E2, A]] = {
// Current state of the shared target iteratee.
val iter: Ref[Iteratee[E2, A]] = Ref(it)
// (first source still attending, second source still attending); None once redeemed.
val attending: Ref[Option[(Boolean, Boolean)]] = Ref(Some(true -> true))
val result = Promise[Iteratee[E2, A]]()
// Completes `result` at most once: only the call that flips `attending` to None succeeds.
def redeemResultIfNotYet(r: Iteratee[E2, A]) {
if (attending.single.transformIfDefined { case Some(_) => None })
result.success(r)
}
// Builds the adapter iteratee for one source; `f` records that source's EOF in the flags.
def iteratee[EE <: E2](f: ((Boolean, Boolean)) => (Boolean, Boolean)): Iteratee[EE, Unit] = {
def step(in: Input[EE]): Iteratee[EE, Unit] = {
// Take the current target iteratee and leave a placeholder backed by `p` in its place;
// `p` is completed below with the target's next state.
val p = Promise[Iteratee[E2, A]]()
val i = iter.single.swap(Iteratee.flatten(p.future))
in match {
case Input.El(_) | Input.Empty =>
val nextI = i.fold {
case Step.Cont(k) =>
val n = k(in)
n.fold {
case Step.Cont(kk) =>
// Target still accepts input: keep this adapter going.
p.success(Cont(kk))
Future.successful(Cont(step))
case _ =>
// Target is no longer in Cont: stop consuming from this source.
p.success(n)
Future.successful(Done((), Input.Empty: Input[EE]))
}(dec)
case _ =>
p.success(i)
Future.successful(Done((), Input.Empty: Input[EE]))
}(dec)
Iteratee.flatten(nextI)
case Input.EOF => {
// Record this source's EOF; forward EOF only when both flags are now false.
if (attending.single.transformAndGet { _.map(f) } == Some((false, false))) {
p.complete(Try(Iteratee.flatten(i.feed(Input.EOF))))
} else {
p.success(i)
}
Done((), Input.Empty)
}
}
}
Cont(step)
}
// The first adapter clears the left flag on EOF, the second clears the right flag.
val itE1 = iteratee[E1] { case (l, r) => (false, r) }
val itE2 = iteratee[E2] { case (l, r) => (l, false) }
// Both applications are started here, before they are sequenced below.
val r1 = e1 |>>| itE1
val r2 = e2 |>>| itE2
r1.flatMap(_ => r2).onComplete {
case Success(_) =>
redeemResultIfNotYet(iter.single())
case Failure(e) => result.failure(e)
}
result.future
}
}
/**
 * Builds an enumerator by repeatedly applying the asynchronous function `f` to a state.
 * While the iteratee is in the `Cont` state, `f` is invoked with the current state:
 * `Some((newState, element))` feeds `element` and continues with `newState`; `None` stops
 * and returns the iteratee without sending further input.
 *
 * @param s the initial state
 * @param f produces the next state and element, or `None` to stop
 * @param ec the execution context passed (after `prepare()`) to `executeFuture` with `f`
 */
def unfoldM[S, E](s: S)(f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContext): Enumerator[E] = {
  val treat = new TreatCont1[E, S] {
    private val pec = ec.prepare()

    def apply[A](loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]], s: S, k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]] = {
      val unfolded = executeFuture(f(s))(pec)
      unfolded.flatMap {
        case Some((nextState, element)) => loop(k(Input.El(element)), nextState)
        case None => Future.successful(Cont(k))
      }(dec)
    }
  }
  checkContinue1(s)(treat)
}
/**
 * Builds an enumerator by repeatedly applying the synchronous function `f` to a state.
 * While the iteratee is in the `Cont` state, `f` is evaluated in a `Future` on the prepared
 * `ec`: `Some((newState, element))` feeds `element` and continues with `newState`; `None`
 * stops and returns the iteratee without sending further input.
 *
 * @param s the initial state
 * @param f produces the next state and element, or `None` to stop
 * @param ec the execution context `f` runs on
 */
def unfold[S, E](s: S)(f: S => Option[(S, E)])(implicit ec: ExecutionContext): Enumerator[E] = {
  val treat = new TreatCont1[E, S] {
    private val pec = ec.prepare()

    def apply[A](loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]], s: S, k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]] = {
      val unfolded = Future(f(s))(pec)
      unfolded.flatMap {
        case Some((nextState, element)) => loop(k(Input.El(element)), nextState)
        case None => Future.successful(Cont(k))
      }(dec)
    }
  }
  checkContinue1(s)(treat)
}
/**
 * Builds an enumerator that keeps feeding the by-name value `e` for as long as the iteratee
 * is in the `Cont` state. `e` is re-evaluated for every element, in a `Future` on the
 * prepared `ec`.
 */
def repeat[E](e: => E)(implicit ec: ExecutionContext): Enumerator[E] = {
  val treat = new TreatCont0[E] {
    private val pec = ec.prepare()

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = {
      val produced = Future(e)(pec)
      produced.flatMap(element => loop(k(Input.El(element))))(dec)
    }
  }
  checkContinue0(treat)
}
/**
 * Builds an enumerator that keeps feeding the result of the by-name future `e` for as long
 * as the iteratee is in the `Cont` state. `e` is re-evaluated for every element and handed
 * to `executeFuture` with the prepared `ec`.
 */
def repeatM[E](e: => Future[E])(implicit ec: ExecutionContext): Enumerator[E] = {
  val treat = new TreatCont0[E] {
    private val pec = ec.prepare()

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = {
      val produced = executeFuture(e)(pec)
      produced.flatMap(element => loop(k(Input.El(element))))(dec)
    }
  }
  checkContinue0(treat)
}
/**
 * Builds an enumerator from the by-name future `e`, which is re-evaluated for every element
 * while the iteratee is in the `Cont` state. `Some(element)` feeds `element` and continues;
 * `None` stops and returns the iteratee without sending further input.
 *
 * @param e produces the next element, or `None` to stop
 * @param ec the execution context passed (after `prepare()`) to `executeFuture` with `e`
 */
def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] = {
  val treat = new TreatCont0[E] {
    private val pec = ec.prepare()

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = {
      val generated = executeFuture(e)(pec)
      generated.flatMap {
        case Some(element) => loop(k(Input.El(element)))
        case None => Future.successful(Cont(k))
      }(dec)
    }
  }
  checkContinue0(treat)
}
/**
 * Callback used by `checkContinue0` to handle an iteratee that is in the `Cont` state.
 */
trait TreatCont0[E] {
/**
 * @param loop re-enters the `checkContinue0` driver with the next iteratee
 * @param k the continuation of the iteratee currently in the `Cont` state
 * @return a future of the iteratee to hand back from the enumerator
 */
def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]]
}
/**
 * Builds an enumerator that inspects the iteratee's state and delegates to `inner` only when
 * the iteratee is in the `Cont` state. `Done` and `Error` states are returned as they are,
 * rebuilt from their components. `inner` receives the driver itself as `loop`, so it can
 * re-check the state after feeding input.
 */
def checkContinue0[E](inner: TreatCont0[E]) = new Enumerator[E] {
  def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
    def drive(current: Iteratee[E, A]): Future[Iteratee[E, A]] =
      current.fold { state =>
        state match {
          case Step.Cont(k) => inner[A](drive, k)
          case Step.Done(a, e) => Future.successful(Done(a, e))
          case Step.Error(msg, e) => Future.successful(Error(msg, e))
        }
      }(dec)

    drive(it)
  }
}
/**
 * Callback used by `checkContinue1` to handle an iteratee that is in the `Cont` state,
 * with an extra state value of type `S` threaded through the loop.
 */
trait TreatCont1[E, S] {
/**
 * @param loop re-enters the `checkContinue1` driver with the next iteratee and state
 * @param s the current state value
 * @param k the continuation of the iteratee currently in the `Cont` state
 * @return a future of the iteratee to hand back from the enumerator
 */
def apply[A](loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]], s: S, k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]]
}
/**
 * Stateful variant of `checkContinue0`: builds an enumerator that inspects the iteratee's
 * state and delegates to `inner`, together with the current state value, only when the
 * iteratee is in the `Cont` state. `Done` and `Error` states are returned as they are,
 * rebuilt from their components.
 *
 * @param s the initial state value handed to `inner`
 * @param inner handles the `Cont` case and may re-enter the driver through `loop`
 */
def checkContinue1[E, S](s: S)(inner: TreatCont1[E, S]) = new Enumerator[E] {
  def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
    def drive(current: Iteratee[E, A], state: S): Future[Iteratee[E, A]] =
      current.fold { stepState =>
        stepState match {
          case Step.Cont(k) => inner[A](drive, state, k)
          case Step.Done(a, e) => Future.successful(Done(a, e))
          case Step.Error(msg, e) => Future.successful(Error(msg, e))
        }
      }(dec)

    drive(it, s)
  }
}
/**
 * Builds an enumerator that pulls its elements from a callback.
 *
 * @param retriever called for each element while the iteratee is in the `Cont` state; its
 *                  argument is `true` for the first call of an application and `false` for
 *                  later ones. `Some(e)` feeds `e`; `None` feeds `Input.EOF` and ends the
 *                  enumeration.
 * @param onComplete invoked (in a `Future` on the prepared `ec`) after a step that ended the
 *                   enumeration without a failed future
 * @param onError invoked with the message and input of a `Step.Error`, or with the message
 *                of an `Exception` that failed a step (paired with `Input.Empty`)
 * @param ec the execution context passed (after `prepare()`) to `executeFuture` and used
 *           for `onComplete`
 */
def fromCallback1[E](retriever: Boolean => Future[Option[E]],
onComplete: () => Unit = () => (),
onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ())(implicit ec: ExecutionContext) = new Enumerator[E] {
private val pec = ec.prepare()
def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
// Completed with the final iteratee, or failed if a step's future fails.
val iterateeP = Promise[Iteratee[E, A]]()
// One round of the loop. `next` yields Some(iteratee) to continue, None when finished.
def step(it: Iteratee[E, A], initial: Boolean = false) {
val next = it.fold {
case Step.Cont(k) => {
executeFuture(retriever(initial))(pec).map {
case None => {
// Source exhausted: send EOF and publish the resulting iteratee.
val remainingIteratee = k(Input.EOF)
iterateeP.success(remainingIteratee)
None
}
case Some(read) => {
val nextIteratee = k(Input.El(read))
Some(nextIteratee)
}
}(dec)
}
case Step.Error(msg, in) =>
onError(msg, in)
iterateeP.success(it)
Future.successful(None)
case _ =>
// Remaining state (not Cont, not Error): publish the iteratee unchanged.
iterateeP.success(it)
Future.successful(None)
}(dec)
// Only `Exception`s are reported through `onError`; the promise is failed below for
// any failure.
next.onFailure {
case reason: Exception =>
onError(reason.getMessage(), Input.Empty)
}(dec)
next.onComplete {
// Later rounds use the default `initial = false`.
case Success(Some(i)) => step(i)
case Success(None) => Future(onComplete())(pec)
case Failure(e) =>
iterateeP.failure(e)
}(dec)
}
step(it, true)
iterateeP.future
}
}
/**
 * Builds an enumerator of byte chunks read from an input stream. Each chunk is read with a
 * blocking `read` into a fresh buffer of `chunkSize` bytes; a short read is copied into an
 * array of exactly the number of bytes read, and a read result of `-1` ends the enumeration.
 * The stream is closed through `onDoneEnumerating`.
 *
 * @param input the stream to read from
 * @param chunkSize the size of the read buffer, in bytes
 * @param ec the execution context used (after `prepare()`) for reading and closing
 */
def fromStream(input: java.io.InputStream, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
  implicit val pec = ec.prepare()

  // Reads the next chunk; evaluated again for every element because generateM takes it by name.
  def nextChunk(): Future[Option[Array[Byte]]] = {
    val buffer = new Array[Byte](chunkSize)
    val bytesRead = blocking { input.read(buffer) }
    val chunk =
      if (bytesRead == -1) {
        None
      } else if (bytesRead == chunkSize) {
        Some(buffer)
      } else {
        val trimmed = new Array[Byte](bytesRead)
        System.arraycopy(buffer, 0, trimmed, 0, bytesRead)
        Some(trimmed)
      }
    Future.successful(chunk)
  }

  val chunks = generateM(nextChunk())(pec)
  chunks.onDoneEnumerating(input.close)(pec)
}
/**
 * Builds an enumerator of byte chunks from a file by opening a `java.io.FileInputStream`
 * on it and delegating to `fromStream`. The stream is opened when this method is called.
 *
 * @param file the file to read
 * @param chunkSize the size of the read buffer, in bytes
 */
def fromFile(file: java.io.File, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
  val stream = new java.io.FileInputStream(file)
  fromStream(stream, chunkSize)(ec)
}
/**
 * Builds an enumerator of byte chunks from a path by opening it with `Files.newInputStream`
 * and delegating to `fromStream`. The stream is opened when this method is called.
 *
 * @param path the path to read
 * @param chunkSize the size of the read buffer, in bytes
 */
def fromPath(path: java.nio.file.Path, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
  val stream = Files.newInputStream(path)
  fromStream(stream, chunkSize)(ec)
}
/**
 * Builds an enumerator fed through a `java.io.OutputStream`. The function `a` receives an
 * output stream whose writes are pushed to a `Concurrent.unicast` channel as byte arrays;
 * closing the stream ends the channel and flushing does nothing.
 *
 * @param a writes to the provided output stream
 * @param ec the execution context handed to `Concurrent.unicast`
 */
def outputStream(a: java.io.OutputStream => Unit)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
  Concurrent.unicast[Array[Byte]] { channel =>
    val sink = new java.io.OutputStream() {
      override def close(): Unit = {
        channel.end()
      }

      override def flush(): Unit = {}

      override def write(value: Int): Unit = {
        channel.push(Array(value.toByte))
      }

      override def write(buffer: Array[Byte]): Unit = {
        write(buffer, 0, buffer.length)
      }

      // Pushes a copy of the requested window of the buffer.
      override def write(buffer: Array[Byte], start: Int, count: Int): Unit = {
        channel.push(buffer.slice(start, start + count))
      }
    }
    a(sink)
  }(ec)
}
/** An enumerator that offers only `Input.EOF`, built with `enumInput`. */
def eof[A] = {
  val end: Input[A] = Input.EOF
  enumInput[A](end)
}
/**
 * Builds an enumerator from a fixed list of elements. No elements gives `Enumerator.empty`;
 * a single element is fed directly to an iteratee in the `Cont` state; two or more elements
 * are fed one after another through `enumerateSeq`.
 *
 * @param in the elements to enumerate
 */
def apply[E](in: E*): Enumerator[E] = {
  val count = in.length
  if (count == 0) {
    Enumerator.empty
  } else if (count == 1) {
    new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC {
        case Step.Cont(k) => k(Input.El(in.head))
        case _ => i
      }
    }
  } else {
    new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i)
    }
  }
}
/**
 * Builds an enumerator over the elements of a traversable, using `unfoldM` with the
 * traversable's iterator as the state. The iterator is obtained once, when this method is
 * called. `hasNext` is checked inside the unfold function; `next` is evaluated in a `Future`
 * on `ctx`.
 *
 * @param traversable the source of elements
 * @param ctx the execution context on which elements are taken from the iterator
 */
def enumerate[E](traversable: TraversableOnce[E])(implicit ctx: scala.concurrent.ExecutionContext): Enumerator[E] = {
  type Source = scala.collection.Iterator[E]
  val it: Source = traversable.toIterator
  Enumerator.unfoldM[Source, E](it) { currentIt =>
    if (currentIt.hasNext) {
      Future[Option[(Source, E)]] {
        val next = currentIt.next
        Some((currentIt -> next))
      }(ctx)
    } else {
      Future.successful[Option[(Source, E)]](None)
    }
  }(dec)
}
/** An enumerator that feeds nothing: applying it returns the iteratee unchanged. */
def empty[E]: Enumerator[E] = new Enumerator[E] {
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = {
    val untouched = i
    Future.successful(untouched)
  }
}
/**
 * Returns a function that feeds every element of a sequence, in order, to an iteratee.
 * Each element is fed only if the iteratee is in the `Cont` state at that point; otherwise
 * the iteratee is passed along unchanged.
 */
private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (elements, start) =>
  val seed = Future.successful(start)
  elements.foldLeft(seed) { (pending, element) =>
    pending.flatMap { current =>
      current.pureFold {
        case Step.Cont(k) => k(Input.El(element))
        case _ => current
      }(dec)
    }(dec)
  }
}
/**
 * Builds an enumerator over a sequence using `checkContinue1`, with the remaining elements
 * as the state. While elements remain, the head is fed as `Input.El` and the loop continues
 * with the tail; once the sequence is empty the iteratee is returned without further input.
 */
private[iteratee] def enumerateSeq1[E](s: Seq[E]): Enumerator[E] = {
  val treat = new TreatCont1[E, Seq[E]] {
    def apply[A](loop: (Iteratee[E, A], Seq[E]) => Future[Iteratee[E, A]], s: Seq[E], k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]] = {
      if (s.isEmpty) {
        Future.successful(Cont(k))
      } else {
        loop(k(Input.El(s.head)), s.tail)
      }
    }
  }
  checkContinue1(s)(treat)
}
/**
 * Builds an enumerator over a sequence of ready-made inputs using `checkContinue1`, with
 * the remaining inputs as the state. While inputs remain, the head is fed as it is and the
 * loop continues with the tail; once the sequence is empty the iteratee is returned without
 * further input.
 */
private[iteratee] def enumerateSeq2[E](s: Seq[Input[E]]): Enumerator[E] = {
  val treat = new TreatCont1[E, Seq[Input[E]]] {
    def apply[A](loop: (Iteratee[E, A], Seq[Input[E]]) => Future[Iteratee[E, A]], s: Seq[Input[E]], k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]] = {
      if (s.isEmpty) {
        Future.successful(Cont(k))
      } else {
        loop(k(s.head), s.tail)
      }
    }
  }
  checkContinue1(s)(treat)
}
}