package scala.collection.parallel
import scala.concurrent.forkjoin._
import scala.util.control.Breaks._
import annotation.unchecked.uncheckedVariance
/** Common interface for splitting work into tasks and running them on an
 *  execution environment.
 *
 *  This trait declares the user-facing `Task` abstraction, the
 *  scheduler-facing `TaskImpl` wrapper, and the abstract entry points
 *  (`execute`, `executeAndWaitResult`) that concrete schedulers implement.
 */
trait Tasks {
  
  // Buffer that `debuglog` appends to.
  private[parallel] val debugMessages = collection.mutable.ArrayBuffer[String]()
  
  /** Appends `s` to `debugMessages` while holding this object's monitor. */
  private[parallel] def debuglog(s: String) = synchronized {
    debugMessages += s
  }
  
  /** A unit of work producing a result of type `R`.
   *
   *  @tparam R   the type of the result held in `result`
   *  @tparam Tp  the type this task is cast to by `repr`; it is also the
   *              argument type of `merge`
   */
  trait Task[R, +Tp] {
    type Result = R
    
    /** This task cast (unchecked) to `Tp`. */
    def repr = this.asInstanceOf[Tp]    
    
    /** The body of the task; invoked by `tryLeaf`.
     *
     *  @param result  the value handed to `tryLeaf` by the scheduler: `None`,
     *                 or `Some` of a previously computed result
     */
    def leaf(result: Option[R])    
    
    /** The result slot of this task; read and written by the schedulers. */
    var result: R
    
    /** Whether the scheduler should split this task instead of running it as a leaf. */
    def shouldSplitFurther: Boolean
    
    /** Splits this task into subtasks. */
    private[parallel] def split: Seq[Task[R, Tp]]
    
    /** Combines `that` task into this one. The default implementation does nothing. */
    private[parallel] def merge(that: Tp @uncheckedVariance) {}
    
    // Exception recorded by `tryLeaf` or `mergeThrowables`; null when none occurred.
    @volatile var throwable: Throwable = null
    /** Rethrows the recorded `throwable`, if there is one. */
    def forwardThrowable() = if (throwable != null) throw throwable
    
    /** Runs `leaf(lastres)`, translating a `break` or an `Exception` into `signalAbort`.
     *
     *  An `Exception` thrown by `leaf` is not propagated; it is stored in
     *  `throwable`. Throwables that are not `Exception`s are not caught here.
     */
    private[parallel] def tryLeaf(lastres: Option[R]) {
      try {
        tryBreakable {
          leaf(lastres)
          // NOTE(review): this self-assignment looks deliberate (presumably a
          // write to publish the effects of `leaf` when implementations make
          // `result` volatile) -- confirm before removing.
          result = result 
        } catchBreak {
          signalAbort
        }
      } catch {
        case thr: Exception =>
          // NOTE(review): same self-assignment as above; presumably intentional.
          result = result 
          throwable = thr
          signalAbort
      }
    }
    
    /** Merges `t` into this task unless either side has recorded a throwable,
     *  then combines the throwables of both tasks via `mergeThrowables`.
     */
    private[parallel] def tryMerge(t: Tp @uncheckedVariance) {
      val that = t.asInstanceOf[Task[R, Tp]]
      // NOTE(review): `local` is never used; the read of `result` is presumably
      // intentional (visibility) -- confirm before removing.
      val local = result 
      
      if (this.throwable == null && that.throwable == null) merge(t)
      mergeThrowables(that)
    }
    
    // Diagnostic helper that prints when two tasks without throwables have a
    // null result, or when either task carries a throwable. It is not called
    // anywhere in the visible source.
    private def checkMerge(that: Task[R, Tp] @uncheckedVariance) {
      if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
        println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
      } else if (this.throwable != null || that.throwable != null) {
        println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
      }
    }
    
