package scala.collection.parallel
import scala.concurrent.forkjoin._
import scala.util.control.Breaks._
import annotation.unchecked.uncheckedVariance
/** Base trait for task-execution strategies.
 *
 *  It declares the splittable `Task` abstraction, the scheduler-side `TaskImpl`
 *  handle that wraps a task, and the abstract entry points (`execute`,
 *  `executeAndWaitResult`) that concrete strategies implement.
 */
trait Tasks {
// Buffer of debug messages; appended to by `debuglog`.
private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]()
/** Appends `s` to `debugMessages` while holding this instance's monitor. */
private[parallel] def debuglog(s: String) = synchronized {
debugMessages += s
}
/** A unit of work that produces a value of type `R` and can be split and merged.
 *
 *  @tparam R   the result type
 *  @tparam Tp  the type returned by `repr` and accepted by `merge`/`tryMerge`
 */
trait Task[R, +Tp] {
type Result = R
/** This task viewed as a `Tp`; implemented as an unchecked cast. */
def repr = this.asInstanceOf[Tp]
/** Body of the task. `tryLeaf` forwards its `lastres` argument here.
 *  NOTE(review): presumably `result` carries the result of a previously
 *  completed task in the chain, when there is one — confirm with implementers.
 */
def leaf(result: Option[R])
/** The result slot of this task; read and written by the scheduling code. */
var result: R
/** Whether the scheduler should split this task rather than run `leaf` on it. */
def shouldSplitFurther: Boolean
/** Splits this task into subtasks. */
private[parallel] def split: Seq[Task[R, Tp]]
/** Combines `that` into this task. The default implementation does nothing. */
private[parallel] def merge(that: Tp @uncheckedVariance) {}
// Failure recorded for this task; `null` means no failure has been recorded.
@volatile var throwable: Throwable = null
/** Rethrows the recorded `throwable`, if there is one. */
def forwardThrowable() = if (throwable != null) throw throwable
/** Runs `leaf(lastres)` and records failures instead of letting them escape.
 *
 *  A `break` raised inside `leaf` is caught and only triggers `signalAbort`.
 *  An `Exception` is stored in `throwable` and also triggers `signalAbort`.
 *  Throwables that are not `Exception`s are not caught here.
 */
private[parallel] def tryLeaf(lastres: Option[R]) {
try {
tryBreakable {
leaf(lastres)
// NOTE(review): self-assignment looks deliberate — presumably it forces a
// write through the `result` setter so that effects of `leaf` are published
// when implementers back `result` with a volatile field. Confirm before removing.
result = result
} catchBreak {
signalAbort
}
} catch {
case thr: Exception =>
// Same self-assignment as above, performed before the failure is recorded.
result = result
throwable = thr
signalAbort
}
}
/** Merges `t` into this task unless either side has recorded a failure,
 *  then combines the recorded failures via `mergeThrowables`.
 */
private[parallel] def tryMerge(t: Tp @uncheckedVariance) {
val that = t.asInstanceOf[Task[R, Tp]]
// NOTE(review): `local` is never used; presumably the read of `result` is kept
// for its memory-visibility effect when `result` is volatile. Confirm before removing.
val local = result
if (this.throwable == null && that.throwable == null) merge(t)
mergeThrowables(that)
}
// Diagnostic helper that prints the state of two tasks being merged.
// It is private and not called anywhere in the visible source.
private def checkMerge(that: Task[R, Tp] @uncheckedVariance) {
if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
} else if (this.throwable != null || that.throwable != null) {
println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
}
}
/** Folds the failure recorded in `that` into this task's `throwable`.
 *
 *  If both tasks failed, the two throwables are combined with `alongWith`.
 *  If only `that` failed, its throwable is adopted. Otherwise `throwable`
 *  is rewritten with its current value.
 *  NOTE(review): `alongWith` is not defined in this file — presumably an
 *  enrichment of `Throwable` provided by the enclosing package; verify.
 */
