package kafka.server
import kafka.metrics.KafkaMetricsGroup
import kafka.network.RequestChannel
import kafka.api.FetchResponseSend
import java.util.concurrent.TimeUnit
class FetchRequestPurgatory(replicaManager: ReplicaManager, requestChannel: RequestChannel)
extends RequestPurgatory[DelayedFetch](replicaManager.config.brokerId, replicaManager.config.fetchPurgatoryPurgeIntervalRequests) {
this.logIdent = "[FetchRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
private val metricPrefix = if (forFollower) "Follower" else "Consumer"
val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
}
private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
private def recordDelayedFetchExpired(forFollower: Boolean) {
val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
else aggregateNonFollowerFetchRequestMetrics
metrics.expiredRequestMeter.mark()
}
def checkSatisfied(delayedFetch: DelayedFetch): Boolean = delayedFetch.isSatisfied(replicaManager)
def expire(delayedFetch: DelayedFetch) {
debug("Expiring fetch request %s.".format(delayedFetch.fetch))
val fromFollower = delayedFetch.fetch.isFromFollower
recordDelayedFetchExpired(fromFollower)
respond(delayedFetch)
}
def respond(delayedFetch: DelayedFetch) {
val response = delayedFetch.respond(replicaManager)
requestChannel.sendResponse(new RequestChannel.Response(delayedFetch.request, new FetchResponseSend(response)))
}
}