/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.controller

import collection.mutable
import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
import kafka.utils.Utils._
import collection.Set
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.api.{StopReplicaResponse, RequestOrResponse}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicBoolean

/**
 * This manages the state machine for topic deletion.
 * 1. TopicCommand issues topic deletion by creating a new admin path /admin/delete_topics/<topic>
 * 2. The controller listens for child changes on /admin/delete_topics and starts topic deletion for the respective topics
 * 3. The controller has a background thread that handles topic deletion. The purpose of having this background thread
 *    is to accommodate the TTL feature, when we have it. This thread is signaled whenever deletion for a topic needs to
 *    be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the
 *    controller. In the future, it can be triggered based on the configured TTL for the topic. A topic will be ineligible
 *    for deletion in the following scenarios -
 *    3.1 broker hosting one of the replicas for that topic goes down
 *    3.2 partition reassignment for partitions of that topic is in progress
 *    3.3 preferred replica election for partitions of that topic is in progress
 *    (though this is not strictly required since it holds the controller lock for the entire duration from start to end)
 * 4. Topic deletion is resumed when -
 *    4.1 broker hosting one of the replicas for that topic is started
 *    4.2 preferred replica election for partitions of that topic completes
 *    4.3 partition reassignment for partitions of that topic completes
 * 5. Every replica for a topic being deleted is in one of the following 3 states -
 *    5.1 ReplicaDeletionStarted (Replica enters ReplicaDeletionStarted phase when the onPartitionDeletion callback is
 *        invoked. This happens when the child change watch for /admin/delete_topics fires on the controller. As part of
 *        this state change, the controller sends StopReplicaRequests to all replicas. It registers a callback for the
 *        StopReplicaResponse when deletePartition=true thereby invoking a callback when a response for delete replica
 *        is received from every replica)
 *    5.2 ReplicaDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas from
 *        ReplicaDeletionStarted->ReplicaDeletionSuccessful depending on the error codes in StopReplicaResponse)
 *    5.3 ReplicaDeletionIneligible (deleteTopicStopReplicaCallback() moves replicas from
 *        ReplicaDeletionStarted->ReplicaDeletionIneligible depending on the error codes in StopReplicaResponse.
 *        In general, if a broker dies and if it hosted replicas for topics being deleted, the controller marks the
 *        respective replicas in ReplicaDeletionIneligible state in the onBrokerFailure callback. The reason is that if a
 *        broker fails before the request is sent and after the replica is in ReplicaDeletionStarted state,
 *        it is possible that the replica will mistakenly remain in ReplicaDeletionStarted state and topic deletion
 *        will not be retried when the broker comes back up.)
 * 6. The delete topic thread marks a topic successfully deleted only if all replicas are in ReplicaDeletionSuccessful
 *    state and it starts the topic deletion teardown mode where it deletes all topic state from the controllerContext
 *    as well as from zookeeper. This is the only time the /brokers/topics/<topic> path gets deleted. On the other hand,
 *    if no replica is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state,
 *    then it marks the topic for deletion retry.
 * @param controller The controller that owns this manager
 * @param initialTopicsToBeDeleted The topics that are queued up for deletion in zookeeper at the time of controller failover
 * @param initialTopicsIneligibleForDeletion The topics ineligible for deletion due to any of the conditions mentioned in #3 above
 */
