package kafka.producer.async
import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
import collection.mutable.ArrayBuffer
import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
class ProducerSendThread[K,V](val threadName: String,
val queue: BlockingQueue[KeyedMessage[K,V]],
val handler: EventHandler[K,V],
val queueTime: Long,
val batchSize: Int,
val clientId: String) extends Thread(threadName) with Logging with KafkaMetricsGroup {
private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
newGauge("ProducerQueueSize",
new Gauge[Int] {
def value = queue.size
},
Map("clientId" -> clientId))
override def run {
try {
processEvents
}catch {
case e: Throwable => error("Error in sending events: ", e)
}finally {
shutdownLatch.countDown
}
}
def shutdown = {
info("Begin shutting down ProducerSendThread")
queue.put(shutdownCommand)
shutdownLatch.await
info("Shutdown ProducerSendThread complete")
}
private def processEvents() {
var lastSend = SystemTime.milliseconds
var events = new ArrayBuffer[KeyedMessage[K,V]]
var full: Boolean = false
Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
.takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
currentQueueItem =>
val elapsed = (SystemTime.milliseconds - lastSend)
val expired = currentQueueItem == null
if(currentQueueItem != null) {
trace("Dequeued item for topic %s, partition key: %s, data: %s"
.format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
events += currentQueueItem
}
full = events.size >= batchSize
if(full || expired) {
if(expired)
debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full)
debug("Batch full. Sending..")
tryToHandle(events)
lastSend = SystemTime.milliseconds
events = new ArrayBuffer[KeyedMessage[K,V]]
}
}
tryToHandle(events)
if(queue.size > 0)
throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
.format(queue.size))
}
def tryToHandle(events: Seq[KeyedMessage[K,V]]) {
val size = events.size
try {
debug("Handling " + size + " events")
if(size > 0)
handler.handle(events)
}catch {
case e: Throwable => error("Error in handling batch of " + size + " events", e)
}
}
}