package kafka.producer
import collection.mutable.HashMap
import kafka.api.TopicMetadata
import kafka.common.KafkaException
import kafka.utils.Logging
import kafka.common.ErrorMapping
import kafka.client.ClientUtils
/**
 * Resolves, per topic, the list of partitions and their current leader broker,
 * backed by a cache of topic metadata that is refreshed on demand.
 *
 * @param producerConfig     producer configuration; its `brokerList` is parsed once and
 *                           used as the set of brokers to query for metadata
 * @param producerPool       pool that is handed the freshly fetched metadata (via
 *                           `updateProducer`) after every refresh
 * @param topicPartitionInfo mutable topic -> metadata cache; read and updated in place
 */
class BrokerPartitionInfo(producerConfig: ProducerConfig,
                          producerPool: ProducerPool,
                          topicPartitionInfo: HashMap[String, TopicMetadata])
  extends Logging {
  val brokerList = producerConfig.brokerList
  val brokers = ClientUtils.parseBrokerList(brokerList)

  /**
   * Returns the partitions of `topic`, each paired with its leader broker id (if one is
   * known), ordered by ascending partition id.
   *
   * The cache is consulted first; on a miss the metadata is fetched once through
   * [[updateInfo]] and the cache is consulted again.
   *
   * @param topic         the topic whose partition information is requested
   * @param correlationId correlation id forwarded to the metadata fetch on a cache miss
   * @return one [[PartitionAndLeader]] per partition, sorted by partition id
   * @throws KafkaException if the topic is still absent from the cache after a refresh,
   *                        or if its metadata contains no partitions
   */
  def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
    debug("Getting broker partition info for topic %s".format(topic))
    val metadata: TopicMetadata = topicPartitionInfo.get(topic).getOrElse {
      // Cache miss: refresh this topic, then look it up again.
      updateInfo(Set(topic), correlationId)
      topicPartitionInfo.get(topic).getOrElse(
        throw new KafkaException("Failed to fetch topic metadata for topic: " + topic))
    }

    val partitionMetadata = metadata.partitionsMetadata
    if (partitionMetadata.isEmpty) {
      // Prefer the exception mapped from the topic-level error code when there is one.
      if (metadata.errorCode != ErrorMapping.NoError) {
        throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
      } else {
        throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
      }
    }

    partitionMetadata.map { m =>
      m.leader match {
        case Some(leader) =>
          debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
          new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
        case None =>
          debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
          new PartitionAndLeader(topic, m.partitionId, None)
      }
    }.sortWith((s, t) => s.partitionId < t.partitionId)
  }

  /**
   * Fetches metadata for `topics` from the configured brokers and refreshes the cache.
   *
   * Topics whose metadata carries no error are stored in the cache. Topics with an error
   * are only logged, so any entry already cached for them is left untouched. The producer
   * pool is then updated with everything that was fetched, errored topics included.
   *
   * @param topics        the topics to fetch metadata for
   * @param correlationId correlation id passed through to the metadata request
   */
  def updateInfo(topics: Set[String], correlationId: Int): Unit = {
    val topicsMetadata: Seq[TopicMetadata] =
      ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId).topicsMetadata

    topicsMetadata.foreach { tmd =>
      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
      if (tmd.errorCode == ErrorMapping.NoError) {
        topicPartitionInfo.put(tmd.topic, tmd)
      } else {
        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
      }

      // Partition-level check runs for every topic, whether or not the topic itself had an error.
      // Only leader-unavailable errors are reported; other partition error codes are ignored here.
      tmd.partitionsMetadata.foreach { pmd =>
        if (pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
        }
      }
    }

    producerPool.updateProducer(topicsMetadata)
  }
}
/**
 * A topic partition together with the id of its leader broker, when one is known.
 *
 * @param topic             name of the topic
 * @param partitionId       id of the partition within the topic
 * @param leaderBrokerIdOpt id of the broker leading this partition, or `None` if the
 *                          partition has no leader
 */
case class PartitionAndLeader(
  topic: String,
  partitionId: Int,
  leaderBrokerIdOpt: Option[Int]
)