package kafka.client
import scala.collection._
import kafka.cluster._
import kafka.api._
import kafka.producer._
import kafka.common.{ErrorMapping, KafkaException}
import kafka.utils.{Utils, Logging}
import java.util.Properties
import util.Random
import kafka.network.BlockingChannel
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.ZkClient
import java.io.IOException
import org.apache.kafka.common.utils.Utils.{getHost, getPort}
object ClientUtils extends Logging{
/**
 * Sends a topic metadata request for `topics` to the given brokers, one at a time and in random order,
 * until one of them answers.
 *
 * A fresh SyncProducer is created for every attempt and closed again once the attempt is over,
 * whether it succeeded or not.
 *
 * @param topics         the topics whose metadata is requested
 * @param brokers        the brokers that may be asked; they are shuffled before use
 * @param producerConfig configuration used to create the SyncProducer and to supply the client id
 * @param correlationId  correlation id placed in the request and in the log messages
 * @return the response of the first broker that answered the request
 * @throws KafkaException if no broker answered; its cause is the failure of the last attempt
 *                        (null when `brokers` is empty)
 */
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
  val request = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
  val candidates = Random.shuffle(brokers)
  val remaining = candidates.iterator

  var response: TopicMetadataResponse = null
  var lastFailure: Throwable = null
  var succeeded = false

  while (!succeeded && remaining.hasNext) {
    val broker = remaining.next()
    val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, broker)
    info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(broker, correlationId, topics.size, topics))
    try {
      response = producer.send(request)
      succeeded = true
    } catch {
      // Any failure just moves us on to the next broker; remember it for the final error report.
      case e: Throwable =>
        warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
          .format(correlationId, topics, broker.toString), e)
        lastFailure = e
    } finally {
      producer.close()
    }
  }

  if (!succeeded)
    throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, candidates), lastFailure)

  debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
  response
}
/**
 * Convenience overload that builds a ProducerConfig from the given client id, timeout and brokers,
 * then delegates to the ProducerConfig-based fetchTopicMetadata.
 *
 * @param topics        the topics whose metadata is requested
 * @param brokers       the brokers to ask; also written to "metadata.broker.list" as a
 *                      comma-separated list of their connection strings
 * @param clientId      value stored under "client.id"
 * @param timeoutMs     value stored under "request.timeout.ms"
 * @param correlationId correlation id forwarded to the request (defaults to 0)
 * @return the topic metadata response obtained by the delegate
 */
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
                       correlationId: Int = 0): TopicMetadataResponse = {
  val brokerList = brokers.map(_.connectionString).mkString(",")

  val settings = new Properties()
  settings.put("metadata.broker.list", brokerList)
  settings.put("client.id", clientId)
  settings.put("request.timeout.ms", timeoutMs.toString)

  fetchTopicMetadata(topics, brokers, new ProducerConfig(settings), correlationId)
}
/**
 * Turns a comma-separated broker list into Broker objects.
 *
 * The string is split with Utils.parseCsvList; each resulting entry is passed to getHost and getPort
 * to obtain the broker's host and port (entries are presumably of the form host:port - confirm against
 * those helpers). The broker id is simply the entry's zero-based position in the list.
 *
 * @param brokerListStr comma-separated list of broker addresses
 * @return one Broker per entry, in the order the entries appear
 */
def parseBrokerList(brokerListStr: String): Seq[Broker] = {
  // The binding name was missing here ("val = ..."), leaving `brokersStr` below undefined.
  val brokersStr = Utils.parseCsvList(brokerListStr)

  brokersStr.zipWithIndex.map { case (address, brokerId) =>
    new Broker(brokerId, getHost(address), getPort(address))
  }
}
/**
 * Opens a blocking channel to some broker returned by getAllBrokersInCluster.
 *
 * The broker list is re-read and re-shuffled on every round, and brokers are tried in that random
 * order until one connection succeeds. The method does not return until it has a channel, and it
 * does not pause between rounds.
 *
 * @param zkClient        ZooKeeper client used to look up the brokers
 * @param socketTimeoutMs read timeout handed to each BlockingChannel (defaults to 3000)
 * @return a connected channel to one of the brokers
 */
