package kafka.consumer
import kafka.cluster.Broker
import kafka.server.AbstractFetcherThread
import kafka.message.ByteBufferMessageSet
import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
import kafka.common.TopicAndPartition
/**
 * Fetcher thread used on the consumer side. It extends [[kafka.server.AbstractFetcherThread]],
 * wiring the socket, fetch-size and wait settings from the supplied [[ConsumerConfig]], and
 * identifies itself to the broker with `Request.OrdinaryConsumerId`.
 *
 * @param name                   thread name, passed through to the parent class
 * @param config                 consumer configuration supplying client id, socket and fetch settings
 * @param sourceBroker           broker this thread fetches from
 * @param partitionMap           per-partition state; every partition handled by this thread is
 *                               looked up here
 * @param consumerFetcherManager manager that is notified about partitions that hit errors
 */
class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name,
                                      clientId = config.clientId,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true) {

  /**
   * Hands the messages of one fetched partition over to its [[PartitionTopicInfo]].
   *
   * @param topicAndPartition partition the data belongs to; looked up in `partitionMap`
   * @param fetchOffset       offset the fetch was issued at
   * @param partitionData     fetched data whose messages are enqueued
   * @throws RuntimeException if the offset tracked for the partition differs from `fetchOffset`
   */
  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData): Unit = {
    val pti = partitionMap(topicAndPartition)
    // The response must correspond to the offset currently tracked for this partition.
    if (pti.getFetchOffset != fetchOffset)
      throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
    pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
  }

  /**
   * Picks a new offset for a partition according to `config.autoOffsetReset`, then resets both
   * the fetch offset and the consume offset of the partition to it.
   *
   * @param topicAndPartition partition whose offset is reset
   * @return the offset the partition was reset to
   */
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    // The match is used as an expression, so no mutable placeholder is needed.
    val startTimestamp: Long = config.autoOffsetReset match {
      case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
      case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime
      // Any other setting falls back to the latest offset.
      case _ => OffsetRequest.LatestTime
    }
    // NOTE(review): simpleConsumer is not defined in this file; presumably inherited from
    // AbstractFetcherThread -- confirm.
    val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId)
    val pti = partitionMap(topicAndPartition)
    pti.resetFetchOffset(newOffset)
    pti.resetConsumeOffset(newOffset)
    newOffset
  }

  /**
   * Removes the given partitions from this thread and reports them to the
   * `consumerFetcherManager` via `addPartitionsWithError`.
   *
   * @param partitions partitions that hit an error
   */
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): Unit = {
    removePartitions(partitions.toSet)
    consumerFetcherManager.addPartitionsWithError(partitions)
  }
}