package kafka.consumer
import scala.collection.JavaConversions._
import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
/**
 * Watches the children of `ZkUtils.BrokerTopicsPath` in ZooKeeper and hands
 * the current list of topic names to `eventHandler` each time they change.
 *
 * The listeners are registered during construction, and the child listener is
 * invoked once right away so the handler is bootstrapped with the topics that
 * exist at that moment.
 *
 * @param zkClient     client used to subscribe to, and read from, the topic registry
 * @param eventHandler callback that receives the full list of topic names
 */
class ZookeeperTopicEventWatcher(val zkClient: ZkClient,
    val eventHandler: TopicEventHandler[String]) extends Logging {

  /** Monitor held by `shutdown` and by both listener callbacks. */
  val lock = new Object()

  startWatchingTopicEvents()

  /**
   * Ensures the topic registry path exists, registers the session-state and
   * child-change listeners, and then triggers the child listener by hand.
   */
  private def startWatchingTopicEvents(): Unit = {
    val childListener = new ZkTopicEventListener()
    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)

    val sessionListener = new ZkSessionExpireListener(childListener)
    zkClient.subscribeStateChanges(sessionListener)

    val currentTopics =
      zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, childListener).toList

    // Invoke the listener directly so the handler receives a topic list at
    // construction time instead of waiting for the first change.
    childListener.handleChildChange(ZkUtils.BrokerTopicsPath, currentTopics)
  }

  /**
   * Drops the subscriptions held on `zkClient`.
   *
   * NOTE(review): `unsubscribeAll()` presumably removes every listener on the
   * client, including ones registered by other components - confirm.
   */
  private def stopWatchingTopicEvents(): Unit = {
    zkClient.unsubscribeAll()
  }

  /**
   * Stops watching for topic events. Only logs a warning when the client
   * reference is null.
   */
  def shutdown(): Unit = {
    lock.synchronized {
      info("Shutting down topic event watcher.")
      if (zkClient == null) {
        warn("Cannot shutdown since the embedded zookeeper client has already closed.")
      } else {
        stopWatchingTopicEvents()
      }
    }
  }

  /** Child listener that republishes the complete topic list on every change. */
  class ZkTopicEventListener extends IZkChildListener {

    /**
     * Reacts to a child change under the topic registry.
     *
     * Both arguments are ignored: the topic list is re-read from
     * `ZkUtils.BrokerTopicsPath`. Any failure is logged and not rethrown.
     *
     * @param parent   path whose children changed (unused)
     * @param children child names supplied with the notification (unused)
     */
    @throws(classOf[Exception])
    def handleChildChange(parent: String, children: java.util.List[String]): Unit = {
      lock.synchronized {
        try {
          if (zkClient != null) {
            publishCurrentTopics()
          }
        } catch {
          case e: Throwable =>
            error("error in handling child changes", e)
        }
      }
    }

    /** Reads the topic names from ZooKeeper and passes them to the event handler. */
    private def publishCurrentTopics(): Unit = {
      val topicNames = zkClient.getChildren(ZkUtils.BrokerTopicsPath).toList
      debug("all topics: %s".format(topicNames))
      eventHandler.handleTopicEvent(topicNames)
    }
  }

  /**
   * Session-state listener that re-registers the topic child listener once a
   * new ZooKeeper session has been established.
   *
   * @param topicEventListener child listener to subscribe again on a new session
   */
  class ZkSessionExpireListener(val topicEventListener: ZkTopicEventListener)
      extends IZkStateListener {

    /** Intentionally a no-op; only new sessions are acted upon. */
    @throws(classOf[Exception])
    def handleStateChanged(state: KeeperState): Unit = {}

    /** Subscribes `topicEventListener` to the topic registry path again. */
    @throws(classOf[Exception])
    def handleNewSession(): Unit = {
      lock.synchronized {
        if (zkClient != null) {
          info("ZK expired: resubscribing topic event listener to topic registry")
          zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicEventListener)
        }
      }
    }
  }
}