package kafka.metrics
import java.util.concurrent.TimeUnit
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{Gauge, MetricName}
import kafka.consumer.{ConsumerTopicStatsRegistry, FetchRequestAndResponseStatsRegistry}
import kafka.producer.{ProducerRequestStatsRegistry, ProducerStatsRegistry, ProducerTopicStatsRegistry}
import kafka.utils.Logging
import scala.collection.immutable
trait KafkaMetricsGroup extends Logging {
private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
explicitMetricName(pkg, simpleName, name, tags)
}
private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
val nameBuilder: StringBuilder = new StringBuilder
nameBuilder.append(group)
nameBuilder.append(":type=")
nameBuilder.append(typeName)
if (name.length > 0) {
nameBuilder.append(",name=")
nameBuilder.append(name)
}
val scope: String = KafkaMetricsGroup.toScope(tags).getOrElse(null)
val tagsName = KafkaMetricsGroup.toMBeanName(tags)
tagsName match {
case Some(tn) =>
nameBuilder.append(",").append(tn)
case None =>
}
new MetricName(group, typeName, name, scope, nameBuilder.toString())
}
def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)
def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)
def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)
def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)
def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().removeMetric(metricName(name, tags))
}
object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"),
new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"),
new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"),
new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
new MetricName("kafka.server", "FetcherStats", "BytesPerSec"),
new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"),
new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
)
private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"),
new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"),
new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"),
new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
)
private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
val filteredTags = tags
.filter { case (tagKey, tagValue) => tagValue != ""}
if (filteredTags.nonEmpty) {
val tagsString = filteredTags
.map { case (key, value) => "%s=%s".format(key, value)}
.mkString(",")
Some(tagsString)
}
else {
None
}
}
private def toScope(tags: collection.Map[String, String]): Option[String] = {
val filteredTags = tags
.filter { case (tagKey, tagValue) => tagValue != ""}
if (filteredTags.nonEmpty) {
val tagsString = filteredTags
.toList.sortWith((t1, t2) => t1._1 < t2._1)
.map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", "_"))}
.mkString(".")
Some(tagsString)
}
else {
None
}
}
def removeAllConsumerMetrics(clientId: String) {
FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId)
}
def removeAllProducerMetrics(clientId: String) {
ProducerRequestStatsRegistry.removeProducerRequestStats(clientId)
ProducerTopicStatsRegistry.removeProducerTopicStats(clientId)
ProducerStatsRegistry.removeProducerStats(clientId)
removeAllMetricsInList(KafkaMetricsGroup.producerMetricNameList, clientId)
}
private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
metricNameList.foreach(metric => {
val pattern = (".*clientId=" + clientId + ".*").r
val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
for (registeredMetric <- registeredMetrics) {
if (registeredMetric.getGroup == metric.getGroup &&
registeredMetric.getName == metric.getName &&
registeredMetric.getType == metric.getType) {
pattern.findFirstIn(registeredMetric.getMBeanName) match {
case Some(_) => {
val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
Metrics.defaultRegistry().removeMetric(registeredMetric)
val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
registeredMetric, beforeRemovalSize, afterRemovalSize))
}
case _ =>
}
}
}
})
}
}