package kafka.controller
import collection._
import collection.JavaConversions
import collection.mutable.Buffer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.api.LeaderAndIsr
import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.log4j.Logger
import kafka.controller.Callbacks.CallbackBuilder
import kafka.utils.Utils._
/**
 * Controller-side state machine for partitions.
 *
 * For every TopicAndPartition it tracks one PartitionState (NewPartition, OnlinePartition,
 * OfflinePartition or NonExistentPartition) and applies transitions between them through
 * handleStateChanges. It also owns the ZooKeeper listeners for topic changes, topic deletion
 * and per-topic partition additions.
 *
 * @param controller the owning controller; supplies the controller context, config and epoch
 */
class PartitionStateMachine(controller: KafkaController) extends Logging {
// Context object obtained from the controller; read and updated throughout this class.
private val controllerContext = controller.controllerContext
// Broker id of this controller; only used to prefix log and error messages.
private val controllerId = controller.config.brokerId
private val zkClient = controllerContext.zkClient
// Current state of every partition tracked by this state machine.
private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
// Batch that collects LeaderAndIsr requests during a round of state changes; it is reset with
// newBatch() and flushed with sendRequestsToBrokers().
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
// Set by startup() and cleared by shutdown(); handleStateChange refuses to run while it is false.
private val hasStarted = new AtomicBoolean(false)
// Default leader selector used by handleStateChanges when the caller does not pass one.
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
private val topicChangeListener = new TopicChangeListener()
private val deleteTopicsListener = new DeleteTopicsListener()
// Data-change listeners registered per topic, keyed by topic name.
private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
// Logger shared via KafkaController, used for the per-partition state change trail.
private val stateChangeLogger = KafkaController.stateChangeLogger
this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
/**
 * Starts the state machine: seeds the per-partition state, flips the started flag and then
 * attempts to bring every New/Offline partition online.
 */
def startup(): Unit = {
  // The flag must be set before triggering transitions, because handleStateChange rejects
  // any state change while the machine is not started.
  initializePartitionState()
  hasStarted.set(true)
  triggerOnlinePartitionStateChange()

  info("Started partition state machine with initial state -> " + partitionState.toString())
}
/**
 * Registers the topic change listener and, when topic deletion is enabled in the controller
 * config, the delete-topics listener as well.
 */
def registerListeners(): Unit = {
  registerTopicChangeListener()
  if (controller.config.deleteTopicEnable) {
    registerDeleteTopicListener()
  }
}
/**
 * Removes every ZooKeeper listener owned by this state machine: the topic change listener,
 * all per-topic partition listeners, and (when topic deletion is enabled) the delete-topics
 * listener.
 */
def deregisterListeners(): Unit = {
  deregisterTopicChangeListener()
  for ((topicName, partitionListener) <- addPartitionsListener)
    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topicName), partitionListener)
  // Forget the per-topic listeners once they are unsubscribed.
  addPartitionsListener.clear()
  if (controller.config.deleteTopicEnable) {
    deregisterDeleteTopicListener()
  }
}
/**
 * Stops the state machine: clears the started flag, drops all tracked partition state and
 * de-registers the ZooKeeper listeners.
 */
def shutdown(): Unit = {
  // Clear the flag first so that handleStateChange rejects any further transition.
  hasStarted.set(false)
  partitionState.clear()
  deregisterListeners()

  info("Stopped partition state machine")
}
/**
 * Attempts to move every partition that is currently in the New or Offline state to the
 * Online state, skipping partitions whose topic is queued up for deletion. The resulting
 * broker requests are sent as one batch. Any failure is logged and not rethrown.
 */
def triggerOnlinePartitionStateChange(): Unit = {
  try {
    brokerRequestBatch.newBatch()
    partitionState.foreach { case (candidate, currentState) =>
      val queuedForDeletion = controller.deleteTopicManager.isTopicQueuedUpForDeletion(candidate.topic)
      val needsLeader = currentState == OfflinePartition || currentState == NewPartition
      if (!queuedForDeletion && needsLeader)
        handleStateChange(candidate.topic, candidate.partition, OnlinePartition,
                          controller.offlinePartitionSelector, (new CallbackBuilder).build)
    }
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
  } catch {
    case e: Throwable => error("Error while moving some partitions to the online state", e)
  }
}
/**
 * Returns the partitions whose tracked state equals the given state.
 *
 * @param state the state to look for
 * @return the set of partitions currently recorded in that state
 */
