package kafka.tools

import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.consumer._
import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
import kafka.api.OffsetRequest
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}

object ReplayLogProducer extends Logging {

  private val GroupId: String = "replay-log-producer"

  /**
   * Entry point of the tool. Parses the command line into a [[Config]], creates a
   * ZooKeeper-based consumer connector for `config.inputTopic`, and builds one
   * [[ZKConsumerThread]] for every message stream returned by the connector.
   *
   * NOTE(review): the body of the final `for (thread <- threadList)` loop and the end of this
   * method are not visible in this view; `executor` and `allDone` are created here but are
   * not referenced anywhere in the visible part.
   *
   * @param args raw command-line arguments, handed to [[Config]] for parsing
   */
  def main(args: Array[String]) {
    val config = new Config(args)

    // Thread pool and latch are both sized by the configured number of threads.
    val executor = Executors.newFixedThreadPool(config.numThreads)
    val allDone = new CountDownLatch(config.numThreads)

    // The consumer group id is the fixed GroupId constant, so its ZooKeeper path is cleaned up
    // first to avoid leaving persistent group data behind; this is a hack.
    // (presumably maybeDeletePath removes the path only if it exists -- confirm in ZkUtils)
    ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)

    // consumer properties
    val consumerProps = new Properties
    consumerProps.put("group.id", GroupId)
    consumerProps.put("zookeeper.connect", config.zkConnect)
    consumerProps.put("consumer.timeout.ms", "10000")
    // presumably makes a group without committed offsets start from the earliest message -- TODO confirm
    consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
    // 1024*1024 = 1 MiB
    consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
    // 2 * 1024 * 1024 = 2 MiB
    consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
    val consumerConfig = new ConsumerConfig(consumerProps)
    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
    // The map passed here pairs the input topic with config.numThreads
    // (presumably the number of streams requested for that topic -- confirm against ConsumerConnector).
    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
    // One ZKConsumerThread per returned stream; threads are prepended, so the list is in reverse creation order.
    var threadList = List[ZKConsumerThread]()
    for ((topic, streamList) <- topicMessageStreams)
      for (stream <- streamList)
        threadList ::= new ZKConsumerThread(config, stream)

    // NOTE(review): the loop body is not visible in this view.
    for (thread <- threadList)


  /**
   * Command-line configuration for the tool. Declares the accepted options on a joptsimple
   * `OptionParser`, parses `args`, and exposes the parsed values plus the producer
   * properties as fields.
   *
   * NOTE(review): as visible here, the option declarations carry no argument
   * type/default specification, yet `.intValue` is called on several parsed values below --
   * lines appear to be missing from this view; confirm against the full file.
   *
   * @param args raw command-line arguments
   */
  class Config(args: Array[String]) {
    val parser = new OptionParser
    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
      "Multiple URLS can be given to allow fail-over.")
      .describedAs("zookeeper url")
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
    val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.")
    val outputTopicOpt = parser.accepts("outputtopic", "REQUIRED: The topic to produce to")
    val numMessagesOpt = parser.accepts("messages", "The number of messages to send.")
    val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
    val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
    val propertyOpt = parser.accepts("property", "A mechanism to pass properties in the form key=value to the producer. " +
      "This allows the user to override producer properties that are not exposed by the existing command line arguments")
      .describedAs("producer properties")
    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")

    val options = parser.parse(args : _*)
    // NOTE(review): only broker-list and inputtopic are passed to the required-argument check,
    // although the zookeeper and outputtopic descriptions above also say "REQUIRED".
    // Confirm whether those options have defaults declared elsewhere or should be checked here too.
    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt)

    val zkConnect = options.valueOf(zkConnectOpt)
    val brokerList = options.valueOf(brokerListOpt)
    val numMessages = options.valueOf(numMessagesOpt).intValue
    val numThreads = options.valueOf(numThreadsOpt).intValue
    val inputTopic = options.valueOf(inputTopicOpt)
    val outputTopic = options.valueOf(outputTopicOpt)
    val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
    // --sync is a flag: only its presence is checked, no value is read.
    val isSync = options.has(syncOpt)
    // NOTE(review): scala.collection.JavaConversions is deprecated; presumably imported so the Java
    // list from options.valuesOf can be passed to parseKeyValueArgs -- confirm before replacing.
    import scala.collection.JavaConversions._
    val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
    // These three entries are put after the --property values were parsed, so they presumably
    // take precedence over user-supplied entries with the same keys.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")

  class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
    val shutdownLatch = new CountDownLatch(1)
    val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps)

    /**
     * Thread body: iterates over the messages of `stream` and sends each one (key and
     * payload) to `config.outputTopic` through `producer`, counting the messages handled
     * and logging the total at the end.
     *
     * NOTE(review): several parts of this method are not visible in this view -- the
     * alternative for `iter` when `config.numMessages` is negative, the body of the
     * `config.isSync` branch, and a number of closing braces. Confirm against the full file.
     */
    override def run() {
      info("Starting consumer thread..")
      var messageCount: Int = 0
      try {
        // When a non-negative message count is configured, only the first numMessages
        // elements of the stream are taken.
        val iter =
          if(config.numMessages >= 0)
            stream.slice(0, config.numMessages)
        for (messageAndMetadata <- iter) {
          try {
            // Forward the consumed key and message bytes to the output topic.
            val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic,
                                            messageAndMetadata.key(), messageAndMetadata.message()))
            // NOTE(review): the body of the sync branch is not visible here.
            if(config.isSync) {
            messageCount += 1
          }catch {
            // Any exception for a single message is logged and the loop moves on to the next message.
            case ie: Exception => error("Skipping this message", ie)
      }catch {
        // A consumer timeout is logged as an error rather than rethrown.
        case e: ConsumerTimeoutException => error("consumer thread timing out", e)
      info("Sent " + messageCount + " messages")
      info("thread finished execution !" )

    def shutdown() {
