package kafka.producer
import kafka.api._
import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
import kafka.utils._
import java.util.Random
import org.apache.kafka.common.utils.Utils._
/**
 * Companion holding constants shared by all [[SyncProducer]] instances.
 */
object SyncProducer {
  /** Request type key used when framing sync-producer requests on the wire. */
  val RequestKey: Short = 0
  /** Shared RNG — a single instance so all producers draw from one sequence. */
  val randomGenerator = new Random()
}
/*
 * A blocking, single-connection producer client. All network traffic is
 * serialized through `lock`, so a single instance may be shared by threads.
 */
@threadsafe
class SyncProducer(val config: SyncProducerConfig) extends Logging {

  // Guards every use of `blockingChannel` (send/receive/connect/disconnect).
  private val lock = new Object()
  // Once set by close(), connect() refuses to re-establish the channel.
  @volatile private var shutdown: Boolean = false
  private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
    config.sendBufferBytes, config.requestTimeoutMs)
  val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)

  trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))

  /**
   * Debug aid: re-serializes the request and, for produce requests, parses it
   * back so the round-tripped contents can be traced.
   *
   * NOTE(review): the guard checks `logger.isDebugEnabled` but the output goes
   * through `trace(...)` — if trace is disabled while debug is enabled, the
   * serialization work here is wasted. Confirm whether the guard should be a
   * trace-level check before changing it.
   */
  private def verifyRequest(request: RequestOrResponse) = {
    if (logger.isDebugEnabled) {
      val buffer = new BoundedByteBufferSend(request).buffer
      trace("verifying sendbuffer of size " + buffer.limit)
      val requestTypeId = buffer.getShort()
      if (requestTypeId == RequestKeys.ProduceKey) {
        val request = ProducerRequest.readFrom(buffer)
        trace(request.toString)
      }
    }
  }

  /**
   * Sends `request` over the (lazily established) channel, optionally reading
   * the response.
   *
   * @param request      the request to send
   * @param readResponse when false (acks == 0), no response is read and the
   *                     result is null
   * @return the raw response, or null when readResponse is false
   * @throws java.io.IOException on network failure; the channel is dropped
   *                             first so the next call reconnects
   */
  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = {
    lock synchronized {
      verifyRequest(request)
      getOrMakeConnection()

      var response: Receive = null
      try {
        blockingChannel.send(request)
        if (readResponse)
          response = blockingChannel.receive()
        else
          trace("Skipping reading response")
      } catch {
        // Drop the (likely broken) connection so the next send reconnects,
        // then let the caller see the failure. Other throwables propagate
        // untouched — no need to catch and rethrow them.
        case e: java.io.IOException =>
          disconnect()
          throw e
      }
      response
    }
  }

  /**
   * Sends a produce request, recording size and latency metrics per-broker
   * and aggregated across brokers.
   *
   * @return the parsed response, or null when `requiredAcks == 0` (the broker
   *         sends no response in that case)
   */
  def send(producerRequest: ProducerRequest): ProducerResponse = {
    val requestSize = producerRequest.sizeInBytes
    producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
    producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)

    var response: Receive = null
    val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
    val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
    aggregateTimer.time {
      specificTimer.time {
        // acks == 0 means fire-and-forget: the broker returns nothing.
        response = doSend(producerRequest, readResponse = producerRequest.requiredAcks != 0)
      }
    }
    if (producerRequest.requiredAcks != 0)
      ProducerResponse.readFrom(response.buffer)
    else
      null
  }

  /** Sends a topic-metadata request and parses the (always present) response. */
  def send(request: TopicMetadataRequest): TopicMetadataResponse = {
    val response = doSend(request)
    TopicMetadataResponse.readFrom(response.buffer)
  }

  /** Tears down the connection and prevents any future reconnect. */
  def close(): Unit = {
    lock synchronized {
      disconnect()
      shutdown = true
    }
  }

  /**
   * Disconnects the channel. Best-effort: failures are logged and swallowed
   * so close()/error paths never throw from cleanup.
   */
  private def disconnect(): Unit = {
    try {
      info("Disconnecting from " + formatAddress(config.host, config.port))
      blockingChannel.disconnect()
    } catch {
      case e: Exception => error("Error on disconnect: ", e)
    }
  }

  /**
   * Connects the channel if it is down and we are not shut down. Connection
   * failures clean up any half-open state, are logged, and are rethrown.
   */
  private def connect(): BlockingChannel = {
    if (!blockingChannel.isConnected && !shutdown) {
      try {
        blockingChannel.connect()
        info("Connected to " + formatAddress(config.host, config.port) + " for producing")
      } catch {
        case e: Exception => {
          disconnect()
          error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
          throw e
        }
      }
    }
    blockingChannel
  }

  /** Ensures the channel is connected before a send. */
  private def getOrMakeConnection(): Unit = {
    if (!blockingChannel.isConnected) {
      connect()
    }
  }
}