class TopicDeletionManager(controller: KafkaController,
                           initialTopicsToBeDeleted: Set[String] = Set.empty,
                           initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
  this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
  val controllerContext = controller.controllerContext
  val partitionStateMachine = controller.partitionStateMachine
  val replicaStateMachine = controller.replicaStateMachine
  val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
  // guards deleteTopicsCond; used only to park and wake the delete-topics thread
  val deleteLock = new ReentrantLock()
  // only topics that are actually queued for deletion can be tracked as ineligible
  val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
    (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
  val deleteTopicsCond = deleteLock.newCondition()
  // set to true by resumeTopicDeletionThread() and consumed (reset to false) by the delete-topics thread
  val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
  // stays null until start() is invoked with topic deletion enabled
  var deleteTopicsThread: DeleteTopicsThread = null
  val isDeleteTopicEnabled = controller.config.deleteTopicEnable

  /**
   * Invoked at the end of new controller initiation
   */
  def start(): Unit = {
    if (isDeleteTopicEnabled) {
      deleteTopicsThread = new DeleteTopicsThread()
      // make the first doWork() iteration process topics that were already queued at failover time
      if (topicsToBeDeleted.nonEmpty)
        deleteTopicStateChanged.set(true)
      deleteTopicsThread.start()
    }
  }

  /**
   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
   * This is a no-op if topic deletion is disabled or if start() was never invoked (deleteTopicsThread is still null).
   */
  def shutdown(): Unit = {
    // Only allow one shutdown to go through. The null check avoids a NullPointerException when the manager
    // is shut down before start() created the thread.
    if (isDeleteTopicEnabled && deleteTopicsThread != null && deleteTopicsThread.initiateShutdown()) {
      // Resume the topic deletion so it doesn't block on the condition
      resumeTopicDeletionThread()
      // Await delete topic thread to exit
      deleteTopicsThread.awaitShutdown()
      topicsToBeDeleted.clear()
      partitionsToBeDeleted.clear()
      topicsIneligibleForDeletion.clear()
    }
  }

  /**
   * Invoked by the child change listener on /admin/delete_topics to queue up the topics for deletion. The topic gets added
   * to the topicsToBeDeleted list and only gets removed from the list when the topic deletion has completed successfully
   * i.e. all replicas of all partitions of that topic are deleted successfully.
   * @param topics Topics that should be deleted
   */
  def enqueueTopicsForDeletion(topics: Set[String]): Unit = {
    if (isDeleteTopicEnabled) {
      topicsToBeDeleted ++= topics
      partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
      resumeTopicDeletionThread()
    }
  }

  /**
   * Invoked when any event that can possibly resume topic deletion occurs. These events include -
   * 1. New broker starts up. Any replicas belonging to topics queued up for deletion can be deleted since the broker is up
   * 2. Partition reassignment completes. Any partitions belonging to topics queued up for deletion finished reassignment
   * 3. Preferred replica election completes. Any partitions belonging to topics queued up for deletion finished
   *    preferred replica election
   * @param topics Topics for which deletion can be resumed
   */
  def resumeDeletionForTopics(topics: Set[String] = Set.empty): Unit = {
    if (isDeleteTopicEnabled) {
      // topics that are not queued for deletion are ignored
      val topicsToResumeDeletion = topics & topicsToBeDeleted
      if (topicsToResumeDeletion.nonEmpty) {
        topicsIneligibleForDeletion --= topicsToResumeDeletion
        resumeTopicDeletionThread()
      }
    }
  }

  /**
   * Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for
   * StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas
   * are moved from ReplicaDeletionStarted to ReplicaDeletionIneligible state. Also, the topic is added to the list of topics
   * ineligible for deletion until further notice. The delete topic thread is notified so it can retry topic deletion
   * if it has received a response for all replicas of a topic to be deleted
   * @param replicas Replicas for which deletion has failed
   */
  def failReplicaDeletion(replicas: Set[PartitionAndReplica]): Unit = {
    if (isDeleteTopicEnabled) {
      // replicas of topics that are not queued for deletion are ignored
      val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
      if (replicasThatFailedToDelete.nonEmpty) {
        val topics = replicasThatFailedToDelete.map(_.topic)
        debug("Deletion failed for replicas %s. Halting deletion for topics %s"
          .format(replicasThatFailedToDelete.mkString(","), topics))
        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
        markTopicIneligibleForDeletion(topics)
        resumeTopicDeletionThread()
      }
    }
  }

  /**
   * Halt delete topic if -
   * 1. replicas being down
   * 2. partition reassignment in progress for some partitions of the topic
   * 3. preferred replica election in progress for some partitions of the topic
   * @param topics Topics that should be marked ineligible for deletion. No op if the topic was not previously queued up
   *               for deletion
   */
  def markTopicIneligibleForDeletion(topics: Set[String]): Unit = {
    if (isDeleteTopicEnabled) {
      val newTopicsToHaltDeletion = topicsToBeDeleted & topics
      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
      if (newTopicsToHaltDeletion.nonEmpty)
        info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
    }
  }

  /**
   * @param topic Topic to check
   * @return true if the topic is currently marked ineligible for deletion. Always true when topic deletion is disabled.
   */
  def isTopicIneligibleForDeletion(topic: String): Boolean =
    !isDeleteTopicEnabled || topicsIneligibleForDeletion.contains(topic)

  /**
   * @param topic Topic to check
   * @return true if at least one replica of the topic is in ReplicaDeletionStarted state. Always false when topic
   *         deletion is disabled.
   */
  def isTopicDeletionInProgress(topic: String): Boolean =
    isDeleteTopicEnabled && controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)

  /**
   * @param topicAndPartition Partition to check
   * @return true if the partition belongs to a topic queued up for deletion. Always false when topic deletion is disabled.
   */
  def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition): Boolean =
    isDeleteTopicEnabled && partitionsToBeDeleted.contains(topicAndPartition)

  /**
   * @param topic Topic to check
   * @return true if the topic is queued up for deletion. Always false when topic deletion is disabled.
   */
  def isTopicQueuedUpForDeletion(topic: String): Boolean =
    isDeleteTopicEnabled && topicsToBeDeleted.contains(topic)

  /**
   * Invoked by the delete-topic-thread to wait until events that either trigger, restart or halt topic deletion occur.
   * This method acquires deleteLock itself. DeleteTopicsThread.doWork() calls it before taking the controller lock,
   * so the thread does not hold the controller lock while it is parked on the condition.
   */
  private def awaitTopicDeletionNotification(): Unit = {
    inLock(deleteLock) {
      // compareAndSet consumes a pending notification; keep waiting only while running and nothing is pending
      while (deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) {
        debug("Waiting for signal to start or continue topic deletion")
        deleteTopicsCond.await()
      }
    }
  }

  /**
   * Signals the delete-topic-thread to process topic deletion
   */
  private def resumeTopicDeletionThread(): Unit = {
    // the flag is set before signalling so that a thread that is not yet waiting still sees the notification
    deleteTopicStateChanged.set(true)
    inLock(deleteLock) {
      deleteTopicsCond.signal()
    }
  }

  /**
   * Invoked by the StopReplicaResponse callback when it receives no error code for a replica of a topic to be deleted.
   * As part of this, the replicas are moved from ReplicaDeletionStarted to ReplicaDeletionSuccessful state. The delete
   * topic thread is notified so it can tear down the topic if all replicas of a topic have been successfully deleted
   * @param replicas Replicas that were successfully deleted by the broker
   */
  private def completeReplicaDeletion(replicas: Set[PartitionAndReplica]): Unit = {
    val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
    debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
    controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
    resumeTopicDeletionThread()
  }

  /**
   * Topic deletion can be (re)started if -
   * 1. Topic deletion is not already complete, i.e. the topic is still queued up for deletion
   * 2. Topic deletion is currently not in progress for that topic
   * 3. Topic is currently not marked ineligible for deletion
   * @param topic Topic
   * @return Whether or not deletion can be retried for the topic
   */
  private def isTopicEligibleForDeletion(topic: String): Boolean =
    topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicIneligibleForDeletion(topic))

  /**
   * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
   * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state
   * @param topic Topic for which deletion should be retried
   */
  private def markTopicForDeletionRetry(topic: String): Unit = {
    // reset replica states from ReplicaDeletionIneligible to OfflineReplica
    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
    info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
      .format(topic, failedReplicas.mkString(",")))
    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
  }

  /**
   * Tears down all state for a topic whose replicas have all been deleted: replica and partition state machine entries,
   * the deletion bookkeeping of this manager, the topic's zookeeper paths and the controller context entry.
   * @param topic Topic whose deletion is being completed
   */
  private def completeDeleteTopic(topic: String): Unit = {
    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
    // firing before the new topic listener when a deleted topic gets auto created
    partitionStateMachine.deregisterPartitionChangeListener(topic)
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
    // move respective partition to OfflinePartition and NonExistentPartition state
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic)
    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
    controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
    controllerContext.removeTopic(topic)
  }

  /**
   * This callback is invoked by the DeleteTopics thread with the list of topics to be deleted
   * It invokes the delete partition callback for all partitions of a topic.
   * The updateMetadataRequest is also going to set the leader for the topics being deleted to
   * LeaderAndIsr#LeaderDuringDelete. This lets each broker know that this topic is being deleted and can be
   * removed from their caches.
   * A topic that has no partition assignment in the controller context is logged and skipped.
   * @param topics Topics for which deletion should be started
   */
  private def onTopicDeletion(topics: Set[String]): Unit = {
    info("Topic deletion callback for %s".format(topics.mkString(",")))
    // send update metadata so that brokers stop serving data for topics to be deleted
    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
    topics.foreach { topic =>
      // get() rather than apply(): apply() throws NoSuchElementException for a topic without any assignment
      partitionReplicaAssignmentByTopic.get(topic) match {
        case Some(assignmentForTopic) =>
          onPartitionDeletion(assignmentForTopic.map(_._1).toSet)
        case None =>
          info("No partition assignment found for topic %s, skipping partition deletion".format(topic))
      }
    }
  }

  /**
   * Invoked by the onPartitionDeletion callback. It is the 2nd step of topic deletion, the first being sending
   * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
   * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
   * is never retried. A topic is removed from the in progress list when
   * 1. Either the topic is successfully deleted OR
   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
   * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
   * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
   * the replicas a StopReplicaRequest (delete=true)
   * This callback does the following things -
   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
   *    for deletion if some replicas are dead since it won't complete successfully anyway
   * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
   * @param replicasForTopicsToBeDeleted Replicas of the topics whose deletion is being started; they are handled
   *                                     one topic at a time
   */
  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]): Unit = {
    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case (topic, replicasForTopic) =>
      val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
      // Only this topic's replicas are candidates. Subtracting from the whole input set would classify live
      // replicas of the other topics in the input as dead replicas of this topic.
      val deadReplicasForTopic = replicasForTopic -- aliveReplicasForTopic
      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      // replicas that were already deleted successfully do not need another StopReplicaRequest
      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
      // move dead replicas directly to failed state
      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
      if (deadReplicasForTopic.nonEmpty) {
        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
        markTopicIneligibleForDeletion(Set(topic))
      }
    }
  }

  /**
   * This callback is invoked by the delete topic callback (onTopicDeletion) with the list of partitions for topics to
   * be deleted. By the time it is invoked, onTopicDeletion has already sent the UpdateMetadataRequest for the partitions
   * that are being deleted. This callback resolves the replicas of the given partitions and hands them to
   * startReplicaDeletion, which -
   * 1. Moves the replicas to OfflineReplica state. This will send StopReplicaRequest to the replicas
   *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica
   *    state, it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
   * 2. Moves the replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
   *    will delete all persistent data from all replicas of the respective partitions
   * @param partitionsToBeDeleted Partitions whose replicas should be deleted
   */
  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]): Unit = {
    info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
    val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
    startReplicaDeletion(replicasPerPartition)
  }

  /**
   * Callback registered for the StopReplicaResponse of a StopReplicaRequest sent as part of topic deletion.
   * Partitions that came back with an error are failed via failReplicaDeletion; the remaining partitions of the
   * response are completed via completeReplicaDeletion. Both are done while holding the controller lock.
   * @param stopReplicaResponseObj The response; it is cast to StopReplicaResponse
   * @param replicaId Id of the replica (broker) the response belongs to
   */
  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: RequestOrResponse, replicaId: Int): Unit = {
    val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
    debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
    // a top level error fails every partition of the response; otherwise only partitions with their own error code fail
    val partitionsInError =
      if (stopReplicaResponse.errorCode != ErrorMapping.NoError)
        stopReplicaResponse.responseMap.keySet
      else
        stopReplicaResponse.responseMap.filter(p => p._2 != ErrorMapping.NoError).map(_._1).toSet
    val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
    inLock(controllerContext.controllerLock) {
      // move all the failed replicas to ReplicaDeletionIneligible
      failReplicaDeletion(replicasInError)
      if (replicasInError.size != stopReplicaResponse.responseMap.size) {
        // some replicas could have been successfully deleted
        val deletedReplicas = stopReplicaResponse.responseMap.keySet -- partitionsInError
        completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
      }
    }
  }

  /**
   * Background thread that drives topic deletion. Each doWork() iteration waits for a notification and then, under
   * the controller lock, walks over all topics queued up for deletion and completes, retries or (re)starts their deletion.
   */
  class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
    val zkClient = controllerContext.zkClient

    override def doWork(): Unit = {
      awaitTopicDeletionNotification()

      // the wait also ends when the thread is being shut down; in that case there is nothing to do
      if (isRunning.get) {
        inLock(controllerContext.controllerLock) {
          // iterate over a snapshot since completeDeleteTopic() removes entries from topicsToBeDeleted
          val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

          if (!topicsQueuedForDeletion.isEmpty)
            info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

          topicsQueuedForDeletion.foreach { topic =>
            // if all replicas are marked as deleted successfully, then topic deletion is done
            if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
              // clear up all state for this topic from controller cache and zookeeper
              completeDeleteTopic(topic)
              info("Deletion of topic %s successfully completed".format(topic))
            } else if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
              // ignore since topic deletion is in progress
              val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
              val replicaIds = replicasInDeletionStartedState.map(_.replica)
              val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
              info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
                partitions.mkString(","), topic))
            } else if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
              // if you come here, then no replica is in ReplicaDeletionStarted and all replicas are not in
              // ReplicaDeletionSuccessful. That means, that either given topic haven't initiated deletion
              // or there is at least one failed replica (which means topic deletion should be retried).
              // mark topic for deletion retry
              markTopicForDeletionRetry(topic)
            }

            // Try delete topic if it is eligible for deletion.
            if (isTopicEligibleForDeletion(topic)) {
              info("Deletion of topic %s (re)started".format(topic))
              // topic deletion will be kicked off
              onTopicDeletion(Set(topic))
            } else if (isTopicIneligibleForDeletion(topic)) {
              info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
            }
          }
        }
      }
    }
  }
}