package kafka.server
import kafka.network.RequestChannel
import kafka.api.{FetchResponse, FetchRequest}
import kafka.common.{UnknownTopicOrPartitionException, NotLeaderForPartitionException, TopicAndPartition}
import scala.collection.immutable.Map
import scala.collection.Seq
/**
 * A fetch request held as a [[DelayedRequest]], together with the offset metadata
 * each requested partition was positioned at when the request was parked.
 *
 * @param keys                  the topic-partitions this delayed request is registered under
 * @param request               the originating channel request
 * @param delayMs               the delay, in milliseconds, forwarded to [[DelayedRequest]]
 * @param fetch                 the fetch request being delayed
 * @param partitionFetchOffsets the fetch offset metadata recorded for each partition
 */
class DelayedFetch(override val keys: Seq[TopicAndPartition],
                   override val request: RequestChannel.Request,
                   override val delayMs: Long,
                   val fetch: FetchRequest,
                   private val partitionFetchOffsets: Map[TopicAndPartition, LogOffsetMetadata])
  extends DelayedRequest(keys, request, delayMs) {

  /**
   * Decides whether this fetch can be answered now.
   *
   * Partitions are examined one at a time and the check stops at the first partition
   * for which any of the following holds:
   *  - the leader-replica lookup throws `UnknownTopicOrPartitionException`;
   *  - the leader-replica lookup throws `NotLeaderForPartitionException`;
   *  - `offsetOnOlderSegment` reports that the end offset and the fetch offset sit on
   *    different segments (in either direction).
   *
   * Otherwise, for every partition whose fetch offset `precedes` the end offset, the
   * `positionDiff` between the two is accumulated, and the request is satisfied once
   * the accumulated size reaches `fetch.minBytes`.
   *
   * The end offset is the replica's `logEndOffset` when the fetch comes from a
   * follower and its `highWatermark` otherwise. Partitions whose recorded fetch offset
   * is `LogOffsetMetadata.UnknownOffsetMetadata` are skipped.
   *
   * @param replicaManager used to look up the local leader replica of each partition
   * @return `true` if the request should be completed now, `false` otherwise
   */
  def isSatisfied(replicaManager: ReplicaManager) : Boolean = {
    val fromFollower = fetch.isFromFollower
    // Mutated from inside the closure below; only consulted when no partition
    // triggers an immediate exit.
    var accumulatedSize = 0

    // `exists` stops at the first partition that yields true, which gives the same
    // short-circuit as an early exit without a nonlocal `return` (exception-based)
    // from inside the closure.
    val satisfiedImmediately = partitionFetchOffsets.exists {
      case (topicAndPartition, fetchOffset) =>
        try {
          if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
            val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
            val endOffset =
              if (fromFollower) replica.logEndOffset
              else replica.highWatermark

            if (endOffset.offsetOnOlderSegment(fetchOffset)) {
              debug("Satisfying fetch request %s since it is fetching later segments of partition %s.".format(fetch, topicAndPartition))
              true
            } else if (fetchOffset.offsetOnOlderSegment(endOffset)) {
              debug("Satisfying fetch request %s immediately since it is fetching older segments.".format(fetch))
              true
            } else {
              // Same segment: count the bytes available past the fetch position, if any.
              if (fetchOffset.precedes(endOffset))
                accumulatedSize += endOffset.positionDiff(fetchOffset)
              false
            }
          } else {
            // No usable offset metadata for this partition; it contributes nothing.
            false
          }
        } catch {
          case utpe: UnknownTopicOrPartitionException =>
            debug("Broker no longer know of %s, satisfy %s immediately".format(topicAndPartition, fetch))
            true
          case nle: NotLeaderForPartitionException =>
            debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetch))
            true
        }
    }

    satisfiedImmediately || accumulatedSize >= fetch.minBytes
  }

  /**
   * Builds the response for this fetch by reading the message sets for `fetch`
   * through the replica manager.
   *
   * @param replicaManager used to read the message sets for the fetch request
   * @return a `FetchResponse` carrying the fetch's correlation id and the `data` of
   *         each partition's read result
   */
  def respond(replicaManager: ReplicaManager): FetchResponse = {
    val topicData = replicaManager.readMessageSets(fetch)
    FetchResponse(fetch.correlationId, topicData.mapValues(_.data))
  }
}