private[parallel] def mergeThrowables(that: Task[_, _]) {
if (this.throwable != null && that.throwable != null) {
this.throwable = this.throwable alongWith that.throwable
} else if (that.throwable != null) this.throwable = that.throwable
// Self-assignment of a @volatile field: a volatile write with no value change.
else this.throwable = this.throwable
}
/** Hook invoked by `tryLeaf` after a break or an exception. No-op by default. */
private[parallel] def signalAbort() {}
}
/** Scheduler-side handle that wraps a `Task` and knows how to run it.
 *
 *  @tparam R   the result type of the wrapped task
 *  @tparam Tp  the representation type of the wrapped task
 */
trait TaskImpl[R, +Tp] {
// The task wrapped by this handle.
val body: Task[R, Tp]
/** Splits this handle into handles for subtasks. */
def split: Seq[TaskImpl[R, Tp]]
/** Runs the computation of this task. */
def compute()
/** Starts this task; concrete strategies in this file submit or fork it. */
def start()
/** Waits for this task; concrete strategies in this file block until it is done. */
def sync()
/** Attempts to take the task back from the scheduler; `true` on success. */
def tryCancel: Boolean
/** Called after a task obtained through `tryCancel` has been run by the caller.
 *  The default implementation does nothing.
 */
def release() {}
}
/** Wraps `b` in the `TaskImpl` of the concrete strategy. */
protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
// Execution environment of the strategy; the concrete traits in this file
// store a thread pool or fork/join pool here and cast it back on use.
var environment: AnyRef
/** Starts `fjtask` and returns a function that yields its result when applied. */
def execute[R, Tp](fjtask: Task[R, Tp]): () => R
/** Runs `task` and returns its result. */
def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R
/** The parallelism level reported by the concrete strategy. */
def parallelismLevel: Int
}
/** Task strategy that splits a task repeatedly and chains the spawned subtasks,
 *  so that the splitting thread can either run a subtask itself (when it can
 *  still be cancelled) or wait for it, merging results as it walks the chain.
 */
trait AdaptiveWorkStealingTasks extends Tasks {

  /** Task handle that carries the link to the next subtask in the chain.
   *
   *  @tparam R   the result type of the wrapped task
   *  @tparam Tp  the representation type of the wrapped task
   */
  trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] {
    // Next subtask in the chain built by `spawnSubtasks`; `null` terminates the chain.
    @volatile var next: TaskImpl[R, Tp] = null
    // Not read anywhere in this trait; kept because it is a public member.
    @volatile var shouldWaitFor = true

    /** Splits this handle into handles for subtasks. */
    def split: Seq[TaskImpl[R, Tp]]

    /** Splits further when the body asks for it, otherwise runs the body as a leaf. */
    def compute(): Unit =
      if (body.shouldSplitFurther) internal()
      else body.tryLeaf(None)

    /** Spawns the subtasks, runs the head of the chain on the current thread,
     *  then walks the chain: every subtask that can still be cancelled is run
     *  inline (seeded with the result so far) and released, every other one is
     *  waited for. Each visited subtask is merged into `body`.
     */
    def internal(): Unit = {
      var last = spawnSubtasks()

      // The head of the chain was never started, so it is computed right here.
      last.body.tryLeaf(None)
      body.result = last.body.result
      body.throwable = last.body.throwable

      while (last.next != null) {
        last = last.next
        if (last.tryCancel) {
          // Nobody picked this subtask up yet: run it on this thread.
          last.body.tryLeaf(Some(body.result))
          last.release()
        } else {
          last.sync()
        }
        body.tryMerge(last.body.repr)
      }
    }

    /** Splits this task repeatedly, starting every subtask except the first of
     *  each split, which becomes the task to split next.
     *
     *  The started subtasks are linked through `next` such that the ones
     *  produced by the most recent split come first.
     *
     *  @return the last unsplit head, with `next` pointing at the chain of
     *          started subtasks
     */
    def spawnSubtasks(): TaskImpl[R, Tp] = {
      var last: TaskImpl[R, Tp] = null
      var head: TaskImpl[R, Tp] = this
      do {
        val subtasks = head.split
        head = subtasks.head
        // Reverse order so that, after prepending, the chain preserves split order.
        for (t <- subtasks.tail.reverse) {
          t.next = last
          last = t
          t.start()
        }
      } while (head.body.shouldSplitFurther)
      head.next = last
      head
    }

    /** Prints the chain starting at this task to standard output. */
    def printChain(): Unit = {
      val chain = new StringBuilder("chain: ")
      var curr: TaskImpl[R, Tp] = this
      while (curr != null) {
        chain.append(curr).append(" ---> ")
        curr = curr.next
      }
      println(chain.toString)
    }
  }

  /** Wraps `b` in the `TaskImpl` of the concrete strategy. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
}
/** Task strategy that runs tasks on a `java.util.concurrent.ThreadPoolExecutor`
 *  stored in `environment`.
 */
