package kafka.server
import java.util.Properties
import kafka.message.{MessageSet, Message}
import kafka.consumer.ConsumerConfig
import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
def this(originalProps: Properties) {
this(new VerifiableProperties(originalProps))
props.verify()
}
private def getLogRetentionTimeMillis(): Long = {
val millisInMinute = 60L * 1000L
val millisInHour = 60L * millisInMinute
if(props.containsKey("log.retention.ms")){
props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
}
else if(props.containsKey("log.retention.minutes")){
millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
}
else {
millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
}
}
private def getLogRollTimeMillis(): Long = {
val millisInHour = 60L * 60L * 1000L
if(props.containsKey("log.roll.ms")){
props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
}
else {
millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
}
}
private def getLogRollTimeJitterMillis(): Long = {
val millisInHour = 60L * 60L * 1000L
if(props.containsKey("log.roll.jitter.ms")) {
props.getIntInRange("log.roll.jitter.ms", (0, Int.MaxValue))
}
else {
millisInHour * props.getIntInRange("log.roll.jitter.hours", 0, (0, Int.MaxValue))
}
}
val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))
val backgroundThreads = props.getIntInRange("background.threads", 10, (1, Int.MaxValue))
val queuedMaxRequests = props.getIntInRange("queued.max.requests", 500, (1, Int.MaxValue))
val port: Int = props.getInt("port", 9092)
val hostName: String = props.getString("host.name", null)
val advertisedHostName: String = props.getString("advertised.host.name", hostName)
val advertisedPort: Int = props.getInt("advertised.port", port)
val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024)
val socketReceiveBufferBytes: Int = props.getInt("socket.receive.buffer.bytes", 100*1024)
val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
val connectionsMaxIdleMs = props.getLong("connections.max.idle.ms", 10*60*1000L)
val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
val logDirs = Utils.parseCsvList(props.getString("log.dirs", props.getString("log.dir", "/tmp/kafka-logs")))
require(logDirs.size > 0)
val logSegmentBytes = props.getIntInRange("log.segment.bytes", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
val logRollTimeMillis = getLogRollTimeMillis
val logRollTimeJitterMillis = getLogRollTimeJitterMillis
val logRetentionTimeMillis = getLogRetentionTimeMillis
val logRetentionBytes = props.getLong("log.retention.bytes", -1)
val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
val logCleanerThreads = props.getIntInRange("log.cleaner.threads", 1, (0, Int.MaxValue))
val logCleanerIoMaxBytesPerSecond = props.getDouble("log.cleaner.io.max.bytes.per.second", Double.MaxValue)
val logCleanerDedupeBufferSize = props.getLongInRange("log.cleaner.dedupe.buffer.size", 500*1024*1024L, (0, Long.MaxValue))
require(logCleanerDedupeBufferSize / logCleanerThreads > 1024*1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
val logCleanerIoBufferSize = props.getIntInRange("log.cleaner.io.buffer.size", 512*1024, (0, Int.MaxValue))
val logCleanerDedupeBufferLoadFactor = props.getDouble("log.cleaner.io.buffer.load.factor", 0.9d)
val logCleanerBackoffMs = props.getLongInRange("log.cleaner.backoff.ms", 15*1000, (0L, Long.MaxValue))
val logCleanerMinCleanRatio = props.getDouble("log.cleaner.min.cleanable.ratio", 0.5)
val logCleanerEnable = props.getBoolean("log.cleaner.enable", false)
val logCleanerDeleteRetentionMs = props.getLong("log.cleaner.delete.retention.ms", 24 * 60 * 60 * 1000L)
val logIndexSizeMaxBytes = props.getIntInRange("log.index.size.max.bytes", 10*1024*1024, (4, Int.MaxValue))
val logIndexIntervalBytes = props.getIntInRange("log.index.interval.bytes", 4096, (0, Int.MaxValue))
val logFlushIntervalMessages = props.getLongInRange("log.flush.interval.messages", Long.MaxValue, (1, Long.MaxValue))
val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
val logFlushSchedulerIntervalMs = props.getLong("log.flush.scheduler.interval.ms", Long.MaxValue)
val logFlushIntervalMs = props.getLong("log.flush.interval.ms", logFlushSchedulerIntervalMs)
val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
val numRecoveryThreadsPerDataDir = props.getIntInRange("num.recovery.threads.per.data.dir", 1, (1, Int.MaxValue))
val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
val minInSyncReplicas = props.getIntInRange("min.insync.replicas",1,(1,Int.MaxValue))
val controllerSocketTimeoutMs = props.getInt("controller.socket.timeout.ms", 30000)
val controllerMessageQueueSize= props.getInt("controller.message.queue.size", Int.MaxValue)
val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
val replicaLagTimeMaxMs = props.getLong("replica.lag.time.max.ms", 10000)
val replicaLagMaxMessages = props.getLong("replica.lag.max.messages", 4000)
val replicaSocketTimeoutMs = props.getInt("replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
" to prevent unnecessary socket timeouts")
val replicaSocketReceiveBufferBytes = props.getInt("replica.socket.receive.buffer.bytes", ConsumerConfig.SocketBufferSize)
val replicaFetchMaxBytes = props.getIntInRange("replica.fetch.max.bytes", ConsumerConfig.FetchSize, (messageMaxBytes, Int.MaxValue))
val replicaFetchWaitMaxMs = props.getInt("replica.fetch.wait.max.ms", 500)
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, "replica.fetch.wait.max.ms should always be at least replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
val replicaFetchMinBytes = props.getInt("replica.fetch.min.bytes", 1)
val numReplicaFetchers = props.getInt("num.replica.fetchers", 1)
val replicaHighWatermarkCheckpointIntervalMs = props.getLong("replica.high.watermark.checkpoint.interval.ms", 5000L)
val fetchPurgatoryPurgeIntervalRequests = props.getInt("fetch.purgatory.purge.interval.requests", 1000)
val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 1000)
val autoLeaderRebalanceEnable = props.getBoolean("auto.leader.rebalance.enable", true)
val leaderImbalancePerBrokerPercentage = props.getInt("leader.imbalance.per.broker.percentage", 10)
val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300)
val uncleanLeaderElectionEnable = props.getBoolean("unclean.leader.election.enable", true)
val controlledShutdownMaxRetries = props.getInt("controlled.shutdown.max.retries", 3)
val controlledShutdownRetryBackoffMs = props.getInt("controlled.shutdown.retry.backoff.ms", 5000)
val controlledShutdownEnable = props.getBoolean("controlled.shutdown.enable", default = true)
val offsetMetadataMaxSize = props.getInt("offset.metadata.max.bytes", OffsetManagerConfig.DefaultMaxMetadataSize)
val offsetsLoadBufferSize = props.getIntInRange("offsets.load.buffer.size",
OffsetManagerConfig.DefaultLoadBufferSize, (1, Integer.MAX_VALUE))
val offsetsTopicReplicationFactor: Short = props.getShortInRange("offsets.topic.replication.factor",
OffsetManagerConfig.DefaultOffsetsTopicReplicationFactor, (1, Short.MaxValue))
val offsetsTopicPartitions: Int = props.getIntInRange("offsets.topic.num.partitions",
OffsetManagerConfig.DefaultOffsetsTopicNumPartitions, (1, Integer.MAX_VALUE))
val offsetsTopicSegmentBytes: Int = props.getIntInRange("offsets.topic.segment.bytes",
OffsetManagerConfig.DefaultOffsetsTopicSegmentBytes, (1, Integer.MAX_VALUE))
val offsetsTopicCompressionCodec = props.getCompressionCodec("offsets.topic.compression.codec",
OffsetManagerConfig.DefaultOffsetsTopicCompressionCodec)
val offsetsRetentionMinutes: Int = props.getIntInRange("offsets.retention.minutes", 24*60, (1, Integer.MAX_VALUE))
val offsetsRetentionCheckIntervalMs: Long = props.getLongInRange("offsets.retention.check.interval.ms",
OffsetManagerConfig.DefaultOffsetsRetentionCheckIntervalMs, (1, Long.MaxValue))
val offsetCommitTimeoutMs = props.getIntInRange("offsets.commit.timeout.ms",
OffsetManagerConfig.DefaultOffsetCommitTimeoutMs, (1, Integer.MAX_VALUE))
val offsetCommitRequiredAcks = props.getShortInRange("offsets.commit.required.acks",
OffsetManagerConfig.DefaultOffsetCommitRequiredAcks, (-1, offsetsTopicReplicationFactor))
val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
}