package kafka.consumer
import org.I0Itec.zkclient.ZkClient
import kafka.server.{BrokerAndInitialOffset, AbstractFetcherThread, AbstractFetcherManager}
import kafka.cluster.{Cluster, Broker}
import scala.collection.immutable
import scala.collection.Map
import collection.mutable.HashMap
import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.Utils.inLock
import kafka.utils.ZkUtils._
import kafka.utils.{ShutdownableThread, SystemTime}
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils
import java.util.concurrent.atomic.AtomicInteger
class ConsumerFetcherManager(private val consumerIdString: String,
private val config: ConsumerConfig,
private val zkClient : ZkClient)
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
private var cluster: Cluster = null
private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
private val lock = new ReentrantLock
private val cond = lock.newCondition()
private var leaderFinderThread: ShutdownableThread = null
private val correlationId = new AtomicInteger(0)
private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
override def doWork() {
val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
lock.lock()
try {
while (noLeaderPartitionSet.isEmpty) {
trace("No partition for leader election.")
cond.await()
}
trace("Partitions without leader %s".format(noLeaderPartitionSet))
val brokers = getAllBrokersInCluster(zkClient)
val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
brokers,
config.clientId,
config.socketTimeoutMs,
correlationId.getAndIncrement).topicsMetadata
if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
topicsMetadata.foreach { tmd =>
val topic = tmd.topic
tmd.partitionsMetadata.foreach { pmd =>
val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
val leaderBroker = pmd.leader.get
leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
noLeaderPartitionSet -= topicAndPartition
}
}
}
} catch {
case t: Throwable => {
if (!isRunning.get())
throw t
else
warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
}
} finally {
lock.unlock()
}
try {
addFetcherForPartitions(leaderForPartitionsMap.map{
case (topicAndPartition, broker) =>
topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
)
} catch {
case t: Throwable => {
if (!isRunning.get())
throw t
else {
warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
lock.lock()
noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
lock.unlock()
}
}
}
shutdownIdleFetcherThreads()
Thread.sleep(config.refreshLeaderBackoffMs)
}
}
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
new ConsumerFetcherThread(
"ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
config, sourceBroker, partitionMap, this)
}
def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
leaderFinderThread.start()
inLock(lock) {
partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
this.cluster = cluster
noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
cond.signalAll()
}
}
def stopConnections() {
info("Stopping leader finder thread")
if (leaderFinderThread != null) {
leaderFinderThread.shutdown()
leaderFinderThread = null
}
info("Stopping all fetchers")
closeAllFetchers()
partitionMap = null
noLeaderPartitionSet.clear()
info("All connections stopped")
}
def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
debug("adding partitions with error %s".format(partitionList))
inLock(lock) {
if (partitionMap != null) {
noLeaderPartitionSet ++= partitionList
cond.signalAll()
}
}
}
}