trait ThreadPoolTasks extends Tasks {
  import java.util.concurrent._

  /** Runnable task handle whose completion is signalled through its own monitor.
   *
   *  @tparam R   the result type of the wrapped task
   *  @tparam Tp  the representation type of the wrapped task
   */
  trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
    // Set once some thread has claimed the right to run this task.
    @volatile var owned = false
    // Set by `release` once the task has been run.
    @volatile var completed = false

    /** Counts the task in `totaltasks` and submits it to the executor. */
    def start() = synchronized {
      executor.synchronized {
        incrTasks()
        executor.submit(this)
      }
    }

    /** Blocks the caller until this task has been released as completed.
     *
     *  Before waiting, the core pool size is raised by one if it is smaller
     *  than the number of outstanding tasks.
     *  NOTE(review): presumably this keeps queued tasks from starving while
     *  pool threads are blocked here — confirm.
     */
    def sync() = synchronized {
      executor.synchronized {
        val coresize = executor.getCorePoolSize
        if (coresize < totaltasks) {
          executor.setCorePoolSize(coresize + 1)
        }
      }
      // `Object.wait` may return without a matching notify (spurious wakeup),
      // so the condition has to be re-checked in a loop rather than tested once.
      while (!completed) this.wait()
    }

    /** Claims the task for the caller if no thread has claimed it yet.
     *
     *  @return `true` if the caller now owns the task and must run it itself
     */
    def tryCancel = synchronized {
      if (!owned) {
        owned = true
        true
      } else false
    }

    /** Executor entry point: runs and releases the task unless it was already claimed. */
    def run = {
      var isOkToRun = false
      synchronized {
        if (!owned) {
          owned = true
          isOkToRun = true
        }
      }
      // When the claim failed, another thread owns the task and this call is a no-op.
      if (isOkToRun) {
        compute()
        release()
      }
    }

    /** Marks the task completed, removes it from `totaltasks` and wakes all waiters. */
    override def release = synchronized {
      completed = true
      executor.synchronized {
        decrTasks()
      }
      this.notifyAll()
    }
  }

  /** Wraps `b` in the `TaskImpl` of the concrete strategy. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]

  // The executor, stored untyped; `executor` casts it back on every access.
  var environment: AnyRef = ThreadPoolTasks.defaultThreadPool

  /** `environment` viewed as a `ThreadPoolExecutor`. */
  def executor = environment.asInstanceOf[ThreadPoolExecutor]

  /** The executor's work queue viewed as a `LinkedBlockingQueue`. */
  def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]]

  // Number of tasks that were started but not yet released.
  @volatile var totaltasks = 0

  private def incrTasks() = synchronized {
    totaltasks += 1
  }

  private def decrTasks() = synchronized {
    totaltasks -= 1
  }

  /** Starts `task` and returns a function that waits for it, rethrows a
   *  recorded failure if there is one, and otherwise yields the result.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val t = newTaskImpl(task)
    t.start()
    () => {
      t.sync()
      t.body.forwardThrowable()
      t.body.result
    }
  }

  /** Starts `task`, waits for it, rethrows a recorded failure if there is one,
   *  and otherwise returns the result.
   */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val t = newTaskImpl(task)
    t.start()
    t.sync()
    t.body.forwardThrowable()
    t.body.result
  }

  /** The number of available processors, as recorded by the companion object. */
  def parallelismLevel = ThreadPoolTasks.numCores
}
/** Shared defaults for the thread-pool based task strategy. */
object ThreadPoolTasks {
  import java.util.concurrent._

