package kafka.producer
import java.util.Properties
trait BaseProducer {
def send(topic: String, key: Array[Byte], value: Array[Byte])
def close()
}
class NewShinyProducer(producerProps: Properties) extends BaseProducer {
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value)
if(sync) {
this.producer.send(record).get()
} else {
this.producer.send(record,
new ErrorLoggingCallback(topic, key, value, false))
}
}
override def close() {
this.producer.close()
}
}
class OldProducer(producerProps: Properties) extends BaseProducer {
import kafka.producer.{KeyedMessage, ProducerConfig}
if (producerProps.getProperty("partitioner.class") == null)
producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)
val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))
override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, value))
}
override def close() {
this.producer.close()
}
}