package scala.collection.parallel
import scala.collection.{ Parallel, SeqLike, GenSeqLike, GenSeq, GenIterable, Iterator }
import scala.collection.generic.DefaultSignalling
import scala.collection.generic.AtomicIndexFlag
import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.generic.VolatileAbort
trait ParSeqLike[+T, +Repr <: ParSeq[T], +Sequential <: Seq[T] with SeqLike[T, Sequential]]
extends scala.collection.GenSeqLike[T, Repr]
with ParIterableLike[T, Repr, Sequential] {
self =>
import tasksupport._
type SuperParIterator = IterableSplitter[T]
trait ParIterator extends SeqSplitter[T] with super.ParIterator {
me: SignalContextPassingIterator[ParIterator] =>
def split: Seq[ParIterator]
def psplit(sizes: Int*): Seq[ParIterator]
}
trait SignalContextPassingIterator[+IterRepr <: ParIterator]
extends ParIterator with super.SignalContextPassingIterator[IterRepr] {
abstract override def psplit(sizes: Int*): Seq[IterRepr] = {
val pits = super.psplit(sizes: _*)
pits foreach { _.signalDelegate = signalDelegate }
pits.asInstanceOf[Seq[IterRepr]]
}
}
protected[parallel] def splitter: SeqSplitter[T]
override def iterator: PreciseSplitter[T] = splitter
override def size = length
protected abstract class Elements(start: Int, val end: Int) extends ParIterator with BufferedIterator[T] {
me: SignalContextPassingIterator[ParIterator] =>
private var i = start
def hasNext = i < end
def next: T = if (i < end) {
val x = self(i)
i += 1
x
} else Iterator.empty.next
def head = self(i)
final def remaining = end - i
def dup = new Elements(i, end) with SignalContextPassingIterator[ParIterator]
def split = psplit(remaining / 2, remaining - remaining / 2)
def psplit(sizes: Int*) = {
val incr = sizes.scanLeft(0)(_ + _)
for ((from, until) <- incr.init zip incr.tail) yield {
new Elements(start + from, (start + until) min end) with SignalContextPassingIterator[ParIterator]
}
}
override def toString = "Elements(" + start + ", " + end + ")"
}
def segmentLength(p: T => Boolean, from: Int): Int = if (from >= length) 0 else {
val realfrom = if (from < 0) 0 else from
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MaxValue)
executeAndWaitResult(new SegmentLength(p, 0, splitter.psplit(realfrom, length - realfrom)(1) assign ctx))._1
}
def indexWhere(p: T => Boolean, from: Int): Int = if (from >= length) -1 else {
val realfrom = if (from < 0) 0 else from
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MaxValue)
executeAndWaitResult(new IndexWhere(p, realfrom, splitter.psplit(realfrom, length - realfrom)(1) assign ctx))
}
def lastIndexWhere(p: T => Boolean, end: Int): Int = if (end < 0) -1 else {
val until = if (end >= length) length else end + 1
val ctx = new DefaultSignalling with AtomicIndexFlag
ctx.setIndexFlag(Int.MinValue)
executeAndWaitResult(new LastIndexWhere(p, 0, splitter.psplit(until, length - until)(0) assign ctx))
}
def reverse: Repr = {
executeAndWaitResult(new Reverse(() => newCombiner, splitter) mapResult { _.result })
}
def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[Repr, S, That]): That = bf ifParallel { pbf =>
executeAndWaitResult(new ReverseMap[S, That](f, pbf, splitter) mapResult { _.result })
} otherwise seq.reverseMap(f)(bf2seq(bf))
def startsWith[S](that: GenSeq[S], offset: Int): Boolean = that ifParSeq { pthat =>
if (offset < 0 || offset >= length) offset == length && pthat.length == 0
else if (pthat.length == 0) true
else if (pthat.length > length - offset) false
else {
val ctx = new DefaultSignalling with VolatileAbort
executeAndWaitResult(new SameElements(splitter.psplit(offset, pthat.length)(1) assign ctx, pthat.splitter))
}
} otherwise seq.startsWith(that, offset)
override def sameElements[U >: T](that: GenIterable[U]): Boolean = that ifParSeq { pthat =>
val ctx = new DefaultSignalling with VolatileAbort
length == pthat.length && executeAndWaitResult(new SameElements(splitter assign ctx, pthat.splitter))
} otherwise seq.sameElements(that)
def endsWith[S](that: GenSeq[S]): Boolean = that ifParSeq { pthat =>
if (that.length == 0) true
else if (that.length > length) false
else {
val ctx = new DefaultSignalling with VolatileAbort
val tlen = that.length
executeAndWaitResult(new SameElements(splitter.psplit(length - tlen, tlen)(1) assign ctx, pthat.splitter))
}
} otherwise seq.endsWith(that)
def patch[U >: T, That](from: Int, patch: GenSeq[U], replaced: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val realreplaced = replaced min (length - from)
if (patch.isParSeq && bf.isParallel && (size - realreplaced + patch.size) > MIN_FOR_COPY) {
val that = patch.asParSeq
val pbf = bf.asParallel
val pits = splitter.psplit(from, replaced, length - from - realreplaced)
val copystart = new Copy[U, That](() => pbf(repr), pits(0))
val copymiddle = wrap {
val tsk = new that.Copy[U, That](() => pbf(repr), that.splitter)
tasksupport.executeAndWaitResult(tsk)
}
val copyend = new Copy[U, That](() => pbf(repr), pits(2))
executeAndWaitResult(((copystart parallel copymiddle) { _ combine _ } parallel copyend) { _ combine _ } mapResult {
_.result
})
} else patch_sequential(from, patch.seq, replaced)
}
private def patch_sequential[U >: T, That](fromarg: Int, patch: Seq[U], r: Int)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
val from = 0 max fromarg
val b = bf(repr)
val repl = (r min (length - from)) max 0
val pits = splitter.psplit(from, repl, length - from - repl)
b ++= pits(0)
b ++= patch
b ++= pits(2)
b.result
}
def updated[U >: T, That](index: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = bf ifParallel { pbf =>
executeAndWaitResult(new Updated(index, elem, pbf, splitter) mapResult { _.result })
} otherwise seq.updated(index, elem)(bf2seq(bf))
def +:[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
patch(0, mutable.ParArray(elem), 0)
}
def :+[U >: T, That](elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = {
patch(length, mutable.ParArray(elem), 0)
}
def padTo[U >: T, That](len: Int, elem: U)(implicit bf: CanBuildFrom[Repr, U, That]): That = if (length < len) {
patch(length, new immutable.Repetition(elem, len - length), 0)
} else patch(length, Nil, 0);
override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
val pbf = bf.asParallel
val thatseq = that.asParSeq
executeAndWaitResult(new Zip(length min thatseq.length, pbf, splitter, thatseq.splitter) mapResult { _.result });
} else super.zip(that)(bf)
def corresponds[S](that: GenSeq[S])(p: (T, S) => Boolean): Boolean = that ifParSeq { pthat =>
val ctx = new DefaultSignalling with VolatileAbort
length == pthat.length && executeAndWaitResult(new Corresponds(p, splitter assign ctx, pthat.splitter))
} otherwise seq.corresponds(that)(p)
def diff[U >: T](that: GenSeq[U]): Repr = sequentially {
_ diff that
}
def intersect[U >: T](that: GenSeq[U]) = sequentially {
_ intersect that
}
def distinct: Repr = sequentially {
_.distinct
}
override def toString = seq.mkString(stringPrefix + "(", ", ", ")")
override def toSeq = this.asInstanceOf[ParSeq[T]]
override def view = new ParSeqView[T, Repr, Sequential] {
protected lazy val underlying = self.repr
protected[this] def viewIdentifier = ""
protected[this] def viewIdString = ""
def length = self.length
def apply(idx: Int) = self(idx)
override def seq = self.seq.view
def splitter = self.splitter
}
protected[this] def down(p: IterableSplitter[_]) = p.asInstanceOf[SeqSplitter[T]]
protected trait Accessor[R, Tp] extends super.Accessor[R, Tp] {
protected[this] val pit: SeqSplitter[T]
}
protected trait Transformer[R, Tp] extends Accessor[R, Tp] with super.Transformer[R, Tp]
protected[this] class SegmentLength(pred: T => Boolean, from: Int, protected[this] val pit: SeqSplitter[T])
extends Accessor[(Int, Boolean), SegmentLength] {
@volatile var result: (Int, Boolean) = null
def leaf(prev: Option[(Int, Boolean)]) = if (from < pit.indexFlag) {
val itsize = pit.remaining
val seglen = pit.prefixLength(pred)
result = (seglen, itsize == seglen)
if (!result._2) pit.setIndexFlagIfLesser(from)
} else result = (0, false)
protected[this] def newSubtask(p: SuperParIterator) = throw new UnsupportedOperationException
override def split = {
val pits = pit.split
for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new SegmentLength(pred, from + untilp, p)
}
override def merge(that: SegmentLength) = if (result._2) result = (result._1 + that.result._1, that.result._2)
override def requiresStrictSplitters = true
}
protected[this] class IndexWhere(pred: T => Boolean, from: Int, protected[this] val pit: SeqSplitter[T])
extends Accessor[Int, IndexWhere] {
@volatile var result: Int = -1
def leaf(prev: Option[Int]) = if (from < pit.indexFlag) {
val r = pit.indexWhere(pred)
if (r != -1) {
result = from + r
pit.setIndexFlagIfLesser(from)
}
}
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val pits = pit.split
for ((p, untilp) <- pits zip pits.scanLeft(from)(_ + _.remaining)) yield new IndexWhere(pred, untilp, p)
}
override def merge(that: IndexWhere) = result = if (result == -1) that.result else {
if (that.result != -1) result min that.result else result
}
override def requiresStrictSplitters = true
}
protected[this] class LastIndexWhere(pred: T => Boolean, pos: Int, protected[this] val pit: SeqSplitter[T])
extends Accessor[Int, LastIndexWhere] {
@volatile var result: Int = -1
def leaf(prev: Option[Int]) = if (pos > pit.indexFlag) {
val r = pit.lastIndexWhere(pred)
if (r != -1) {
result = pos + r
pit.setIndexFlagIfGreater(pos)
}
}
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val pits = pit.split
for ((p, untilp) <- pits zip pits.scanLeft(pos)(_ + _.remaining)) yield new LastIndexWhere(pred, untilp, p)
}
override def merge(that: LastIndexWhere) = result = if (result == -1) that.result else {
if (that.result != -1) result max that.result else result
}
override def requiresStrictSplitters = true
}
protected[this] class Reverse[U >: T, This >: Repr](cbf: () => Combiner[U, This], protected[this] val pit: SeqSplitter[T])
extends Transformer[Combiner[U, This], Reverse[U, This]] {
@volatile var result: Combiner[U, This] = null
def leaf(prev: Option[Combiner[U, This]]) = result = pit.reverse2combiner(reuse(prev, cbf()))
protected[this] def newSubtask(p: SuperParIterator) = new Reverse(cbf, down(p))
override def merge(that: Reverse[U, This]) = result = that.result combine result
}
protected[this] class ReverseMap[S, That](f: T => S, pbf: CanCombineFrom[Repr, S, That], protected[this] val pit: SeqSplitter[T])
extends Transformer[Combiner[S, That], ReverseMap[S, That]] {
@volatile var result: Combiner[S, That] = null
def leaf(prev: Option[Combiner[S, That]]) = result = pit.reverseMap2combiner(f, pbf(self.repr))
protected[this] def newSubtask(p: SuperParIterator) = new ReverseMap(f, pbf, down(p))
override def merge(that: ReverseMap[S, That]) = result = that.result combine result
}
protected[this] class SameElements[U >: T](protected[this] val pit: SeqSplitter[T], val otherpit: PreciseSplitter[U])
extends Accessor[Boolean, SameElements[U]] {
@volatile var result: Boolean = true
def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
result = pit.sameElements(otherpit)
if (!result) pit.abort
}
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val fp = pit.remaining / 2
val sp = pit.remaining - fp
for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new SameElements(p, op)
}
override def merge(that: SameElements[U]) = result = result && that.result
override def requiresStrictSplitters = true
}
protected[this] class Updated[U >: T, That](pos: Int, elem: U, pbf: CanCombineFrom[Repr, U, That], protected[this] val pit: SeqSplitter[T])
extends Transformer[Combiner[U, That], Updated[U, That]] {
@volatile var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) = result = pit.updated2combiner(pos, elem, pbf(self.repr))
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val pits = pit.split
for ((p, untilp) <- pits zip pits.scanLeft(0)(_ + _.remaining)) yield new Updated(pos - untilp, elem, pbf, p)
}
override def merge(that: Updated[U, That]) = result = result combine that.result
override def requiresStrictSplitters = true
}
protected[this] class Zip[U >: T, S, That](len: Int, pbf: CanCombineFrom[Repr, (U, S), That], protected[this] val pit: SeqSplitter[T], val otherpit: SeqSplitter[S])
extends Transformer[Combiner[(U, S), That], Zip[U, S, That]] {
@volatile var result: Result = null
def leaf(prev: Option[Result]) = result = pit.zip2combiner[U, S, That](otherpit, pbf(self.repr))
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val fp = len / 2
val sp = len - len / 2
val pits = pit.psplit(fp, sp)
val opits = otherpit.psplit(fp, sp)
Seq(
new Zip(fp, pbf, pits(0), opits(0)),
new Zip(sp, pbf, pits(1), opits(1))
)
}
override def merge(that: Zip[U, S, That]) = result = result combine that.result
}
protected[this] class Corresponds[S](corr: (T, S) => Boolean, protected[this] val pit: SeqSplitter[T], val otherpit: PreciseSplitter[S])
extends Accessor[Boolean, Corresponds[S]] {
@volatile var result: Boolean = true
def leaf(prev: Option[Boolean]) = if (!pit.isAborted) {
result = pit.corresponds(corr)(otherpit)
if (!result) pit.abort
}
protected[this] def newSubtask(p: SuperParIterator) = unsupported
override def split = {
val fp = pit.remaining / 2
val sp = pit.remaining - fp
for ((p, op) <- pit.psplit(fp, sp) zip otherpit.psplit(fp, sp)) yield new Corresponds(corr, p, op)
}
override def merge(that: Corresponds[S]) = result = result && that.result
override def requiresStrictSplitters = true
}
}