    /** Folds the throwable of `that` into this task's `throwable`.
     *
     *  If both tasks have one they are combined with `alongWith`; if only
     *  `that` has one it is copied over.
     */
    private[parallel] def mergeThrowables(that: Task[_, _]) {
      if (this.throwable != null && that.throwable != null) {
        // `alongWith` is not defined in this file; presumably an extension
        // supplied elsewhere in the package -- TODO confirm.
        this.throwable = this.throwable alongWith that.throwable
      } else if (that.throwable != null) this.throwable = that.throwable
      // NOTE(review): self-assignment of a volatile field; presumably intentional.
      else this.throwable = this.throwable
    }
    
    /** Hook invoked by `tryLeaf` on a break or an exception. Does nothing by default. */
    private[parallel] def signalAbort() {}
  }
  
  /** Scheduler-side wrapper around a `Task`, exposing the operations a
   *  concrete execution environment has to provide.
   */
  trait TaskImpl[R, +Tp] {
    
    /** The wrapped task. */
    val body: Task[R, Tp]
    
    /** Splits this wrapper into wrappers of subtasks. */
    def split: Seq[TaskImpl[R, Tp]]
    
    /** Performs the work of this task. */
    def compute()
    
    /** Hands this task to the execution environment. */
    def start()
    
    /** Waits for this task; implementations in this file block until it has finished. */
    def sync()
    
    /** Attempts to take the task back from the execution environment;
     *  callers in this file run the task themselves when this returns true.
     */
    def tryCancel: Boolean
    
    /** Called by the work-stealing scheduler after it has run a task it
     *  claimed via `tryCancel`. Does nothing by default.
     */
    def release() {}
  }
  
  /** Factory for the scheduler-specific wrapper of task `b`. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
  
  
  
  
  
  /** The execution environment; the implementations in this file cast it
   *  to a thread pool or a fork/join pool.
   */
  var environment: AnyRef
  
  /** Starts `fjtask` and returns a function yielding its result; the
   *  implementations in this file wait for the task inside that function and
   *  rethrow a recorded throwable.
   */
  def execute[R, Tp](fjtask: Task[R, Tp]): () => R
  
  /** Starts `task`, waits for it and returns its result; the implementations
   *  in this file rethrow a recorded throwable.
   */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R
  
  /** The parallelism level reported by the execution environment. */
  def parallelismLevel: Int
  
}
/** Scheduling strategy that repeatedly splits a task, starts every sibling
 *  produced by a split, and computes the remaining first part in the
 *  current thread before collecting the siblings.
 */
trait AdaptiveWorkStealingTasks extends Tasks {

  trait TaskImpl[R, Tp] extends super.TaskImpl[R, Tp] {
    // Link to the next started sibling in the chain built by `spawnSubtasks`.
    @volatile var next: TaskImpl[R, Tp] = null
    // Not read anywhere in the visible source.
    @volatile var shouldWaitFor = true

    def split: Seq[TaskImpl[R, Tp]]

    /** Either splits the work (`internal`) or runs the body directly as a leaf. */
    def compute() = {
      if (body.shouldSplitFurther) internal()
      else body.tryLeaf(None)
    }

    /** Spawns the subtasks, runs the first one here, then walks the `next`
     *  chain merging every sibling's outcome into `body`.
     */
    def internal() = {
      var current = spawnSubtasks()

      // The head of the chain was not started, so compute it in this thread
      // and seed our own result and throwable from it.
      current.body.tryLeaf(None)
      body.result = current.body.result
      body.throwable = current.body.throwable

      while (current.next != null) {
        current = current.next
        if (!current.tryCancel) {
          // Could not be claimed back: wait for it.
          current.sync()
        } else {
          // Claimed back: compute it here, passing the result accumulated so far.
          current.body.tryLeaf(Some(body.result))
          current.release()
        }

        body.tryMerge(current.body.repr)
      }
    }

