package kafka.producer
import kafka.metrics.KafkaMetricsGroup
import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
import kafka.utils.{Pool, threadsafe}
import java.util.concurrent.TimeUnit
/**
 * Producer meters for a single metric identity: either one (clientId, topic)
 * pair or the aggregate identity covering all topics of a client.
 *
 * Three meters are created on construction, in this order: messages, bytes and
 * dropped messages. Each is created with `TimeUnit.SECONDS` and the same tags.
 *
 * @param metricId identity whose fields are turned into the meter tags
 */
@threadsafe
class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {

  // A per-topic identity yields both "clientId" and "topic" tags; the
  // all-topics identity yields only "clientId".
  val tags = metricId match {
    case ClientIdAndTopic(client, topicName) =>
      Map("clientId" -> client, "topic" -> topicName)
    case ClientIdAllTopics(client) =>
      Map("clientId" -> client)
  }

  // Shared shape of every meter in this group: only the metric name and the
  // event type differ between them.
  private def perSecondMeter(name: String, eventType: String) =
    newMeter(name, eventType, TimeUnit.SECONDS, tags)

  /** Meter registered as "MessagesPerSec". */
  val messageRate = perSecondMeter("MessagesPerSec", "messages")

  /** Meter registered as "BytesPerSec". */
  val byteRate = perSecondMeter("BytesPerSec", "bytes")

  /** Meter registered as "DroppedMessagesPerSec". */
  val droppedMessageRate = perSecondMeter("DroppedMessagesPerSec", "drops")
}
/**
 * Holder of the producer metrics belonging to one client id: a single
 * aggregate [[ProducerTopicMetrics]] for all topics plus a pool of per-topic
 * instances.
 *
 * @param clientId client id used in the key of every metrics object handed out
 */
class ProducerTopicStats(clientId: String) {

  // Per-topic metrics keyed by identity. The pool is given a factory that
  // builds a ProducerTopicMetrics from the key it is asked for.
  private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](
    Some((id: ClientIdTopic) => new ProducerTopicMetrics(id)))

  // Aggregate metrics are built eagerly, when this instance is constructed.
  private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId))

  /** @return the aggregate metrics for this client across all topics */
  def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats

  /**
   * Looks up the metrics pooled under this client id and the given topic.
   *
   * @param topic topic name
   * @return the result of the pool's `getAndMaybePut` for that (clientId, topic) key
   */
  def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
    val key = new ClientIdAndTopic(clientId, topic)
    stats.getAndMaybePut(key)
  }
}
/**
 * Singleton registry of [[ProducerTopicStats]] instances, keyed by client id.
 */
object ProducerTopicStatsRegistry {
  // Factory handed to the pool: builds the stats holder for a client id.
  private val valueFactory = (k: String) => new ProducerTopicStats(k)
  private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory))

  /**
   * Looks up the stats holder for a client id.
   *
   * @param clientId client id used as the pool key
   * @return the result of the pool's `getAndMaybePut` for `clientId`
   */
  def getProducerTopicStats(clientId: String): ProducerTopicStats = {
    globalStats.getAndMaybePut(clientId)
  }

  /**
   * Removes the pool entry stored under the given client id.
   *
   * Declared with an explicit `Unit` result instead of procedure syntax, which
   * is deprecated; whatever the pool's `remove` returns is discarded.
   *
   * @param clientId client id whose entry is removed
   */
  def removeProducerTopicStats(clientId: String): Unit = {
    globalStats.remove(clientId)
  }
}