  /** Number of processors reported by the runtime when this object is initialised. */
  val numCores = Runtime.getRuntime.availableProcessors

  /** Counter used to number the threads created for the default pool. */
  val tcount = new atomic.AtomicLong(0L)

  /** Default executor: `numCores` core threads, an unbounded queue, daemon
   *  threads named `pc-thread-N`, a 60 millisecond keep-alive and a
   *  caller-runs rejection policy.
   */
  val defaultThreadPool = {
    // Creates numbered daemon threads for the pool.
    val daemonThreads = new ThreadFactory {
      def newThread(r: Runnable) = {
        val worker = new Thread(r)
        worker.setName("pc-thread-" + tcount.incrementAndGet)
        worker.setDaemon(true)
        worker
      }
    }
    val workQueue = new LinkedBlockingQueue[Runnable]
    val whenRejected = new ThreadPoolExecutor.CallerRunsPolicy

    new ThreadPoolExecutor(
      numCores,
      Int.MaxValue,
      60L, TimeUnit.MILLISECONDS,
      workQueue,
      daemonThreads,
      whenRejected
    )
  }
}
/** Task strategy that submits tasks to a thread pool and waits on the
 *  `java.util.concurrent.Future` returned by the submission.
 */
trait FutureThreadPoolTasks extends Tasks {
  import java.util.concurrent._

  /** Runnable task handle that remembers the future of its own submission.
   *
   *  @tparam R   the result type of the wrapped task
   *  @tparam Tp  the representation type of the wrapped task
   */
  trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
    // Future of the submission; stays `null` until `start` has been called.
    @volatile var future: Future[_] = null

    /** Submits this task while holding the executor's monitor and stores the future. */
    def start(): Unit = executor.synchronized {
      future = executor.submit(this)
    }

    /** Blocks until the stored future completes. */
    def sync(): Unit = {
      future.get
    }

    /** Tasks of this strategy are never handed back to the caller. */
    def tryCancel: Boolean = false

    /** Executor entry point: runs the computation. */
    def run(): Unit = compute()
  }

  /** Wraps `b` in the `TaskImpl` of the concrete strategy. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]

  // The executor, stored untyped; `executor` casts it back on every access.
  var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool

  /** `environment` viewed as a `ThreadPoolExecutor`. */
  def executor: ThreadPoolExecutor = environment.asInstanceOf[ThreadPoolExecutor]

  /** Starts `task` and returns a function that waits for it, rethrows a
   *  recorded failure if there is one, and otherwise yields the result.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val submitted = newTaskImpl(task)
    submitted.start()

    () => {
      submitted.sync()
      submitted.body.forwardThrowable()
      submitted.body.result
    }
  }

  /** Starts `task`, waits for it, rethrows a recorded failure if there is one,
   *  and otherwise returns the result.
   */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val submitted = newTaskImpl(task)
    submitted.start()
    submitted.sync()
    submitted.body.forwardThrowable()
    submitted.body.result
  }

  /** The number of available processors, as recorded by the companion object. */
  def parallelismLevel: Int = FutureThreadPoolTasks.numCores
}
/** Shared defaults for the future-based thread-pool task strategy. */
object FutureThreadPoolTasks {
  import java.util.concurrent._

  /** Number of processors reported by the runtime when this object is initialised. */
  val numCores: Int = Runtime.getRuntime.availableProcessors