    /** Splits until the first part no longer wants to split, starting all
     *  other parts and chaining them through `next`.
     *
     *  @return the unstarted first part, whose `next` points at the chain of
     *          started siblings
     */
    def spawnSubtasks() = {
      var chained: TaskImpl[R, Tp] = null
      var first: TaskImpl[R, Tp] = this
      do {
        val parts = first.split
        first = parts.head
        // Reverse order, so that the chain ends up listing siblings in split order.
        parts.tail.reverse foreach { sibling =>
          sibling.next = chained
          chained = sibling
          sibling.start()
        }
      } while (first.body.shouldSplitFurther)
      first.next = chained
      first
    }

    /** Prints this task and everything reachable through `next`. */
    def printChain() = {
      val text = new StringBuilder("chain: ")
      var node: TaskImpl[R, Tp] = this
      while (node != null) {
        text.append(node).append(" ---> ")
        node = node.next
      }
      println(text.toString)
    }
  }

  /** Factory for the scheduler-specific wrapper of task `b`. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]

}
/** Task support that runs tasks on a `java.util.concurrent.ThreadPoolExecutor`
 *  taken from `environment`.
 */
trait ThreadPoolTasks extends Tasks {
  import java.util.concurrent._
  
  /** A task wrapper submitted to the executor as a `Runnable`.
   *
   *  Ownership is decided under the task's own monitor: whoever flips
   *  `owned` first (the pool thread in `run`, or a caller in `tryCancel`)
   *  is the one that computes the task.
   */
  trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
    
    // True once some thread has claimed the right to compute this task.
    @volatile var owned = false
    // True once `release` has been called for this task.
    @volatile var completed = false
    
    /** Counts the task in `totaltasks` and submits it to the executor. */
    def start() = synchronized {
      executor.synchronized {
        incrTasks
        executor.submit(this)
      }
    }

    /** Blocks the calling thread until the task has been released.
     *
     *  Before waiting, the executor's core pool size is raised by one if it
     *  is below the number of outstanding tasks (presumably so that a blocked
     *  waiter does not keep queued tasks from running -- TODO confirm).
     */
    def sync() = synchronized {
      executor.synchronized {
        val coresize = executor.getCorePoolSize
        if (coresize < totaltasks) {
          executor.setCorePoolSize(coresize + 1)
        }
      }
      // Wait in a loop: `Object.wait` may return without a matching notify
      // (spurious wakeup), so the condition must be re-checked every time.
      while (!completed) this.wait()
    }

    /** Claims the task for the caller.
     *
     *  @return true if the task had not been claimed yet, in which case the
     *          executor's `run` will skip it
     */
    def tryCancel = synchronized {
      if (!owned) {
        owned = true
        true
      } else false
    }

    /** Executor entry point: computes and releases the task unless it has
     *  already been claimed through `tryCancel`.
     */
    def run = {
      var isOkToRun = false
      synchronized {
        if (!owned) {
          owned = true
          isOkToRun = true
        }
      }
      if (isOkToRun) {
        // NOTE(review): if `compute` throws, `release` is never reached and
        // threads blocked in `sync` are not notified.
        compute
        release
      } else {
        // Claimed by another thread via `tryCancel`; nothing to do here.
      }
    }

    /** Marks the task completed, uncounts it and wakes up all threads waiting in `sync`. */
    override def release = synchronized {
      completed = true
      executor.synchronized {
        decrTasks
      }
      this.notifyAll
    }
  }
  