def channelToAnyBroker(zkClient: ZkClient, socketTimeoutMs: Int = 3000) : BlockingChannel = {
  var channel: BlockingChannel = null

  // Attempts a single connection; leaves `channel` set on success and null on failure.
  def tryConnect(broker: Broker): Boolean = {
    trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
    try {
      channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
      channel.connect()
      debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
      true
    } catch {
      case e: Exception =>
        if (channel != null) channel.disconnect()
        channel = null
        info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
        false
    }
  }

  while (channel == null) {
    val candidates = Random.shuffle(getAllBrokersInCluster(zkClient))
    // Stops at the first broker that accepts the connection; the outcome is read from `channel`.
    candidates.exists(tryConnect)
  }

  channel
}
/**
 * Opens a blocking channel to the offset manager (coordinator) of a consumer group.
 *
 * A ConsumerMetadataRequest is sent over a channel to an arbitrary broker to learn the coordinator.
 * If the coordinator is that same broker, the query channel itself is returned. Otherwise a new
 * channel to the coordinator is opened and the query channel is disconnected.
 * The method keeps retrying until it has a channel. Only IOException is handled; any other
 * exception propagates to the caller.
 *
 * @param group           consumer group whose offset manager is wanted
 * @param zkClient        ZooKeeper client used to find a broker to query
 * @param socketTimeoutMs read timeout for every channel opened here, including the query channel
 *                        (defaults to 3000)
 * @param retryBackOffMs  sleep time after an error response or a failed coordinator connection
 *                        (defaults to 1000)
 * @return a connected channel to the group's offset manager
 */
def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000): BlockingChannel = {
  // The query channel may end up being the channel we return (see below), so it must be created
  // with the caller's timeout rather than channelToAnyBroker's default.
  var queryChannel = channelToAnyBroker(zkClient, socketTimeoutMs)

  var offsetManagerChannelOpt: Option[BlockingChannel] = None

  while (!offsetManagerChannelOpt.isDefined) {

    var coordinatorOpt: Option[Broker] = None

    // Phase 1: ask any broker who the coordinator is, reconnecting whenever the query channel drops.
    while (!coordinatorOpt.isDefined) {
      try {
        if (!queryChannel.isConnected)
          queryChannel = channelToAnyBroker(zkClient, socketTimeoutMs)
        debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
        queryChannel.send(ConsumerMetadataRequest(group))
        val response = queryChannel.receive()
        val consumerMetadataResponse = ConsumerMetadataResponse.readFrom(response.buffer)
        debug("Consumer metadata response: " + consumerMetadataResponse.toString)
        if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
          coordinatorOpt = consumerMetadataResponse.coordinatorOpt
        else {
          debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
            .format(queryChannel.host, queryChannel.port, group, retryBackOffMs))
          Thread.sleep(retryBackOffMs)
        }
      }
      catch {
        case ioe: IOException =>
          info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
          // Disconnecting makes the next iteration pick a (possibly different) broker.
          queryChannel.disconnect()
      }
    }

    // Phase 2: reuse the query channel if it already points at the coordinator, else connect to it.
    val coordinator = coordinatorOpt.get
    if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) {
      offsetManagerChannelOpt = Some(queryChannel)
    } else {
      val connectString = "%s:%d".format(coordinator.host, coordinator.port)
      var offsetManagerChannel: BlockingChannel = null
      try {
        debug("Connecting to offset manager %s.".format(connectString))
        offsetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port,
                                                   BlockingChannel.UseDefaultBufferSize,
                                                   BlockingChannel.UseDefaultBufferSize,
                                                   socketTimeoutMs)
        offsetManagerChannel.connect()
        offsetManagerChannelOpt = Some(offsetManagerChannel)
        queryChannel.disconnect()
      }
      catch {
        case ioe: IOException =>
          info("Error while connecting to %s.".format(connectString))
          if (offsetManagerChannel != null) offsetManagerChannel.disconnect()
          Thread.sleep(retryBackOffMs)
          // Leave the option empty so the outer loop looks the coordinator up again.
          offsetManagerChannelOpt = None
      }
    }
  }

  offsetManagerChannelOpt.get
}
}