package kafka.server
import kafka.admin._
import kafka.log.LogConfig
import kafka.log.CleanerConfig
import kafka.log.LogManager
import kafka.utils._
import java.util.concurrent._
import atomic.{AtomicInteger, AtomicBoolean}
import java.io.File
import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.Broker
import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
import kafka.common.ErrorMapping
import kafka.network.{Receive, BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Kafka Server " + config.brokerId + "], "
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
private var startupComplete = new AtomicBoolean(false)
val brokerState: BrokerState = new BrokerState
val correlationId: AtomicInteger = new AtomicInteger(0)
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null
var offsetManager: OffsetManager = null
var kafkaHealthcheck: KafkaHealthcheck = null
var topicConfigManager: TopicConfigManager = null
var replicaManager: ReplicaManager = null
var apis: KafkaApis = null
var kafkaController: KafkaController = null
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var zkClient: ZkClient = null
newGauge(
"BrokerState",
new Gauge[Int] {
def value = brokerState.currentState
}
)
def startup() {
try {
info("starting")
brokerState.newState(Starting)
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
kafkaScheduler.startup()
zkClient = initZk()
logManager = createLogManager(zkClient, brokerState)
logManager.startup()
socketServer = new SocketServer(config.brokerId,
config.hostName,
config.port,
config.numNetworkThreads,
config.queuedMaxRequests,
config.socketSendBufferBytes,
config.socketReceiveBufferBytes,
config.socketRequestMaxBytes,
config.maxConnectionsPerIp,
config.connectionsMaxIdleMs,
config.maxConnectionsPerIpOverrides)
socketServer.startup()
replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
offsetManager = createOffsetManager()
kafkaController = new KafkaController(config, zkClient, brokerState)
apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
Mx4jLoader.maybeLoad()
replicaManager.startup()
kafkaController.startup()
topicConfigManager = new TopicConfigManager(zkClient, logManager)
topicConfigManager.startup()
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
registerStats()
startupComplete.set(true)
info("started")
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
shutdown()
throw e
}
}
private def initZk(): ZkClient = {
info("Connecting to zookeeper on " + config.zkConnect)
val chroot = {
if (config.zkConnect.indexOf("/") > 0)
config.zkConnect.substring(config.zkConnect.indexOf("/"))
else
""
}
if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
info("Created zookeeper path " + chroot)
zkClientForChrootCreation.close()
}
val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
ZkUtils.setupCommonPaths(zkClient)
zkClient
}
private def registerStats() {
BrokerTopicStats.getBrokerAllTopicsStats()
ControllerStats.uncleanLeaderElectionRate
ControllerStats.leaderElectionTimer
}
private def controlledShutdown() {
if (startupComplete.get() && config.controlledShutdownEnable) {
var remainingRetries = config.controlledShutdownMaxRetries
info("Starting controlled shutdown")
var channel : BlockingChannel = null
var prevController : Broker = null
var shutdownSuceeded : Boolean = false
try {
brokerState.newState(PendingControlledShutdown)
while (!shutdownSuceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
val controllerId = ZkUtils.getController(zkClient)
ZkUtils.getBrokerInfo(zkClient, controllerId) match {
case Some(broker) =>
if (channel == null || prevController == null || !prevController.equals(broker)) {
if (channel != null) {
channel.disconnect()
}
channel = new BlockingChannel(broker.host, broker.port,
BlockingChannel.UseDefaultBufferSize,
BlockingChannel.UseDefaultBufferSize,
config.controllerSocketTimeoutMs)
channel.connect()
prevController = broker
}
case None=>
}
if (channel != null) {
var response: Receive = null
try {
val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
channel.send(request)
response = channel.receive()
val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
shutdownResponse.partitionsRemaining.size == 0) {
shutdownSuceeded = true
info ("Controlled shutdown succeeded")
}
else {
info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
info("Error code from controller: %d".format(shutdownResponse.errorCode))
}
}
catch {
case ioe: java.io.IOException =>
channel.disconnect()
channel = null
warn("Error during controlled shutdown, possibly because leader movement took longer than the configured socket.timeout.ms: %s".format(ioe.getMessage))
}
}
if (!shutdownSuceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
}
}
finally {
if (channel != null) {
channel.disconnect()
channel = null
}
}
if (!shutdownSuceeded) {
warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
}
}
}
def shutdown() {
try {
info("shutting down")
val canShutdown = isShuttingDown.compareAndSet(false, true)
if (canShutdown) {
Utils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown)
if(socketServer != null)
Utils.swallow(socketServer.shutdown())
if(requestHandlerPool != null)
Utils.swallow(requestHandlerPool.shutdown())
if(offsetManager != null)
offsetManager.shutdown()
Utils.swallow(kafkaScheduler.shutdown())
if(apis != null)
Utils.swallow(apis.close())
if(replicaManager != null)
Utils.swallow(replicaManager.shutdown())
if(logManager != null)
Utils.swallow(logManager.shutdown())
if(kafkaController != null)
Utils.swallow(kafkaController.shutdown())
if(zkClient != null)
Utils.swallow(zkClient.close())
brokerState.newState(NotRunning)
shutdownLatch.countDown()
startupComplete.set(false)
info("shut down completed")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
throw e
}
}
def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
segmentMs = config.logRollTimeMillis,
segmentJitterMs = config.logRollTimeJitterMillis,
flushInterval = config.logFlushIntervalMessages,
flushMs = config.logFlushIntervalMs.toLong,
retentionSize = config.logRetentionBytes,
retentionMs = config.logRetentionTimeMillis,
maxMessageSize = config.messageMaxBytes,
maxIndexSize = config.logIndexSizeMaxBytes,
indexInterval = config.logIndexIntervalBytes,
deleteRetentionMs = config.logCleanerDeleteRetentionMs,
fileDeleteDelayMs = config.logDeleteDelayMs,
minCleanableRatio = config.logCleanerMinCleanRatio,
compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
val defaultProps = defaultLogConfig.toProps
val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
ioBufferSize = config.logCleanerIoBufferSize,
maxMessageSize = config.messageMaxBytes,
maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
backOffMs = config.logCleanerBackoffMs,
enableCleaner = config.logCleanerEnable)
new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
topicConfigs = configs,
defaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
scheduler = kafkaScheduler,
brokerState = brokerState,
time = time)
}
private def createOffsetManager(): OffsetManager = {
val offsetManagerConfig = OffsetManagerConfig(
maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
}
}