package kafka.producer
import kafka.cluster.Broker
import java.util.Properties
import collection.mutable.HashMap
import java.lang.Object
import kafka.utils.Logging
import kafka.api.TopicMetadata
import kafka.common.UnavailableProducerException
object ProducerPool {
  /**
   * Creates a [[SyncProducer]] for the given broker.
   *
   * The broker's host and port are written into a fresh `Properties` object first,
   * and the properties carried by `config` are copied in afterwards. Because the
   * config entries are copied last, a `host` or `port` key present in the config
   * overwrites the value taken from the broker.
   *
   * @param config producer configuration whose properties are forwarded to the sync producer
   * @param broker broker supplying the `host` and `port` entries
   * @return a new sync producer built from the merged properties
   */
  def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
    val brokerProps = new Properties()
    brokerProps.put("host", broker.host)
    brokerProps.put("port", broker.port.toString)
    // Copied after host/port on purpose: this preserves the original precedence.
    brokerProps.putAll(config.props.props)
    val syncConfig = new SyncProducerConfig(brokerProps)
    new SyncProducer(syncConfig)
  }
}
/**
 * A pool of [[SyncProducer]] instances keyed by broker id.
 *
 * Every read and write of the underlying map is performed while holding `lock`.
 *
 * @param config producer configuration handed to every sync producer this pool creates
 */
class ProducerPool(val config: ProducerConfig) extends Logging {
  private val syncProducers = new HashMap[Int, SyncProducer]
  private val lock = new Object()

  /**
   * Refreshes the pool from topic metadata.
   *
   * Collects the leader broker of every partition that currently has one, then,
   * for each such broker, closes the producer already registered under that
   * broker id (if any) and registers a newly created one in its place.
   * Producers for brokers that do not appear as a leader are left untouched.
   *
   * @param topicMetadata metadata whose partition leaders determine which producers are (re)created
   */
  def updateProducer(topicMetadata: Seq[TopicMetadata]): Unit = {
    val newBrokers = new collection.mutable.HashSet[Broker]
    // The Option generator skips partitions without a leader, so no isDefined/get is needed.
    for {
      tmd <- topicMetadata
      pmd <- tmd.partitionsMetadata
      leader <- pmd.leader
    } newBrokers += leader

    lock synchronized {
      newBrokers.foreach { b =>
        // Close the stale producer first, then install the replacement (same order as before).
        syncProducers.get(b.id).foreach(_.close())
        syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      }
    }
  }

  /**
   * Looks up the sync producer registered for a broker.
   *
   * @param brokerId id of the broker whose producer is wanted
   * @return the producer currently registered under `brokerId`
   * @throws UnavailableProducerException if no producer is registered under `brokerId`
   */
  def getProducer(brokerId: Int): SyncProducer = {
    lock.synchronized {
      syncProducers.getOrElse(
        brokerId,
        throw new UnavailableProducerException("Sync producer for broker id %d does not exist".format(brokerId)))
    }
  }

  /**
   * Closes every sync producer held by the pool.
   *
   * The producers stay registered in the pool after being closed; this method
   * does not remove them from the map.
   */
  def close(): Unit = {
    lock.synchronized {
      info("Closing all sync producers")
      syncProducers.values.foreach(_.close())
    }
  }
}