  /** Counter starting at zero; not used elsewhere in this object. */
  val tcount: atomic.AtomicLong = new atomic.AtomicLong(0L)

  /** Default executor: a cached thread pool from `Executors`. */
  val defaultThreadPool: ExecutorService = Executors.newCachedThreadPool()
}
/** Mixed into components that expose a fork/join pool. */
trait HavingForkJoinPool {
/** The `ForkJoinPool` associated with this instance. */
def forkJoinPool: ForkJoinPool
}
/** Task strategy that runs tasks as `RecursiveAction`s on the `ForkJoinPool`
 *  stored in `environment`.
 */
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {

  /** Task handle backed by a fork/join `RecursiveAction`.
   *
   *  @tparam R   the result type of the wrapped task
   *  @tparam Tp  the representation type of the wrapped task
   */
  trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] {
    /** Forks this action. */
    def start(): Unit = { fork() }

    /** Joins this action. */
    def sync(): Unit = { join() }

    /** Tries to unschedule this action; `true` if it was unforked. */
    def tryCancel: Boolean = tryUnfork()
  }

  /** Wraps `b` in the `TaskImpl` of the concrete strategy. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]

  /** `environment` viewed as a `ForkJoinPool`. */
  def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool]

  // The pool, stored untyped; `forkJoinPool` casts it back on every access.
  var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool

  /** Schedules `task` and returns a function that joins it, rethrows a
   *  recorded failure if there is one, and otherwise yields the result.
   *
   *  The task is forked when the caller is a fork/join worker thread and
   *  handed to `forkJoinPool` otherwise.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val action = newTaskImpl(task)

    Thread.currentThread match {
      case _: ForkJoinWorkerThread => action.fork()
      case _ => forkJoinPool.execute(action)
    }

    () => {
      action.sync()
      action.body.forwardThrowable()
      action.body.result
    }
  }

  /** Schedules `task`, joins it, rethrows a recorded failure if there is one,
   *  and otherwise returns the result.
   *
   *  The task is forked when the caller is a fork/join worker thread and
   *  handed to `forkJoinPool` otherwise.
   */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val action = newTaskImpl(task)

    Thread.currentThread match {
      case _: ForkJoinWorkerThread => action.fork()
      case _ => forkJoinPool.execute(action)
    }

    action.sync()
    action.body.forwardThrowable()
    action.body.result
  }

  /** The parallelism of the current fork/join pool. */
  def parallelismLevel: Int = forkJoinPool.getParallelism
}
/** Shared defaults for the fork/join task strategy. */
object ForkJoinTasks {

  /** Default pool, created with the no-argument `ForkJoinPool` constructor. */
  val defaultForkJoinPool: ForkJoinPool = {
    val pool = new ForkJoinPool
    pool
  }
}
/** Adaptive work-stealing strategy whose tasks are executed on a fork/join pool. */
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {

  /** Concrete task handle combining fork/join scheduling with chained splitting.
   *
   *  @param body  the task wrapped by this handle
   */
  class TaskImpl[R, Tp](val body: Task[R, Tp])
  extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
    /** Splits the body and wraps every resulting subtask in a new handle. */
    def split: Seq[TaskImpl[R, Tp]] =
      for (piece <- body.split) yield newTaskImpl(piece)
  }

  /** Wraps `b` in a new `TaskImpl`. */
  def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] = new TaskImpl[R, Tp](b)
}
/** Adaptive work-stealing strategy whose tasks are executed on a thread pool. */
trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {

  /** Concrete task handle combining thread-pool scheduling with chained splitting.
   *
   *  @param body  the task wrapped by this handle
   */
  class TaskImpl[R, Tp](val body: Task[R, Tp])
  extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
    /** Splits the body and wraps every resulting subtask in a new handle. */
    def split: Seq[TaskImpl[R, Tp]] =
      for (piece <- body.split) yield newTaskImpl(piece)
  }

  /** Wraps `b` in a new `TaskImpl`. */
  def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp] = new TaskImpl[R, Tp](b)
}