  /** Factory for the scheduler-specific wrapper of task `b`. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]
  
  var environment: AnyRef = ThreadPoolTasks.defaultThreadPool
  /** The `environment` cast to a `ThreadPoolExecutor`. */
  def executor = environment.asInstanceOf[ThreadPoolExecutor]
  /** The executor's work queue cast to a `LinkedBlockingQueue`; not used in this trait. */
  def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]]
  // Number of tasks started but not yet released.
  @volatile var totaltasks = 0
  
  private def incrTasks() = synchronized {
    totaltasks += 1
  }
  
  private def decrTasks() = synchronized {
    totaltasks -= 1
  }
  
  /** Starts `task` and returns a function that waits for it, rethrows any
   *  recorded throwable and yields the result.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val t = newTaskImpl(task)
    
    t.start
    
    () => {
      t.sync
      t.body.forwardThrowable
      t.body.result
    }
  }
  
  /** Starts `task`, waits for it, rethrows any recorded throwable and returns the result. */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val t = newTaskImpl(task)
    
    t.start
    
    t.sync
    t.body.forwardThrowable
    t.body.result
  }
  
  /** The processor count from `ThreadPoolTasks.numCores`. */
  def parallelismLevel = ThreadPoolTasks.numCores
  
}
/** Shared defaults for thread-pool based task support. */
object ThreadPoolTasks {
  import java.util.concurrent._

  /** Processor count reported by the JVM runtime. */
  val numCores = Runtime.getRuntime.availableProcessors

  /** Counter used to number the threads created for `defaultThreadPool`. */
  val tcount = new atomic.AtomicLong(0L)

  /** Executor with `numCores` core threads, an unbounded maximum, a
   *  60 millisecond keep-alive, an unbounded linked queue, daemon threads
   *  named `pc-thread-N`, and a caller-runs rejection policy.
   */
  val defaultThreadPool = {
    val daemonFactory = new ThreadFactory {
      def newThread(r: Runnable) = {
        val worker = new Thread(r)
        worker.setName("pc-thread-" + tcount.incrementAndGet)
        worker.setDaemon(true)
        worker
      }
    }
    new ThreadPoolExecutor(
      numCores,
      Int.MaxValue,
      60L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable],
      daemonFactory,
      new ThreadPoolExecutor.CallerRunsPolicy
    )
  }
}
/** Task support that submits tasks to a thread pool and waits on the
 *  `java.util.concurrent.Future` returned by the submission.
 */
trait FutureThreadPoolTasks extends Tasks {
  import java.util.concurrent._

  /** A task wrapper that remembers the future obtained when it was submitted. */
  trait TaskImpl[R, +Tp] extends Runnable with super.TaskImpl[R, Tp] {
    // Set by `start`; null until the task has been submitted.
    @volatile var future: Future[_] = null

    /** Submits this task to the executor while holding the executor's monitor. */
    def start() = executor.synchronized {
      future = executor.submit(this)
    }

    /** Blocks on the future stored by `start`. */
    def sync() = future.get

    /** Always false: a submitted task is never claimed back. */
    def tryCancel = false

    /** Executor entry point; simply computes the task. */
    def run = compute
  }

  /** Factory for the scheduler-specific wrapper of task `b`. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]

  var environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool

  /** The `environment` cast to a `ThreadPoolExecutor`. */
  def executor = environment.asInstanceOf[ThreadPoolExecutor]

  /** Starts `task` and returns a function that waits for it, rethrows any
   *  recorded throwable and yields the result.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val submitted = newTaskImpl(task)
    submitted.start()

    def awaitResult(): R = {
      submitted.sync()
      submitted.body.forwardThrowable()
      submitted.body.result
    }
    awaitResult _
  }

  /** Starts `task`, waits for it, rethrows any recorded throwable and returns the result. */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val submitted = newTaskImpl(task)
    submitted.start()
    submitted.sync()
    submitted.body.forwardThrowable()
    submitted.body.result
  }

  /** The processor count from `FutureThreadPoolTasks.numCores`. */
  def parallelismLevel = FutureThreadPoolTasks.numCores

}
/** Shared defaults for future-based thread-pool task support. */
object FutureThreadPoolTasks {
  import java.util.concurrent._

  /** Processor count reported by the JVM runtime. */
  val numCores: Int = Runtime.getRuntime().availableProcessors()

  /** Counter starting at zero; not used elsewhere in the visible source. */
  val tcount: atomic.AtomicLong = new atomic.AtomicLong(0L)

