/* __ *\ ** ________ ___ / / ___ Scala API ** ** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL ** ** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** ** /____/\___/_/ |_/____/_/ | | ** ** |/ ** \* */ package scala.collection.parallel.mutable import scala.collection.generic.Sizing import scala.collection.mutable.ArraySeq import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.UnrolledBuffer import scala.collection.mutable.UnrolledBuffer.Unrolled import scala.collection.parallel.TaskSupport import scala.collection.parallel.unsupportedop import scala.collection.parallel.Combiner import scala.collection.parallel.Task import scala.reflect.ClassTag private[mutable] class DoublingUnrolledBuffer[T](implicit t: ClassTag[T]) extends UnrolledBuffer[T]()(t) { override def calcNextLength(sz: Int) = if (sz < 10000) sz * 2 else sz protected override def newUnrolled = new Unrolled[T](0, new Array[T](4), null, this) } /** An array combiner that uses doubling unrolled buffers to store elements. */ trait UnrolledParArrayCombiner[T] extends Combiner[T, ParArray[T]] { //self: EnvironmentPassingCombiner[T, ParArray[T]] => // because size is doubling, random access is O(logn)! val buff = new DoublingUnrolledBuffer[Any] def +=(elem: T) = { buff += elem this } def result = { val arrayseq = new ArraySeq[T](size) val array = arrayseq.array.asInstanceOf[Array[Any]] combinerTaskSupport.executeAndWaitResult(new CopyUnrolledToArray(array, 0, size)) new ParArray(arrayseq) } def clear() { buff.clear } override def sizeHint(sz: Int) = { buff.lastPtr.next = new Unrolled(0, new Array[Any](sz), null, buff) buff.lastPtr = buff.lastPtr.next } def combine[N <: T, NewTo >: ParArray[T]](other: Combiner[N, NewTo]): Combiner[N, NewTo] = other match { case that if that eq this => this // just return this case that: UnrolledParArrayCombiner[t] => buff concat that.buff this case _ => unsupportedop("Cannot combine with combiner of different type.") } def size = buff.size /* tasks */ class CopyUnrolledToArray(array: Array[Any], offset: Int, howmany: Int) extends Task[Unit, CopyUnrolledToArray] { var result = (); def leaf(prev: Option[Unit]) = if (howmany > 0) { var totalleft = howmany val (startnode, startpos) = findStart(offset) var curr = startnode var pos = startpos var arroffset = offset while (totalleft > 0) { val lefthere = scala.math.min(totalleft, curr.size - pos) Array.copy(curr.array, pos, array, arroffset, lefthere) // println("from: " + arroffset + " elems " + lefthere + " - " + pos + ", " + curr + " -> " + array.toList + " by " + this + " !! " + buff.headPtr) totalleft -= lefthere arroffset += lefthere pos = 0 curr = curr.next } } private def findStart(pos: Int) = { var left = pos var node = buff.headPtr while ((left - node.size) >= 0) { left -= node.size node = node.next } (node, left) } def split = { val fp = howmany / 2 List(new CopyUnrolledToArray(array, offset, fp), new CopyUnrolledToArray(array, offset + fp, howmany - fp)) } def shouldSplitFurther = howmany > scala.collection.parallel.thresholdFromSize(size, combinerTaskSupport.parallelismLevel) override def toString = "CopyUnrolledToArray(" + offset + ", " + howmany + ")" } } object UnrolledParArrayCombiner { def apply[T](): UnrolledParArrayCombiner[T] = new UnrolledParArrayCombiner[T] {} // was: with EnvironmentPassingCombiner[T, ParArray[T]] }