package kafka.utils
import java.util.concurrent._
import atomic._
import collection.mutable.HashMap
trait Scheduler {
def startup()
def shutdown()
def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
}
@threadsafe
class KafkaScheduler(val threads: Int,
val threadNamePrefix: String = "kafka-scheduler-",
daemon: Boolean = true) extends Scheduler with Logging {
@volatile private var executor: ScheduledThreadPoolExecutor = null
private val schedulerThreadId = new AtomicInteger(0)
override def startup() {
debug("Initializing task scheduler.")
this synchronized {
if(executor != null)
throw new IllegalStateException("This scheduler has already been started!")
executor = new ScheduledThreadPoolExecutor(threads)
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
executor.setThreadFactory(new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
})
}
}
override def shutdown() {
debug("Shutting down task scheduler.")
ensureStarted
executor.shutdown()
executor.awaitTermination(1, TimeUnit.DAYS)
this.executor = null
}
def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
ensureStarted
val runnable = Utils.runnable {
try {
trace("Begining execution of scheduled task '%s'.".format(name))
fun()
} catch {
case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
} finally {
trace("Completed execution of scheduled task '%s'.".format(name))
}
}
if(period >= 0)
executor.scheduleAtFixedRate(runnable, delay, period, unit)
else
executor.schedule(runnable, delay, unit)
}
private def ensureStarted = {
if(executor == null)
throw new IllegalStateException("Kafka scheduler has not been started")
}
}