package kafka.server
import scala.collection.mutable
import scala.collection.Set
import scala.collection.Map
import kafka.utils.{Utils, Logging}
import kafka.cluster.Broker
import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import com.yammer.metrics.core.Gauge
abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
extends Logging with KafkaMetricsGroup {
private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
private val mapLock = new Object
this.logIdent = "[" + name + "] "
newGauge(
"MaxLag",
new Gauge[Long] {
def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => {
curMaxThread.max(fetcherLagStatsEntry._2.lag)
}).max(curMaxAll)
})
},
Map("clientId" -> clientId)
)
newGauge(
"MinFetchRate", {
new Gauge[Double] {
def value = {
val headRate: Double =
fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
})
}
}
},
Map("clientId" -> clientId)
)
private def getFetcherId(topic: String, partitionId: Int) : Int = {
Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}
def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
var fetcherThread: AbstractFetcherThread = null
fetcherThreadMap.get(brokerAndFetcherId) match {
case Some(f) => fetcherThread = f
case None =>
fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
fetcherThread.start
}
fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
topicAndPartition -> brokerAndInitOffset.initOffset
})
}
}
info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
"[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}
def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) {
mapLock synchronized {
for ((key, fetcher) <- fetcherThreadMap) {
fetcher.removePartitions(partitions)
}
}
info("Removed fetcher for partitions %s".format(partitions.mkString(",")))
}
def shutdownIdleFetcherThreads() {
mapLock synchronized {
val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId]
for ((key, fetcher) <- fetcherThreadMap) {
if (fetcher.partitionCount <= 0) {
fetcher.shutdown()
keysToBeRemoved += key
}
}
fetcherThreadMap --= keysToBeRemoved
}
}
def closeAllFetchers() {
mapLock synchronized {
for ( (_, fetcher) <- fetcherThreadMap) {
fetcher.shutdown()
}
fetcherThreadMap.clear()
}
}
}
case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)