package kafka.server
import kafka.utils.ZkUtils._
import kafka.utils.Utils._
import kafka.utils.{Json, SystemTime, Logging}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
import kafka.controller.KafkaController
/**
 * Leader elector that races to create an ephemeral node at `electionPath`; the broker whose
 * create succeeds considers itself the leader. A data listener on the same path keeps
 * `leaderId` in sync and triggers a new election round when the node is deleted.
 *
 * @param controllerContext   supplies the zkClient, the ZooKeeper session timeout and the
 *                            controllerLock used to serialize election state changes
 * @param electionPath        ZooKeeper path of the election node
 * @param onBecomingLeader    callback invoked right after this broker wins an election
 * @param onResigningAsLeader callback invoked when this broker observes that it has lost leadership
 * @param brokerId            id of this broker, written into the election node
 */
class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)
  extends LeaderElector with Logging {
  // Id of the broker currently believed to be the leader; -1 means "no known leader".
  var leaderId = -1
  // Create the parent path of the election node, if the election path has one.
  val index = electionPath.lastIndexOf("/")
  if (index > 0)
    makeSurePersistentPathExists(controllerContext.zkClient, electionPath.substring(0, index))
  val leaderChangeListener = new LeaderChangeListener

  /** Subscribes the leader change listener on the election path and runs a first election round. */
  def startup {
    inLock(controllerContext.controllerLock) {
      controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      elect
    }
  }

  /** Reads the election node and returns the broker id stored in it, or -1 when no data is found. */
  private def getControllerID(): Int = {
    readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
      case Some(controller) => KafkaController.parseControllerId(controller)
      case None => -1
    }
  }

  /**
   * Runs one election round.
   *
   * If the election node already names a leader, nothing is created and that leader is adopted.
   * Otherwise this broker tries to create the ephemeral election node and, on success, invokes
   * `onBecomingLeader`.
   *
   * @return true if this broker is the leader once the round is over
   */
  def elect: Boolean = {
    leaderId = getControllerID
    if (leaderId != -1) {
      // Somebody already holds the election node, so there is nothing to contend for.
      debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
    } else {
      val timestamp = SystemTime.milliseconds.toString
      val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
      try {
        createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
          (controllerString: String, expectedId: Any) =>
            KafkaController.parseControllerId(controllerString) == expectedId.asInstanceOf[Int],
          controllerContext.zkSessionTimeout)
        info("%d successfully elected as leader".format(brokerId))
        leaderId = brokerId
        onBecomingLeader()
      } catch {
        case _: ZkNodeExistsException =>
          // Lost the race: re-read the node to learn who won.
          leaderId = getControllerID
          if (leaderId != -1)
            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
          else
            warn("A leader has been elected but just resigned, this will result in another round of election")
        case e2: Throwable =>
          // Deliberately broad: any failure while creating the node or inside onBecomingLeader
          // must give the election node back so that another round can take place.
          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
          resign()
      }
    }
    amILeader
  }

  /** Forgets the locally cached leader; does not touch the election node. */
  def close = {
    leaderId = -1
  }

  /** True when the cached leader id equals this broker's id. */
  def amILeader : Boolean = leaderId == brokerId

  /** Clears the cached leader and deletes the election node. */
  def resign() = {
    leaderId = -1
    deletePath(controllerContext.zkClient, electionPath)
  }

  /**
   * Listener on the election node. Keeps `leaderId` current on data changes and starts a new
   * election round when the node is deleted.
   */
  class LeaderChangeListener extends IZkDataListener with Logging {
    /**
     * Called when the data of the election node changes: records the new leader and, if this
     * broker was the leader and no longer is, invokes `onResigningAsLeader`.
     */
    @throws(classOf[Exception])
    def handleDataChange(dataPath: String, data: Object) {
      inLock(controllerContext.controllerLock) {
        val wasLeader = amILeader
        leaderId = KafkaController.parseControllerId(data.toString)
        info("New leader is %d".format(leaderId))
        // Without this, a leader that is replaced through a data change (rather than a delete)
        // would keep its leader-side state although amILeader is now false.
        if (wasLeader && !amILeader)
          onResigningAsLeader()
      }
    }

    /**
     * Called when the election node is deleted: resigns first if this broker was the leader,
     * then takes part in a new election round.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {
      inLock(controllerContext.controllerLock) {
        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
          .format(brokerId, dataPath))
        if (amILeader)
          onResigningAsLeader()
        elect
      }
    }
  }
}