def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
  val matching = partitionState.filter { case (_, current) => current == state }
  matching.keySet
}
/**
 * Attempts to move each of the given partitions to the target state and sends the resulting
 * broker requests as a single batch. Any exception that escapes is logged and not rethrown.
 *
 * @param partitions     partitions to transition
 * @param targetState    state every partition should be moved to
 * @param leaderSelector selector used when a leader has to be elected; defaults to the no-op selector
 * @param callbacks      callbacks forwarded to the per-partition state change
 */
def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                       leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                       callbacks: Callbacks = (new CallbackBuilder).build): Unit = {
  info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
  try {
    brokerRequestBatch.newBatch()
    for (target <- partitions)
      handleStateChange(target.topic, target.partition, targetState, leaderSelector, callbacks)
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
  } catch {
    case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
  }
}
/**
 * Moves a single partition to the requested target state.
 *
 * Valid transitions, as enforced by assertValidPreviousStates below:
 *  - NonExistentPartition -> NewPartition
 *  - NewPartition, OnlinePartition, OfflinePartition -> OnlinePartition
 *  - NewPartition, OnlinePartition, OfflinePartition -> OfflinePartition
 *  - OfflinePartition -> NonExistentPartition
 *
 * A partition that is not tracked yet is first recorded as NonExistentPartition.
 * Failures raised inside the transition are logged to the state change logger and NOT
 * rethrown; in that case the tracked state is not updated to the target state.
 *
 * @param topic          topic of the partition
 * @param partition      partition id
 * @param targetState    state to move the partition to
 * @param leaderSelector selector handed to electLeaderForPartition for Offline/Online -> Online
 * @param callbacks      accepted for interface compatibility; not used in this method body
 * @throws StateChangeFailedException if the state machine has not been started (this check
 *                                    runs outside the try block, so it propagates to the caller)
 */
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
leaderSelector: PartitionLeaderSelector,
callbacks: Callbacks) {
val topicAndPartition = TopicAndPartition(topic, partition)
// Reject any transition until startup() has set the started flag.
if (!hasStarted.get)
throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
"the partition state machine has not started")
.format(controllerId, controller.epoch, topicAndPartition, targetState))
// Untracked partitions start out as NonExistentPartition; currState is only used for logging.
val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
try {
targetState match {
case NewPartition =>
// pre: partition must not exist yet
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
// Load the assigned replicas from ZooKeeper into the controller context cache.
assignReplicasToPartitions(topic, partition)
partitionState.put(topicAndPartition, NewPartition)
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
assignedReplicas))
case OnlinePartition =>
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
partitionState(topicAndPartition) match {
case NewPartition =>
// New partition: create the initial leader/ISR entry.
initializeLeaderAndIsrForPartition(topicAndPartition)
case OfflinePartition =>
electLeaderForPartition(topic, partition, leaderSelector)
case OnlinePartition =>
// Already online: run the election again with the supplied selector.
electLeaderForPartition(topic, partition, leaderSelector)
case _ =>
// Not reachable: other previous states are rejected by the assertion above.
}
partitionState.put(topicAndPartition, OnlinePartition)
val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
case OfflinePartition =>
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
partitionState.put(topicAndPartition, OfflinePartition)
case NonExistentPartition =>
// pre: only an Offline partition may be moved to NonExistent
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
// The entry stays in the map, marked as NonExistentPartition.
partitionState.put(topicAndPartition, NonExistentPartition)
}
} catch {
// Log and swallow, so a failure here does not propagate to the caller.
case t: Throwable =>
stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
.format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
}
}
/**
 * Seeds the tracked state of every partition that has a replica assignment in the controller
 * context:
 *  - no cached leadership info -> NewPartition
 *  - cached leader is a live broker -> OnlinePartition
 *  - cached leader is not a live broker -> OfflinePartition
 */
private def initializePartitionState(): Unit = {
  controllerContext.partitionReplicaAssignment.foreach { case (topicPartition, _) =>
    val initialState: PartitionState = controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      case None =>
        NewPartition
      case Some(leaderInfo) =>
        if (controllerContext.liveBrokerIds.contains(leaderInfo.leaderAndIsr.leader)) OnlinePartition
        else OfflinePartition
    }
    partitionState.put(topicPartition, initialState)
  }
}
/**
 * Verifies that the partition's tracked state is one of the allowed source states for a
 * transition.
 *
 * @param topicAndPartition partition being transitioned (must already be tracked)
 * @param fromStates        states from which the transition is allowed
 * @param targetState       state being moved to; used only in the error message
 * @throws IllegalStateException if the current state is not among fromStates
 */
