/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.producer

import async.AsyncProducerConfig
import java.util.Properties
import kafka.utils.{Utils, VerifiableProperties}
import kafka.message.{CompressionCodec, NoCompressionCodec}
import kafka.common.{InvalidConfigException, Config}

/**
 * Validation helpers for [[ProducerConfig]] instances.
 *
 * Every check signals a problem by throwing InvalidConfigException (directly, or
 * through the `validateChars` helper, which is not defined in this file and is
 * presumably inherited from Config).
 */
object ProducerConfig extends Config {

  /**
   * Runs all producer-level checks against `config`, in this order: client id,
   * batch size versus queue size, producer type. The first failing check throws,
   * so later checks are not reached.
   */
  def validate(config: ProducerConfig): Unit = {
    validateClientId(config.clientId)
    validateBatchSize(config.batchNumMessages, config.queueBufferingMaxMessages)
    validateProducerType(config.producerType)
  }

  /** Checks the value supplied for "client.id" by delegating to `validateChars`. */
  def validateClientId(clientId: String): Unit = {
    validateChars("client.id", clientId)
  }

  /**
   * Rejects a batch size that exceeds the queue size.
   *
   * @throws InvalidConfigException if `batchSize` is strictly greater than `queueSize`
   */
  def validateBatchSize(batchSize: Int, queueSize: Int): Unit = {
    val fitsInQueue = batchSize <= queueSize
    if (!fitsInQueue)
      throw new InvalidConfigException("Batch size = " + batchSize + " can't be larger than queue size = " + queueSize)
  }

  /**
   * Accepts exactly the values "sync" and "async" for producer.type.
   *
   * @throws InvalidConfigException for any other value
   */
  def validateProducerType(producerType: String): Unit = {
    val recognized = producerType == "sync" || producerType == "async"
    if (!recognized)
      throw new InvalidConfigException("Invalid value " + producerType + " for producer.type, valid values are sync/async")
  }
}

/**
 * Producer settings read from a VerifiableProperties instance.
 *
 * The primary constructor is private; callers use the java.util.Properties
 * constructor. Properties are read in declaration order and `validate(this)`
 * runs as the last step of construction.
 */
class ProducerConfig private (val props: VerifiableProperties)
  extends AsyncProducerConfig with SyncProducerConfigShared {
  import ProducerConfig._

  /**
   * Wraps `originalProps` in a VerifiableProperties, runs the primary constructor
   * (which reads and validates every setting), then calls `props.verify()`.
   */
  def this(originalProps: Properties) {
    this(new VerifiableProperties(originalProps))
    props.verify()
  }

  /**
   * Bootstrap broker list ("metadata.broker.list"; no default is supplied here).
   * The producer only uses it to fetch metadata (topics, partitions and replicas);
   * the socket connections that carry the actual data are established from the
   * broker information returned in that metadata. The format is
   * host1:port1,host2:port2, and the list may be a subset of the brokers or a VIP
   * pointing to a subset of the brokers.
   */
  val brokerList = props.getString("metadata.broker.list")

  /** The partitioner class used for partitioning events amongst sub-topics. */
  val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")

  /**
   * Whether messages are sent asynchronously or not. Valid values are:
   *  - async for asynchronous send
   *  - sync for synchronous send
   */
  val producerType = props.getString("producer.type", "sync")

  /**
   * The compression codec for all data generated by this producer.
   * The default is NoCompressionCodec.
   */
  val compressionCodec = props.getCompressionCodec("compression.codec", NoCompressionCodec)

  /**
   * Controls whether compression is turned on for particular topics.
   *  - If the compression codec is anything other than NoCompressionCodec,
   *    compression is enabled only for the specified topics, if any.
   *  - If the list of compressed topics is empty, the specified compression
   *    codec is enabled for all topics.
   *  - If the compression codec is NoCompressionCodec, compression is disabled
   *    for all topics.
   *
   * When "compressed.topics" is absent, null is handed to Utils.parseCsvList.
   */
  val compressedTopics = Utils.parseCsvList(props.getString("compressed.topics", null))

  /**
   * The leader may be transiently unavailable, which can fail the sending of a
   * message. This is the number of retries when such failures occur.
   */
  val messageSendMaxRetries = props.getInt("message.send.max.retries", 3)

  /**
   * Before each retry the producer refreshes the metadata of the relevant topics.
   * Since leader election takes a bit of time, this is the amount of time the
   * producer waits before refreshing the metadata.
   */
  val retryBackoffMs = props.getInt("retry.backoff.ms", 100)

  /**
   * The producer generally refreshes the topic metadata from the brokers when
   * there is a failure (partition missing, leader not available...). It also
   * polls regularly (default: every 10 min, i.e. 600000 ms).
   *  - A negative value means metadata is only refreshed on failure.
   *  - Zero means metadata is refreshed after each message sent (not recommended).
   *
   * Important: the refresh happens only AFTER a message is sent, so if the
   * producer never sends a message the metadata is never refreshed.
   */
  val topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000)

  // Must remain the last statement: it reads the settings initialised above.
  validate(this)
}