package kafka.consumer
import kafka.api._
import kafka.network._
import kafka.utils._
import kafka.common.{ErrorMapping, TopicAndPartition}
import org.apache.kafka.common.utils.Utils._
/**
 * A consumer that talks to a single broker at `host`:`port` over one [[BlockingChannel]].
 *
 * Every request/response exchange, and `close()`, runs while holding one internal lock,
 * so at most one request is in flight on the channel at a time.
 *
 * @param host       broker host to connect to
 * @param port       broker port to connect to
 * @param soTimeout  forwarded to the BlockingChannel as its last constructor argument
 *                   (presumably the socket read timeout -- confirm against BlockingChannel)
 * @param bufferSize forwarded to the BlockingChannel as its third constructor argument; the
 *                   fourth is left at `BlockingChannel.UseDefaultBufferSize`
 * @param clientId   validated via `ConsumerConfig.validateClientId`; used to look up the fetch
 *                   statistics and as the client id of requests built by `earliestOrLatestOffset`
 */
@threadsafe
class SimpleConsumer(val host: String,
                     val port: Int,
                     val soTimeout: Int,
                     val bufferSize: Int,
                     val clientId: String) extends Logging {

  ConsumerConfig.validateClientId(clientId)
  private val lock = new Object()
  private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
  private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
  // Set only by close(); consulted by getOrMakeConnection() before connecting up front.
  private var isClosed = false

  /**
   * Drops any existing connection and opens a fresh one.
   *
   * This deliberately calls disconnect() rather than the public close(): close() also sets
   * `isClosed`, and flipping that flag on every connect would permanently disable the
   * up-front connect in getOrMakeConnection().
   *
   * Only called from sendRequest(), i.e. while `lock` is held.
   */
  private def connect(): BlockingChannel = {
    disconnect()
    blockingChannel.connect()
    blockingChannel
  }

  /** Logs at debug level and disconnects the underlying channel. */
  private def disconnect(): Unit = {
    debug("Disconnecting from " + formatAddress(host, port))
    blockingChannel.disconnect()
  }

  /** Disconnects and then connects again. */
  private def reconnect(): Unit = {
    disconnect()
    connect()
  }

  /**
   * Disconnects the channel and marks this consumer as closed, under the request lock.
   *
   * NOTE(review): the retry path of sendRequest() calls reconnect() without checking
   * `isClosed`, so a request issued after close() still ends up reconnecting.
   */
  def close(): Unit = {
    lock synchronized {
      disconnect()
      isClosed = true
    }
  }

  /** Writes `request` to the channel and reads back one response. */
  private def sendAndReceive(request: RequestOrResponse): Receive = {
    blockingChannel.send(request)
    blockingChannel.receive()
  }

  /**
   * Sends `request` and returns the raw response, retrying exactly once on failure.
   *
   * On any failure of the first attempt the channel is reconnected and the request is sent
   * again. If the second attempt also fails, the channel is disconnected and that second
   * failure is rethrown to the caller.
   */
  private def sendRequest(request: RequestOrResponse): Receive = {
    lock synchronized {
      try {
        getOrMakeConnection()
        sendAndReceive(request)
      } catch {
        // NOTE(review): Throwable is kept (rather than NonFatal) to preserve the existing
        // retry-on-anything behaviour that callers may rely on.
        case firstFailure: Throwable =>
          info("Reconnect due to socket error: %s".format(firstFailure.toString))
          try {
            reconnect()
            sendAndReceive(request)
          } catch {
            case retryFailure: Throwable =>
              // Clean up the channel before propagating the failure.
              disconnect()
              throw retryFailure
          }
      }
    }
  }

  /**
   * Sends a topic metadata request and parses the response.
   *
   * @param request the topic metadata request to send
   * @return the response parsed from the reply buffer
   */
  def send(request: TopicMetadataRequest): TopicMetadataResponse = {
    val response = sendRequest(request)
    TopicMetadataResponse.readFrom(response.buffer)
  }

  /**
   * Sends a consumer metadata request and parses the response.
   *
   * @param request the consumer metadata request to send
   * @return the response parsed from the reply buffer
   */
  def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = {
    val response = sendRequest(request)
    ConsumerMetadataResponse.readFrom(response.buffer)
  }

  /**
   * Sends a fetch request, timing the round trip with both the per-broker and the
   * all-brokers request timers, and records the response size in both size histograms.
   *
   * @param request the fetch request to send
   * @return the fetch response parsed from the reply buffer
   */
  def fetch(request: FetchRequest): FetchResponse = {
    // Look the stats objects up once and reuse them for both the timers and the histograms.
    val brokerStats = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port)
    val allBrokersStats = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats
    val specificTimer = brokerStats.requestTimer
    val aggregateTimer = allBrokersStats.requestTimer

    // Assigned from inside the timed block; the timers' return type is not relied upon here.
    var response: Receive = null
    aggregateTimer.time {
      specificTimer.time {
        response = sendRequest(request)
      }
    }

    val fetchResponse = FetchResponse.readFrom(response.buffer)
    val fetchedSize = fetchResponse.sizeInBytes
    brokerStats.requestSizeHist.update(fetchedSize)
    allBrokersStats.requestSizeHist.update(fetchedSize)
    fetchResponse
  }

  /**
   * Sends an offset request and parses the response.
   *
   * @param request the offset request to send
   * @return the offset response parsed from the reply buffer
   */
  def getOffsetsBefore(request: OffsetRequest): OffsetResponse =
    OffsetResponse.readFrom(sendRequest(request).buffer)

  /**
   * Sends an offset commit request and parses the response.
   *
   * @param request the offset commit request to send
   * @return the offset commit response parsed from the reply buffer
   */
  def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse =
    OffsetCommitResponse.readFrom(sendRequest(request).buffer)

  /**
   * Sends an offset fetch request and parses the response.
   *
   * @param request the offset fetch request to send
   * @return the offset fetch response parsed from the reply buffer
   */
  def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse =
    OffsetFetchResponse.readFrom(sendRequest(request).buffer)

  /** Connects the channel if this consumer has not been closed and the channel is not connected. */
  private def getOrMakeConnection(): Unit = {
    if (!isClosed && !blockingChannel.isConnected) {
      connect()
    }
  }

  /**
   * Requests a single offset for `topicAndPartition` and returns it.
   *
   * @param topicAndPartition the topic and partition to query
   * @param earliestOrLatest  the time value placed in the PartitionOffsetRequestInfo
   *                          (presumably the earliest/latest sentinel -- confirm against OffsetRequest)
   * @param consumerId        used as the `replicaId` of the offset request
   * @return the first offset returned for the partition
   * @throws NoSuchElementException if the broker reports no error but returns no offsets
   */
  def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
    val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
                                clientId = clientId,
                                replicaId = consumerId)
    val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
    partitionErrorAndOffset.error match {
      case ErrorMapping.NoError =>
        // Same exception type a bare `.head` would raise, but with a message that says what was empty.
        partitionErrorAndOffset.offsets.headOption.getOrElse(
          throw new NoSuchElementException("No offset returned for %s".format(topicAndPartition)))
      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
    }
  }
}