package kafka.controller
import collection._
import collection.Set
import com.yammer.metrics.core.Gauge
import java.lang.{IllegalStateException, Object}
import java.util.concurrent.TimeUnit
import kafka.admin.AdminUtils
import kafka.admin.PreferredReplicaLeaderElectionCommand
import kafka.api._
import kafka.cluster.Broker
import kafka.common._
import kafka.log.LogConfig
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import kafka.utils.ZkUtils._
import kafka.utils._
import kafka.utils.Utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import scala.None
import kafka.server._
import scala.Some
import kafka.common.TopicAndPartition
class ControllerContext(val zkClient: ZkClient,
val zkSessionTimeout: Int) {
var controllerChannelManager: ControllerChannelManager = null
val controllerLock: ReentrantLock = new ReentrantLock()
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
val brokerShutdownLock: Object = new Object
var epoch: Int = KafkaController.InitialControllerEpoch - 1
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
val correlationId: AtomicInteger = new AtomicInteger(0)
var allTopics: Set[String] = Set.empty
var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
def liveBrokers_=(brokers: Set[Broker]) {
liveBrokersUnderlying = brokers
liveBrokerIdsUnderlying = liveBrokersUnderlying.map(_.id)
}
def liveBrokers = liveBrokersUnderlying.filter(broker => !shuttingDownBrokerIds.contains(broker.id))
def liveBrokerIds = liveBrokerIdsUnderlying.filter(brokerId => !shuttingDownBrokerIds.contains(brokerId))
def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
def liveOrShuttingDownBrokers = liveBrokersUnderlying
def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
partitionReplicaAssignment
.filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
.map { case(topicAndPartition, replicas) => topicAndPartition }
.toSet
}
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
brokerIds.map { brokerId =>
partitionReplicaAssignment
.filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
.map { case(topicAndPartition, replicas) =>
new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
}.flatten.toSet
}
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
partitionReplicaAssignment
.filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
.map { case(topicAndPartition, replicas) =>
replicas.map { r =>
new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
}
}.flatten.toSet
}
def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
partitionReplicaAssignment
.filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
}
def allLiveReplicas(): Set[PartitionAndReplica] = {
replicasOnBrokers(liveBrokerIds)
}
def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
partitions.map { p =>
val replicas = partitionReplicaAssignment(p)
replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
}.flatten
}
def removeTopic(topic: String) = {
partitionLeadershipInfo = partitionLeadershipInfo.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
partitionReplicaAssignment = partitionReplicaAssignment.filter{ case (topicAndPartition, _) => topicAndPartition.topic != topic }
allTopics -= topic
}
}
object KafkaController extends Logging {
val stateChangeLogger = new StateChangeLogger("state.change.logger")
val InitialControllerEpoch = 1
val InitialControllerEpochZkVersion = 1
case class StateChangeLogger(override val loggerName: String) extends Logging
def parseControllerId(controllerInfoString: String): Int = {
try {
Json.parseFull(controllerInfoString) match {
case Some(m) =>
val controllerInfo = m.asInstanceOf[Map[String, Any]]
return controllerInfo.get("brokerid").get.asInstanceOf[Int]
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
case t: Throwable =>
warn("Failed to parse the controller info as json. "
+ "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
try {
return controllerInfoString.toInt
} catch {
case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
}
}
}
}
class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
onControllerResignation, config.brokerId)
private val autoRebalanceScheduler = new KafkaScheduler(1)
var deleteTopicManager: TopicDeletionManager = null
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
private val partitionReassignedListener = new PartitionsReassignedListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
newGauge(
"ActiveControllerCount",
new Gauge[Int] {
def value() = if (isActive) 1 else 0
}
)
newGauge(
"OfflinePartitionsCount",
new Gauge[Int] {
def value(): Int = {
inLock(controllerContext.controllerLock) {
if (!isActive())
0
else
controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
}
}
}
)
newGauge(
"PreferredReplicaImbalanceCount",
new Gauge[Int] {
def value(): Int = {
inLock(controllerContext.controllerLock) {
if (!isActive())
0
else
controllerContext.partitionReplicaAssignment.count {
case (topicPartition, replicas) => controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head
}
}
}
}
)
def epoch = controllerContext.epoch
def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
def shutdownBroker(id: Int) : Set[TopicAndPartition] = {
if (!isActive()) {
throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
}
controllerContext.brokerShutdownLock synchronized {
info("Shutting down broker " + id)
inLock(controllerContext.controllerLock) {
if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
controllerContext.shuttingDownBrokerIds.add(id)
debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
}
val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
inLock(controllerContext.controllerLock) {
controllerContext.partitionsOnBroker(id)
.map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
}
allPartitionsAndReplicationFactorOnBroker.foreach {
case(topicAndPartition, replicationFactor) =>
inLock(controllerContext.controllerLock) {
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
if (replicationFactor > 1) {
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
controlledShutdownPartitionLeaderSelector)
} else {
brokerRequestBatch.newBatch()
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
topicAndPartition.partition, deletePartition = false)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
topicAndPartition.partition, id)), OfflineReplica)
}
}
}
}
}
def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
controllerContext.partitionLeadershipInfo.filter {
case (topicAndPartition, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
}.map(_._1)
}
replicatedPartitionsBrokerLeads().toSet
}
}
def onControllerFailover() {
if(isRunning) {
info("Broker %d starting become controller state transition".format(config.brokerId))
readControllerEpochFromZookeeper()
incrementControllerEpoch(zkClient)
registerReassignedPartitionsListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
replicaStateMachine.startup()
partitionStateMachine.startup()
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
brokerState.newState(RunningAsController)
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}
def onControllerResignation() {
deregisterReassignedPartitionsListener()
deregisterPreferredReplicaElectionListener()
if (deleteTopicManager != null)
deleteTopicManager.shutdown()
if (config.autoLeaderRebalanceEnable)
autoRebalanceScheduler.shutdown()
inLock(controllerContext.controllerLock) {
deregisterReassignedPartitionsIsrChangeListeners()
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
if(controllerContext.controllerChannelManager != null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
controllerContext.epoch=0
controllerContext.epochZkVersion=0
brokerState.newState(RunningAsBroker)
}
}
def isActive(): Boolean = {
inLock(controllerContext.controllerLock) {
controllerContext.controllerChannelManager != null
}
}
def onBrokerStartup(newBrokers: Seq[Int]) {
info("New broker startup callback for %s".format(newBrokers.mkString(",")))
val = newBrokers.toSet
sendUpdateMetadataRequest(newBrokers)
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
partitionStateMachine.triggerOnlinePartitionStateChange()
val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
}
partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if(replicasForTopicsToBeDeleted.size > 0) {
info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
"Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
}
}
def onBrokerFailure(deadBrokers: Seq[Int]) {
info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
val deadBrokersThatWereShuttingDown =
deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
val = deadBrokers.toSet
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
!deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
partitionStateMachine.triggerOnlinePartitionStateChange()
var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if(replicasForTopicsToBeDeleted.size > 0) {
deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
}
}
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
info("New topic creation callback for %s".format(newPartitions.mkString(",")))
topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
onNewPartitionCreation(newPartitions)
}
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
info("New partition creation callback for %s".format(newPartitions.mkString(",")))
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
case false =>
info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned not yet caught up with the leader")
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
newAndOldReplicas.toSeq)
startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
"reassigned to catch up with the leader")
case true =>
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
replica)), OnlineReplica)
}
moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
}
}
private def watchIsrChangesForReassignedPartition(topic: String,
partition: Int,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition,
reassignedReplicas.toSet)
reassignedPartitionContext.isrChangeListener = isrChangeListener
zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
}
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
try {
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if(assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
if(aliveNewReplicas == newReplicas) {
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
} else {
throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
" %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
"Failing partition reassignment")
}
}
case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
.format(topicAndPartition))
}
} catch {
case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
removePartitionFromReassignedPartitions(topicAndPartition)
}
}
def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
}
}
def startup() = {
inLock(controllerContext.controllerLock) {
info("Controller starting up");
registerSessionExpirationListener()
isRunning = true
controllerElector.startup
info("Controller startup complete")
}
}
def shutdown() = {
inLock(controllerContext.controllerLock) {
isRunning = false
}
onControllerResignation()
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
}
def incrementControllerEpoch(zkClient: ZkClient) = {
try {
var newControllerEpoch = controllerContext.epoch + 1
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient,
ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
if(!updateSucceeded)
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
else {
controllerContext.epochZkVersion = newVersion
controllerContext.epoch = newControllerEpoch
}
} catch {
case nne: ZkNoNodeException =>
try {
zkClient.createPersistent(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
controllerContext.epoch = KafkaController.InitialControllerEpoch
controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
} catch {
case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
"Aborting controller startup procedure")
case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
case oe: Throwable => error("Error while incrementing controller epoch", oe)
}
info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
}
private def registerSessionExpirationListener() = {
zkClient.subscribeStateChanges(new SessionExpirationListener())
}
private def initializeControllerContext() {
controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
updateLeaderAndIsrCache()
startChannelManager()
initializePreferredReplicaElection()
initializePartitionReassignment()
initializeTopicDeletion()
info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
}
private def initializePreferredReplicaElection() {
val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
val topicDeleted = replicasOpt.isEmpty
val successful =
if(!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
successful || topicDeleted
}
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
}
private def initializePartitionReassignment() {
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
val topicDeleted = replicasOpt.isEmpty
val successful = if(!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
topicDeleted || successful
}.map(_._1)
reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
partitionsToReassign ++= partitionsBeingReassigned
partitionsToReassign --= reassignedPartitions
controllerContext.partitionsBeingReassigned ++= partitionsToReassign
info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
}
private def initializeTopicDeletion() {
val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet
val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
topicsForWhichPreferredReplicaElectionIsInProgress
info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
}
private def maybeTriggerPartitionReassignment() {
controllerContext.partitionsBeingReassigned.foreach { topicPartitionToReassign =>
initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
}
}
private def maybeTriggerPreferredReplicaElection() {
onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
}
private def startChannelManager() {
controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config)
controllerContext.controllerChannelManager.startup()
}
private def updateLeaderAndIsrCache() {
val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet)
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
}
private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
getLeaderAndIsrForPartition(zkClient, topic, partition) match {
case Some(leaderAndIsr) =>
val replicasNotInIsr = replicas.filterNot(r => leaderAndIsr.isr.contains(r))
replicasNotInIsr.isEmpty
case None => false
}
}
private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
} else {
controllerContext.liveBrokerIds.contains(currentLeader) match {
case true =>
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
case false =>
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
}
}
}
private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
oldReplicas: Set[Int]) {
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
}
private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
replicas: Seq[Int]) {
val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
}
private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
newReplicas: Set[Int]) {
newReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
}
}
private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
brokerRequestBatch.newBatch()
updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
"to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
case None =>
stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " +
"to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
newAssignedReplicas.mkString(","), topicAndPartition))
}
}
private def registerReassignedPartitionsListener() = {
zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
private def deregisterReassignedPartitionsListener() = {
zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
private def registerPreferredReplicaElectionListener() {
zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
}
private def deregisterPreferredReplicaElectionListener() {
zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
}
private def deregisterReassignedPartitionsIsrChangeListeners() {
controllerContext.partitionsBeingReassigned.foreach {
case (topicAndPartition, reassignedPartitionsContext) =>
val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
}
}
private def readControllerEpochFromZookeeper() {
if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
controllerContext.epoch = epochData._1.toInt
controllerContext.epochZkVersion = epochData._2.getVersion
info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
}
}
def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
}
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
}
def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
try {
val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
case e2: Throwable => throw new KafkaException(e2.toString)
}
}
def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
isTriggeredByAutoRebalance : Boolean) {
for(partition <- partitionsToBeRemoved) {
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
if(currentLeader == preferredReplica) {
info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
} else {
warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
}
}
if (!isTriggeredByAutoRebalance)
ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
}
def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
brokerRequestBatch.newBatch()
brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
}
def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
val topicAndPartition = TopicAndPartition(topic, partition)
debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.isr.mkString(","), topicAndPartition))
var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
case Some(leaderIsrAndEpoch) =>
val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
if(controllerEpoch > epoch)
throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
"means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
if (leaderAndIsr.isr.contains(replicaId)) {
val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
newIsr = leaderAndIsr.isr
}
val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
newIsr, leaderAndIsr.zkVersion + 1)
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition,
newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
if (updateSucceeded)
info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
updateSucceeded
} else {
warn("Cannot remove replica %d from ISR of partition %s since it is not in the ISR. Leader = %d ; ISR = %s"
.format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
true
}
case None =>
warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is empty.".format(replicaId, topicAndPartition))
true
}
}
finalLeaderIsrAndControllerEpoch
}
private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
val topicAndPartition = TopicAndPartition(topic, partition)
debug("Updating leader epoch for partition %s.".format(topicAndPartition))
var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
case Some(leaderIsrAndEpoch) =>
val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
if(controllerEpoch > epoch)
throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
"means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic,
partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
if (updateSucceeded)
info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch))
updateSucceeded
case None =>
throw new IllegalStateException(("Cannot update leader epoch for partition %s as leaderAndIsr path is empty. " +
"This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition))
true
}
}
finalLeaderIsrAndControllerEpoch
}
class SessionExpirationListener() extends IZkStateListener with Logging {
this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
}
@throws(classOf[Exception])
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
inLock(controllerContext.controllerLock) {
onControllerResignation()
controllerElector.elect
}
}
}
private def checkAndTriggerPartitionRebalance(): Unit = {
if (isActive()) {
trace("checking need to trigger partition rebalance")
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
inLock(controllerContext.controllerLock) {
preferredReplicasForTopicsByBrokers =
controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
case(topicAndPartition, assignedReplicas) => assignedReplicas.head
}
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
preferredReplicasForTopicsByBrokers.foreach {
case(leaderBroker, topicAndPartitionsForBroker) => {
var imbalanceRatio: Double = 0
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
inLock(controllerContext.controllerLock) {
topicsNotInPreferredReplica =
topicAndPartitionsForBroker.filter {
case(topicPartition, replicas) => {
controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
}
}
debug("topics not in preferred replica " + topicsNotInPreferredReplica)
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
}
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
topicsNotInPreferredReplica.foreach {
case(topicPartition, replicas) => {
inLock(controllerContext.controllerLock) {
if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
controllerContext.partitionsBeingReassigned.size == 0 &&
controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
controllerContext.allTopics.contains(topicPartition.topic)) {
onPreferredReplicaElection(Set(topicPartition), true)
}
}
}
}
}
}
}
}
}
}
class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
val zkClient = controller.controllerContext.zkClient
val controllerContext = controller.controllerContext
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
.format(dataPath, data))
val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
partitionsToBeReassigned.foreach { partitionToBeReassigned =>
inLock(controllerContext.controllerLock) {
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
.format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
} else {
val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
}
}
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}
class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
reassignedReplicas: Set[Int])
extends IZkDataListener with Logging {
this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
val zkClient = controller.controllerContext.zkClient
val controllerContext = controller.controllerContext
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
inLock(controllerContext.controllerLock) {
debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
val topicAndPartition = TopicAndPartition(topic, partition)
try {
controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
case Some(reassignedPartitionContext) =>
val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
newLeaderAndIsrOpt match {
case Some(leaderAndIsr) =>
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
if(caughtUpReplicas == reassignedReplicas) {
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Resuming partition reassignment")
controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
}
else {
info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
.format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
}
case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
.format(topicAndPartition, reassignedReplicas.mkString(",")))
}
case None =>
}
} catch {
case e: Throwable => error("Error while handling partition reassignment", e)
}
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}
class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
val zkClient = controller.controllerContext.zkClient
val controllerContext = controller.controllerContext
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
.format(dataPath, data.toString))
inLock(controllerContext.controllerLock) {
val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
info("These partitions are already undergoing preferred replica election: %s"
.format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if(partitionsForTopicsToBeDeleted.size > 0) {
error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
.format(partitionsForTopicsToBeDeleted))
}
controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
}
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
}
case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
override def toString(): String = {
"[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
}
}
case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
override def toString(): String = {
val leaderAndIsrInfo = new StringBuilder
leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
leaderAndIsrInfo.toString()
}
}
object ControllerStats extends KafkaMetricsGroup {
val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}