private def assertValidPreviousStates(topicAndPartition: TopicAndPartition, fromStates: Seq[PartitionState],
                                      targetState: PartitionState): Unit = {
  val actualState = partitionState(topicAndPartition)
  if (!fromStates.contains(actualState)) {
    val expectation = "Partition %s should be in the %s states before moving to %s state"
      .format(topicAndPartition, fromStates.mkString(","), targetState)
    val observation = ". Instead it is in %s state".format(actualState)
    throw new IllegalStateException(expectation + observation)
  }
}
/**
 * Reads the replicas assigned to the partition from ZooKeeper and stores them in the
 * controller context's replica assignment cache.
 *
 * @param topic     topic of the partition
 * @param partition partition id
 */
private def assignReplicasToPartitions(topic: String, partition: Int): Unit = {
  val replicasFromZk = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
  val cacheKey = TopicAndPartition(topic, partition)
  controllerContext.partitionReplicaAssignment += (cacheKey -> replicasFromZk)
}
/**
 * Creates the initial leader/ISR entry for a partition moving from New to Online.
 *
 * The first live assigned replica becomes the leader and all live assigned replicas form the
 * ISR. The entry is written to ZooKeeper, cached in the controller context, and a LeaderAndIsr
 * request for the live replicas is added to the current request batch.
 *
 * @param topicAndPartition partition to initialize
 * @throws StateChangeFailedException if no assigned replica is alive, or if the leader/ISR
 *                                    path already exists in ZooKeeper
 */
private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
  val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
  if (liveAssignedReplicas.isEmpty) {
    val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
      "live brokers are [%s]. No assigned replica is alive.")
      .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
    stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
    throw new StateChangeFailedException(failMsg)
  } else {
    debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
    // Safe: liveAssignedReplicas is non-empty in this branch.
    val leader = liveAssignedReplicas.head
    val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
      controller.epoch)
    debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
    try {
      ZkUtils.createPersistentPath(controllerContext.zkClient,
        ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
        ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
      // Update the cache and queue the broker request only after the ZooKeeper write succeeded.
      controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
        topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
    } catch {
      case e: ZkNodeExistsException =>
        // Read back what is stored to enrich the error message. The read returns an Option,
        // so a missing value is reported explicitly instead of failing with
        // NoSuchElementException and hiding the original problem.
        val existing = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
          topicAndPartition.partition)
        val failMsg = existing match {
          case Some(leaderIsrAndEpoch) =>
            ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
              "exists with value %s and controller epoch %d")
              .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
          case None =>
            ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
              "exists, but its value could not be read back")
              .format(topicAndPartition)
        }
        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
        // Keep the ZooKeeper exception as the cause so the original stack trace is not lost.
        throw new StateChangeFailedException(failMsg, e)
    }
  }
}
/**
 * Elects a leader for the partition using the given selector and records the result.
 *
 * Repeats the following until ReplicationUtils.updateLeaderAndIsr reports success:
 * read the current leader/ISR from ZooKeeper, abort if it was written with a controller
 * epoch newer than this controller's, ask the selector for the new leader/ISR, and try to
 * write it using the zkVersion that was just read.
 * On success the controller context cache is updated and a LeaderAndIsr request is added to
 * the current request batch for the replicas returned by the selector.
 *
 * @param topic          topic of the partition
 * @param partition      partition id
 * @param leaderSelector strategy that picks the new leader/ISR and the replicas to notify
 * @throws NoReplicaOnlineException   rethrown unchanged when raised during election
 * @throws StateChangeFailedException wrapping any other failure (including a missing
 *                                    leader/ISR entry or a newer controller epoch)
 */
