package kafka.consumer
import org.I0Itec.zkclient.ZkClient
import kafka.common.TopicAndPartition
import kafka.utils.{Utils, ZkUtils, Logging}
/**
 * Strategy that decides, during a consumer-group rebalance, which consumer thread
 * owns each topic partition.
 */
trait PartitionAssignor {

  /**
   * Computes partition ownership from the group state captured in `ctx`.
   *
   * @param ctx group subscriptions, partitions and members to assign from
   * @return mapping from each claimed partition to the thread that owns it
   */
  def assign(ctx: AssignmentContext): collection.Map[TopicAndPartition, ConsumerThreadId]
}
/** Factory for the built-in [[PartitionAssignor]] strategies. */
object PartitionAssignor {

  /**
   * Creates a fresh assignor for the named strategy.
   *
   * @param assignmentStrategy `"roundrobin"` selects [[RoundRobinAssignor]]; every other value
   *                           (including unknown names) falls back to [[RangeAssignor]]
   */
  def createInstance(assignmentStrategy: String) =
    if (assignmentStrategy == "roundrobin") new RoundRobinAssignor()
    else new RangeAssignor()
}
/**
 * Group state handed to a [[PartitionAssignor]], looked up through `zkClient` once when the
 * context is constructed.
 *
 * The fields are initialised in declaration order, so the lookups run in that order as well;
 * `partitionsForTopic` relies on `myTopicThreadIds` already being populated.
 *
 * @param group                 consumer group id
 * @param consumerId            id of the consumer performing the rebalance
 * @param excludeInternalTopics forwarded to the topic-count and per-topic consumer lookups
 * @param zkClient              client used for every lookup
 */
class AssignmentContext(group: String, val consumerId: String, excludeInternalTopics: Boolean, zkClient: ZkClient) {

  /** Thread ids of this consumer, keyed by subscribed topic. */
  val myTopicThreadIds: collection.Map[String, collection.Set[ConsumerThreadId]] =
    TopicCount
      .constructTopicCount(group, consumerId, zkClient, excludeInternalTopics)
      .getConsumerThreadIdsPerTopic

  /** Partition ids for each topic in `myTopicThreadIds`. */
  val partitionsForTopic: collection.Map[String, Seq[Int]] = {
    val subscribedTopics = myTopicThreadIds.keySet.toSeq
    ZkUtils.getPartitionsForTopics(zkClient, subscribedTopics)
  }

  /** Group-wide consumer thread ids per topic, as returned by `ZkUtils.getConsumersPerTopic`. */
  val consumersForTopic: collection.Map[String, List[ConsumerThreadId]] =
    ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics)

  /** Consumer ids registered in the group, in sorted order. */
  val consumers: Seq[String] = {
    val registered = ZkUtils.getConsumersInGroup(zkClient, group)
    registered.sorted
  }
}
/**
 * Deals partitions out to consumer threads round-robin across the whole group.
 *
 * All partitions of the subscribed topics are ordered by the hash code of their string form
 * and handed, in turn, to the group's consumer threads sorted by id. Each consumer performs the
 * same computation and keeps only the partitions that land on its own threads, which is why all
 * topics must share an identical set of consumer threads (checked with `require`).
 */
class RoundRobinAssignor() extends PartitionAssignor with Logging {

  /**
   * @param ctx group state to assign from
   * @return partitions claimed by threads of `ctx.consumerId`; empty when `ctx.consumersForTopic`
   *         is empty
   * @throws IllegalArgumentException if the topics do not all have the same set of consumer threads
   */
  def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    // `consumersForTopic.head` throws NoSuchElementException on an empty map (no group member
    // subscribed to any topic); there is nothing to assign in that case.
    if (ctx.consumersForTopic.nonEmpty) {
      val (headTopic, headThreadIds) = ctx.consumersForTopic.head
      val headThreadIdSet = headThreadIds.toSet

      // Round-robin only makes sense when every topic has the identical set of consumer threads.
      ctx.consumersForTopic.foreach { case (topic, threadIds) =>
        val threadIdSet = threadIds.toSet
        require(threadIdSet == headThreadIdSet,
          "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
            "AND if the stream counts across topics are identical for a given consumer instance.\n" +
            "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
            "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
      }

      val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)

      info("Starting round-robin assignment with consumers " + ctx.consumers)
      // Every consumer derives this ordering independently, so it is kept exactly as-is:
      // a different order on some consumers would make their claims overlap or leave gaps.
      val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) =>
        info("Consumer %s rebalancing the following partitions for topic %s: %s"
          .format(ctx.consumerId, topic, partitions))
        partitions.map(partition => TopicAndPartition(topic, partition))
      }.toSeq.sortWith((topicPartition1, topicPartition2) =>
        topicPartition1.toString.hashCode < topicPartition2.toString.hashCode)

      allTopicPartitions.foreach { topicPartition =>
        val threadId = threadAssignor.next()
        // Keep only the partitions dealt to this consumer's own threads.
        if (threadId.consumer == ctx.consumerId)
          partitionOwnershipDecision += (topicPartition -> threadId)
      }
    }

    partitionOwnershipDecision
  }
}
/**
 * Assigns each topic's partitions to that topic's consumer threads in contiguous ranges.
 *
 * For a topic with P partitions (in the order of `ctx.partitionsForTopic(topic)`) and C threads
 * (in the order of `ctx.consumersForTopic(topic)`), each thread gets P / C partitions and the
 * first P % C threads get one extra. Only the ranges of this consumer's own threads are returned.
 */
class RangeAssignor() extends PartitionAssignor with Logging {

  /**
   * @param ctx group state to assign from
   * @return partitions claimed by the threads listed in `ctx.myTopicThreadIds`
   */
  def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      // Leftover partitions after the even split: this many leading threads get one extra.
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- consumerThreadIdSet) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        // This consumer's own thread is expected to appear in the topic's thread list.
        assert(myConsumerPosition >= 0)
        // Every earlier thread that received an extra partition shifts this range's start by one.
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        if (nParts <= 0)
          // More threads than partitions: this thread receives nothing for the topic.
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }

    partitionOwnershipDecision
  }
}