package kafka.consumer
import kafka.utils.{IteratorTemplate, Logging, Utils}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import kafka.serializer.Decoder
import java.util.concurrent.atomic.AtomicReference
import kafka.message.{MessageAndOffset, MessageAndMetadata}
import kafka.common.{KafkaException, MessageSizeTooLargeException}
/**
 * Iterator over the messages contained in the [[FetchedDataChunk]]s that arrive on `channel`.
 *
 * Chunks are pulled from the queue one at a time; the messages of the current chunk are handed
 * out as [[MessageAndMetadata]] instances built with the supplied decoders. Receiving the
 * `ZookeeperConsumerConnector.shutdownCommand` chunk ends the iteration.
 *
 * @param channel           queue the data chunks are read from
 * @param consumerTimeoutMs if negative, wait for the next chunk without a time limit; otherwise the
 *                          maximum number of milliseconds to wait before a
 *                          [[ConsumerTimeoutException]] is thrown
 * @param keyDecoder        key decoder passed to every returned [[MessageAndMetadata]]
 * @param valueDecoder      value decoder passed to every returned [[MessageAndMetadata]]
 * @param clientId          client id used to look up the consumer topic statistics
 * @tparam K decoded key type
 * @tparam V decoded value type
 */
class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk],
                             consumerTimeoutMs: Int,
                             private val keyDecoder: Decoder[K],
                             private val valueDecoder: Decoder[V],
                             val clientId: String)
  extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {

  // Iterator over the messages of the chunk currently being consumed; null until the first chunk
  // has been taken from the queue and again after clearCurrentChunk(). The reference itself is
  // never reassigned, only its content.
  private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
  // Topic/partition info of the chunk the most recent message was read from.
  private var currentTopicInfo: PartitionTopicInfo = null
  // nextOffset of the last message produced by makeNext(); -1 until a message has been produced.
  private var consumedOffset: Long = -1L
  private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)

  /**
   * Returns the next message, records its `nextOffset` as the consume offset of the current
   * partition and marks the per-topic and all-topics message rate meters.
   *
   * @return the next decoded message together with its metadata
   * @throws KafkaException if the recorded offset is negative after the item has been obtained
   */
  override def next(): MessageAndMetadata[K, V] = {
    // NOTE(review): presumably IteratorTemplate.next() obtains the item through makeNext(),
    // which is what assigns consumedOffset and currentTopicInfo - confirm against IteratorTemplate.
    val item = super.next()
    if (consumedOffset < 0)
      throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
    currentTopicInfo.resetConsumeOffset(consumedOffset)
    val topic = currentTopicInfo.topic
    trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
    consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
    consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
    item
  }

  /**
   * Produces the next message, taking a new chunk from the queue when there is no current chunk
   * iterator or the current one is exhausted.
   *
   * @return the next message, or `allDone` once the shutdown command chunk has been received
   * @throws ConsumerTimeoutException     if `consumerTimeoutMs` is non-negative and no chunk
   *                                      arrives within that many milliseconds
   * @throws MessageSizeTooLargeException if a newly taken chunk contains no valid bytes
   */
  protected def makeNext(): MessageAndMetadata[K, V] = {
    var localCurrent = current.get()
    // No iterator yet, or the current chunk is used up: get the next chunk from the queue.
    if (localCurrent == null || !localCurrent.hasNext) {
      val currentDataChunk: FetchedDataChunk =
        if (consumerTimeoutMs < 0)
          channel.take()
        else {
          val polled = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
          if (polled == null) {
            // NOTE(review): resetState() comes from the parent type; presumably it allows the
            // iterator to be used again after the timeout - confirm.
            resetState()
            throw new ConsumerTimeoutException
          }
          polled
        }
      if (currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
        debug("Received the shutdown command")
        return allDone
      } else {
        currentTopicInfo = currentDataChunk.topicInfo
        val cdcFetchOffset = currentDataChunk.fetchOffset
        val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
        // The chunk starts beyond what has been consumed so far: log it and move the consume
        // offset forward to the fetch offset of the chunk.
        if (ctiConsumeOffset < cdcFetchOffset) {
          error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
            .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
          currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
        }
        localCurrent = currentDataChunk.messages.iterator
        current.set(localCurrent)
      }
      // A freshly taken chunk without any valid bytes is reported as a message that is larger
      // than the fetch size of this consumer.
      if (currentDataChunk.messages.validBytes == 0)
        throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
          "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
            .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
    }
    var item = localCurrent.next()
    // Skip messages whose offset is below the consume offset of the partition, as long as the
    // chunk has more messages.
    while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
      item = localCurrent.next()
    }
    consumedOffset = item.nextOffset
    // Let the message run its own validity check before it is handed to the caller.
    item.message.ensureValid()
    new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
  }

  /**
   * Drops the iterator over the current data chunk, so that the next call to `makeNext` takes a
   * new chunk from the queue.
   */
  def clearCurrentChunk(): Unit = {
    debug("Clearing the current data chunk for this consumer iterator")
    current.set(null)
  }
}
/**
 * Thrown by [[ConsumerIterator]] when no data chunk becomes available within the configured
 * consumer timeout. Carries neither a message nor a cause.
 */
class ConsumerTimeoutException extends RuntimeException