def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
val topicAndPartition = TopicAndPartition(topic, partition)
stateChangeLogger.trace("Controller %d epoch %d started leader election for partition %s"
.format(controllerId, controller.epoch, topicAndPartition))
try {
var zookeeperPathUpdateSucceeded: Boolean = false
var newLeaderAndIsr: LeaderAndIsr = null
var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
// Retry until the write is accepted; each attempt starts from a fresh read.
// NOTE(review): presumably updateLeaderAndIsr is a conditional (versioned) write - confirm in ReplicationUtils.
while(!zookeeperPathUpdateSucceeded) {
val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
// A stored epoch greater than ours means another controller has written this path.
if (controllerEpoch > controller.epoch) {
val failMsg = ("aborted leader election for partition [%s,%d] since the LeaderAndIsr path was " +
"already written by another controller. This probably means that the current controller %d went through " +
"a soft failure and another controller was elected with epoch %d.")
.format(topic, partition, controllerId, controllerEpoch)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg)
}
// The selector either returns the new leader/ISR or throws; its exceptions are handled below.
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partition,
leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
newLeaderAndIsr = leaderAndIsr
// Record the version returned by the update on the object that gets cached below.
newLeaderAndIsr.zkVersion = newVersion
zookeeperPathUpdateSucceeded = updateSucceeded
replicasForThisPartition = replicas
}
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// Update the controller's leadership cache with the elected leader.
controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
// Note: the message says "Offline partition", but handleStateChange also calls this method
// for partitions that are already Online.
stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
.format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
// Queue the LeaderAndIsr request for the replicas chosen by the selector.
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, replicas)
} catch {
// Deliberately swallowed: nothing to do when no election is needed.
case lenne: LeaderElectionNotNeededException =>
case nroe: NoReplicaOnlineException => throw nroe
case sce: Throwable =>
val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg, sce)
}
debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
}
/** Subscribes the topic change listener to child changes under the broker topics path. */
private def registerTopicChangeListener() =
  zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
/** Unsubscribes the topic change listener from child changes under the broker topics path. */
private def deregisterTopicChangeListener() =
  zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
/**
 * Registers a data-change listener on the topic's ZooKeeper path so that partition additions
 * for that topic are picked up.
 *
 * If a listener was already registered for the topic, it is unsubscribed before the new one is
 * subscribed. Otherwise the earlier listener would stay subscribed in ZooKeeper with no
 * reference left in the map to unsubscribe it later.
 *
 * @param topic topic whose path should be watched
 */
def registerPartitionChangeListener(topic: String) = {
  val topicPath = ZkUtils.getTopicPath(topic)
  val listener = new AddPartitionsListener(topic)
  // put returns the listener previously stored under this topic, if any.
  addPartitionsListener.put(topic, listener).foreach { staleListener =>
    zkClient.unsubscribeDataChanges(topicPath, staleListener)
  }
  zkClient.subscribeDataChanges(topicPath, listener)
}
/**
 * Unsubscribes and forgets the partition change listener registered for the topic.
 *
 * Calling this for a topic that has no registered listener is a no-op; it no longer fails
 * with NoSuchElementException.
 *
 * @param topic topic whose listener should be removed
 * @return the removed listener, or None if none was registered
 */
def deregisterPartitionChangeListener(topic: String) = {
  // Look the listener up as an Option so that an unregistered topic is tolerated.
  addPartitionsListener.get(topic).foreach { listener =>
    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
  }
  addPartitionsListener.remove(topic)
}
/** Subscribes the delete-topics listener to child changes under the delete topics path. */
private def registerDeleteTopicListener() =
  zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
/** Unsubscribes the delete-topics listener from child changes under the delete topics path. */
private def deregisterDeleteTopicListener() =
  zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
/**
 * Reads the leader/ISR and controller epoch stored for the partition.
 *
 * @param topic     topic of the partition
 * @param partition partition id
 * @return the stored leader/ISR information
 * @throws StateChangeFailedException if no leader/ISR information exists for the partition
 */
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
  ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition).getOrElse {
    val topicAndPartition = TopicAndPartition(topic, partition)
    val failMsg = "LeaderAndIsr information doesn't exist for partition %s in %s state"
      .format(topicAndPartition, partitionState(topicAndPartition))
    throw new StateChangeFailedException(failMsg)
  }
}
/**
 * ZooKeeper child listener that reacts to changes in the set of topics.
 *
 * While holding the controller lock, and only if the state machine has started, it works out
 * which topics were added and removed, updates the controller context (topic set and replica
 * assignment cache) and notifies the controller about newly created topics.
 * Any failure is logged and not rethrown.
 */
class TopicChangeListener extends IZkChildListener with Logging {
  this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "

