package kafka.consumer
import java.util.Properties
import kafka.api.OffsetRequest
import kafka.utils._
import kafka.common.{InvalidConfigException, Config}
/**
 * Companion holding the default values used by the ConsumerConfig class and
 * the validation routines it runs at the end of construction.
 */
object ConsumerConfig extends Config {
  // ---- default values (used as fallbacks when a property is not supplied) ----
  val RefreshMetadataBackoffMs: Int = 200
  val SocketTimeout: Int = 30 * 1000
  val SocketBufferSize: Int = 64*1024
  val FetchSize: Int = 1024 * 1024
  // Derived from FetchSize, so it must stay declared after it.
  val MaxFetchSize: Int = 10*FetchSize
  val NumConsumerFetchers: Int = 1
  val DefaultFetcherBackoffMs: Int = 1000
  val AutoCommit: Boolean = true
  val AutoCommitInterval: Int = 60 * 1000
  val MaxQueuedChunks: Int = 2
  val MaxRebalanceRetries: Int = 4
  // Type is inferred from OffsetRequest, which is declared outside this file.
  val AutoOffsetReset = OffsetRequest.LargestTimeString
  val ConsumerTimeoutMs: Int = -1
  val MinFetchBytes: Int = 1
  val MaxFetchWaitMs: Int = 100
  val MirrorTopicsWhitelist: String = ""
  val MirrorTopicsBlacklist: String = ""
  val MirrorConsumerNumThreads: Int = 1
  val OffsetsChannelBackoffMs: Int = 1000
  val OffsetsChannelSocketTimeoutMs: Int = 10000
  val OffsetsCommitMaxRetries: Int = 5
  val OffsetsStorage: String = "zookeeper"

  // ---- property keys and remaining defaults ----
  val MirrorTopicsWhitelistProp: String = "mirror.topics.whitelist"
  val MirrorTopicsBlacklistProp: String = "mirror.topics.blacklist"
  val ExcludeInternalTopics: Boolean = true
  val DefaultPartitionAssignmentStrategy: String = "range"
  val MirrorConsumerNumThreadsProp: String = "mirror.consumer.numthreads"
  val DefaultClientId: String = ""

  /**
   * Runs every individual check below against a constructed config, in the
   * order: client id, group id, auto offset reset, offsets storage.
   * The first failing check aborts the rest by throwing.
   */
  def validate(config: ConsumerConfig): Unit = {
    validateClientId(config.clientId)
    validateGroupId(config.groupId)
    validateAutoOffsetReset(config.autoOffsetReset)
    validateOffsetsStorage(config.offsetsStorage)
  }

  /** Delegates to validateChars, reporting failures under the "client.id" property name. */
  def validateClientId(clientId: String): Unit =
    validateChars("client.id", clientId)

  /** Delegates to validateChars, reporting failures under the "group.id" property name. */
  def validateGroupId(groupId: String): Unit =
    validateChars("group.id", groupId)

  /**
   * Accepts only OffsetRequest.SmallestTimeString or OffsetRequest.LargestTimeString.
   *
   * @throws InvalidConfigException for any other value
   */
  def validateAutoOffsetReset(autoOffsetReset: String): Unit = {
    val recognised =
      OffsetRequest.SmallestTimeString == autoOffsetReset ||
        OffsetRequest.LargestTimeString == autoOffsetReset
    if (!recognised) {
      val problem = "Wrong value " + autoOffsetReset + " of auto.offset.reset in ConsumerConfig; "
      val hint = "Valid values are " + OffsetRequest.SmallestTimeString + " and " + OffsetRequest.LargestTimeString
      throw new InvalidConfigException(problem + hint)
    }
  }

  /**
   * Accepts only the exact strings "zookeeper" or "kafka" (comparison is case-sensitive).
   *
   * @throws InvalidConfigException for any other value
   */
  def validateOffsetsStorage(storage: String): Unit = {
    if (storage != "zookeeper" && storage != "kafka") {
      val problem = "Wrong value " + storage + " of offsets.storage in consumer config; "
      val hint = "Valid values are 'zookeeper' and 'kafka'"
      throw new InvalidConfigException(problem + hint)
    }
  }
}
/**
 * Consumer settings read from a property set.
 *
 * Every field is read eagerly, in declaration order, while the instance is
 * being constructed; each one names its property key and the fallback taken
 * from the companion object when the key is absent. Construction finishes by
 * running the companion's `validate`, so an instance that exists has passed
 * those checks.
 *
 * @param props verifiable view over the consumer properties
 */
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
  import ConsumerConfig._

  /**
   * Public entry point: wraps plain java.util.Properties, builds the config,
   * then calls `props.verify()` once every property has been read.
   */
  def this(originalProps: Properties) {
    this(new VerifiableProperties(originalProps))
    props.verify()
  }

  // "group.id" has no default, unlike every other property read here.
  val groupId = props.getString("group.id")

  // None when "consumer.id" is not supplied (the null default is wrapped by Option).
  val consumerId: Option[String] = Option(props.getString("consumer.id", null))

  val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
  val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
  val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
  val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)
  val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)
  val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)
  val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)
  val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)
  val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)
  val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)

  // This check must come after BOTH vals above are assigned. Class-body vals are
  // initialised in textual order, so referencing fetchWaitMaxMs any earlier would
  // read its default of 0 and the requirement would pass without checking anything.
  require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
    " to prevent unnecessary socket timeouts")

  // zkSyncTimeMs is not defined in this class; presumably inherited from ZKConfig.
  val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)
  val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
  val offsetsChannelBackoffMs = props.getInt("offsets.channel.backoff.ms", OffsetsChannelBackoffMs)
  val offsetsChannelSocketTimeoutMs = props.getInt("offsets.channel.socket.timeout.ms", OffsetsChannelSocketTimeoutMs)
  val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)

  // Lower-cased here so the case-sensitive check in validateOffsetsStorage accepts any casing.
  val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase

  // Defaults to true exactly when offsets are stored in "kafka".
  val dualCommitEnabled = props.getBoolean("dual.commit.enabled", offsetsStorage == "kafka")
  val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)
  val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)

  // Falls back to the group id when "client.id" is not supplied.
  val clientId = props.getString("client.id", groupId)
  val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
  val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)

  // Kept last: validate reads fields of this instance, so they must all be initialised first.
  validate(this)
}