package scala.collection
import java.lang.Thread._
import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
import scala.collection.mutable.UnrolledBuffer
import annotation.unchecked.uncheckedVariance
package object parallel {
val MIN_FOR_COPY = 512
val CHECK_RATE = 512
val SQRT2 = math.sqrt(2)
val availableProcessors = java.lang.Runtime.getRuntime.availableProcessors
def thresholdFromSize(sz: Int, parallelismLevel: Int) = {
val p = parallelismLevel
if (p > 1) 1 + sz / (8 * p)
else sz
}
private[parallel] def unsupported = throw new UnsupportedOperationException
private[parallel] def unsupportedop(msg: String) = throw new UnsupportedOperationException(msg)
private[parallel] def outofbounds(idx: Int) = throw new IndexOutOfBoundsException(idx.toString)
private[parallel] def getTaskSupport: TaskSupport =
if (util.Properties.isJavaAtLeast("1.6")) {
val vendor = util.Properties.javaVmVendor
if ((vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinTaskSupport
else new ThreadPoolTaskSupport
} else new ThreadPoolTaskSupport
val tasksupport = getTaskSupport
trait FactoryOps[From, Elem, To] {
trait Otherwise[R] {
def otherwise(notbody: => R): R
}
def isParallel: Boolean
def asParallel: CanCombineFrom[From, Elem, To]
def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R): Otherwise[R]
}
implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new FactoryOps[From, Elem, To] {
def isParallel = bf.isInstanceOf[Parallel]
def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new Otherwise[R] {
def otherwise(notbody: => R) = if (isParallel) isbody(asParallel) else notbody
}
}
trait TraversableOps[T] {
trait Otherwise[R] {
def otherwise(notbody: => R): R
}
def isParallel: Boolean
def isParIterable: Boolean
def asParIterable: ParIterable[T]
def isParSeq: Boolean
def asParSeq: ParSeq[T]
def ifParSeq[R](isbody: ParSeq[T] => R): Otherwise[R]
def toParArray: ParArray[T]
}
implicit def traversable2ops[T](t: collection.GenTraversableOnce[T]) = new TraversableOps[T] {
def isParallel = t.isInstanceOf[Parallel]
def isParIterable = t.isInstanceOf[ParIterable[_]]
def asParIterable = t.asInstanceOf[ParIterable[T]]
def isParSeq = t.isInstanceOf[ParSeq[_]]
def asParSeq = t.asInstanceOf[ParSeq[T]]
def ifParSeq[R](isbody: ParSeq[T] => R) = new Otherwise[R] {
def otherwise(notbody: => R) = if (isParallel) isbody(asParSeq) else notbody
}
def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else {
val it = t.toIterator
val cb = mutable.ParArrayCombiner[T]()
while (it.hasNext) cb += it.next
cb.result
}
}
trait ThrowableOps {
def alongWith(that: Throwable): Throwable
}
implicit def throwable2ops(self: Throwable) = new ThrowableOps {
def alongWith(that: Throwable) = (self, that) match {
case (self: CompositeThrowable, that: CompositeThrowable) => new CompositeThrowable(self.throwables ++ that.throwables)
case (self: CompositeThrowable, _) => new CompositeThrowable(self.throwables + that)
case (_, that: CompositeThrowable) => new CompositeThrowable(that.throwables + self)
case _ => new CompositeThrowable(Set(self, that))
}
}
final class CompositeThrowable(val throwables: Set[Throwable])
extends Throwable("Multiple exceptions thrown during a parallel computation: " + throwables.map(t => (t, t.getStackTrace.toList)).mkString(", "))
private[parallel] class BufferSplitter[T]
(private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling)
extends IterableSplitter[T] {
def hasNext = index < until
def next = {
val r = buffer(index)
index += 1
r
}
def remaining = until - index
def dup = new BufferSplitter(buffer, index, until, signalDelegate)
def split: Seq[IterableSplitter[T]] = if (remaining > 1) {
val divsz = (until - index) / 2
Seq(
new BufferSplitter(buffer, index, index + divsz, signalDelegate),
new BufferSplitter(buffer, index + divsz, until, signalDelegate)
)
} else Seq(this)
private[parallel] override def debugInformation = {
buildString {
append =>
append("---------------")
append("Buffer iterator")
append("buffer: " + buffer)
append("index: " + index)
append("until: " + until)
append("---------------")
}
}
}
private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
(private val bucketnumber: Int)
extends Combiner[Elem, To] {
protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
protected var sz: Int = 0
def size = sz
def clear = {
buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
sz = 0
}
def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}
def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] = if (this ne other) {
if (other.isInstanceOf[BucketCombiner[_, _, _, _]]) {
beforeCombine(other)
val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
var i = 0
while (i < bucketnumber) {
if (buckets(i) eq null) {
buckets(i) = that.buckets(i)
} else {
if (that.buckets(i) ne null) buckets(i) concat that.buckets(i)
}
i += 1
}
sz = sz + that.size
afterCombine(other)
this
} else sys.error("Unexpected combiner type.")
} else this
}
}