  /** Pool obtained from `Executors.newCachedThreadPool()`. */
  val defaultThreadPool: ExecutorService = Executors.newCachedThreadPool()
}
/** Mixin for anything that exposes a fork/join pool. */
trait HavingForkJoinPool {
  /** The fork/join pool made available by the implementor. */
  def forkJoinPool: ForkJoinPool
}
/** Task support that runs tasks as `RecursiveAction`s on the fork/join pool
 *  taken from `environment`.
 */
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {

  /** A task wrapper whose scheduling operations map directly onto the
   *  fork/join primitives `fork`, `join` and `tryUnfork`.
   */
  trait TaskImpl[R, +Tp] extends RecursiveAction with super.TaskImpl[R, Tp] {
    def start() = fork()
    def sync() = join()
    def tryCancel = tryUnfork()
  }

  /** Factory for the scheduler-specific wrapper of task `b`. */
  protected def newTaskImpl[R, Tp](b: Task[R, Tp]): TaskImpl[R, Tp]

  var environment: AnyRef = ForkJoinTasks.defaultForkJoinPool

  /** The `environment` cast to a `ForkJoinPool`. */
  def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool]

  /** Schedules `task` and returns a function that joins it, rethrows any
   *  recorded throwable and yields the result.
   *
   *  The task is forked when the caller is itself a fork/join worker thread,
   *  and handed to the pool otherwise.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val scheduled = newTaskImpl(task)

    Thread.currentThread match {
      case _: ForkJoinWorkerThread => scheduled.fork
      case _ => forkJoinPool.execute(scheduled)
    }

    () => {
      scheduled.sync()
      scheduled.body.forwardThrowable()
      scheduled.body.result
    }
  }

  /** Schedules `task` the same way as `execute`, then joins it, rethrows any
   *  recorded throwable and returns the result.
   */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val scheduled = newTaskImpl(task)

    Thread.currentThread match {
      case _: ForkJoinWorkerThread => scheduled.fork
      case _ => forkJoinPool.execute(scheduled)
    }

    scheduled.sync()

    scheduled.body.forwardThrowable()
    scheduled.body.result
  }

  /** The parallelism reported by the fork/join pool. */
  def parallelismLevel = forkJoinPool.getParallelism

}
/** Shared defaults for fork/join based task support. */
object ForkJoinTasks {
  /** Pool created with the no-argument `ForkJoinPool` constructor; the
   *  initial `environment` of `ForkJoinTasks`.
   */
  val defaultForkJoinPool: ForkJoinPool = new ForkJoinPool() 
  
  
}
/** Combines the adaptive work-stealing strategy with fork/join execution. */
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {

  /** Concrete task wrapper mixing the fork/join primitives into the
   *  work-stealing task implementation.
   */
  class TaskImpl[R, Tp](val body: Task[R, Tp])
  extends super[ForkJoinTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
    /** Wraps every part of the split body in a new task wrapper. */
    def split = for (part <- body.split) yield newTaskImpl(part)
  }

  /** Wraps `b` in a `TaskImpl` of this trait. */
  def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)

}
/** Combines the adaptive work-stealing strategy with thread-pool execution. */
trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {

  /** Concrete task wrapper mixing the thread-pool scheduling operations
   *  into the work-stealing task implementation.
   */
  class TaskImpl[R, Tp](val body: Task[R, Tp])
  extends super[ThreadPoolTasks].TaskImpl[R, Tp] with super[AdaptiveWorkStealingTasks].TaskImpl[R, Tp] {
    /** Wraps every part of the split body in a new task wrapper. */
    def split = for (part <- body.split) yield newTaskImpl(part)
  }

  /** Wraps `b` in a `TaskImpl` of this trait. */
  def newTaskImpl[R, Tp](b: Task[R, Tp]) = new TaskImpl[R, Tp](b)

}