  /**
   * @param parentPath path whose children changed
   * @param children   current children of that path, i.e. the current topic names
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]) {
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        try {
          val topicsNow = {
            // Implicit Java-to-Scala conversions are kept local to this block.
            import JavaConversions._
            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
            (children: Buffer[String]).toSet
          }
          val newTopics = topicsNow -- controllerContext.allTopics
          val deletedTopics = controllerContext.allTopics -- topicsNow
          controllerContext.allTopics = topicsNow

          val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
          // Drop assignments of deleted topics, then merge in those of the new topics.
          controllerContext.partitionReplicaAssignment =
            controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) => deletedTopics.contains(tp.topic) }
          controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
          info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
            deletedTopics, addedPartitionReplicaAssignment))
          if (newTopics.nonEmpty)
            controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
        } catch {
          case e: Throwable => error("Error while handling new topic", e )
        }
      }
    }
  }
}
/**
 * ZooKeeper child listener that reacts to topics being listed under the delete topics path.
 *
 * Requests for topics unknown to the controller are ignored and their delete path is removed.
 * The remaining topics are enqueued for deletion; a topic with a preferred replica election
 * or a partition reassignment in progress is first marked ineligible for deletion.
 */
class DeleteTopicsListener() extends IZkChildListener with Logging {
  this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
  val zkClient = controllerContext.zkClient

  /**
   * @param parentPath path whose children changed
   * @param children   current children of that path, i.e. the topics requested for deletion
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]) {
    inLock(controllerContext.controllerLock) {
      val requestedTopics = {
        import JavaConversions._
        (children: Buffer[String]).toSet
      }
      debug("Delete topics listener fired for topics %s to be deleted".format(requestedTopics.mkString(",")))

      val nonExistentTopics = requestedTopics.filterNot(t => controllerContext.allTopics.contains(t))
      if (nonExistentTopics.nonEmpty) {
        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
        for (unknown <- nonExistentTopics)
          ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(unknown))
      }

      val topicsToBeDeleted = requestedTopics -- nonExistentTopics
      if (topicsToBeDeleted.nonEmpty) {
        info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
        // Topics with other state changes in flight are marked ineligible before enqueueing.
        for (topic <- topicsToBeDeleted) {
          val electionInProgress =
            controllerContext.partitionsUndergoingPreferredReplicaElection.exists(_.topic == topic)
          val reassignmentInProgress =
            controllerContext.partitionsBeingReassigned.keySet.exists(_.topic == topic)
          if (electionInProgress || reassignmentInProgress)
            controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
        }
        controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    }
  }

  /** No-op. */
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}
/**
 * ZooKeeper data listener for a single topic's path; detects partitions that were added to
 * the topic.
 *
 * On a data change it re-reads the topic's replica assignment, keeps the partitions that are
 * not yet in the controller context, and asks the controller to create them, unless the
 * topic is queued up for deletion. Any failure is logged and not rethrown.
 *
 * @param topic topic this listener is responsible for
 */
class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
  this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "

  /**
   * @param dataPath path whose data changed
   * @param data     new payload of the path; a null payload is tolerated
   */
  @throws(classOf[Exception])
  def handleDataChange(dataPath : String, data: Object) {
    inLock(controllerContext.controllerLock) {
      try {
        // String concatenation renders a null payload as "null". Calling data.toString on a
        // null payload would throw and abort the handling before any partition is examined.
        info("Add Partition triggered " + data + " for path " + dataPath)
        val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
        val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
          !controllerContext.partitionReplicaAssignment.contains(p._1))
        if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
          error("Skipping adding partitions %s for topic %s since it is currently being deleted"
            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
        else if (partitionsToBeAdded.size > 0) {
          info("New partitions to be added %s".format(partitionsToBeAdded))
          controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
        }
      } catch {
        case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
      }
    }
  }

  /** No-op. */
  @throws(classOf[Exception])
  def handleDataDeleted(parentPath : String) {
  }
}
}
/**
 * States a partition can be in, as tracked by the partition state machine. Each state
 * exposes a distinct byte code through `state`.
 */
sealed trait PartitionState {
  def state: Byte
}

/** Byte code 0. */
case object NewPartition extends PartitionState {
  val state: Byte = 0
}

/** Byte code 1. */
case object OnlinePartition extends PartitionState {
  val state: Byte = 1
}

/** Byte code 2. */
case object OfflinePartition extends PartitionState {
  val state: Byte = 2
}

/** Byte code 3. */
case object NonExistentPartition extends PartitionState {
  val state: Byte = 3
}