/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools

import scala.collection.JavaConversions._
import org.I0Itec.zkclient._
import joptsimple._
import java.util.Properties
import java.util.Random
import java.io.PrintStream
import kafka.message._
import kafka.serializer._
import kafka.utils._
import kafka.metrics.KafkaMetricsReporter
import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer}

/**
 * Consumer that dumps messages out to standard out.
 *
 */
object ConsoleConsumer extends Logging {

  def main(args: Array[String]) {
    val parser = new OptionParser
    val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
            .withRequiredArg
            .describedAs("topic")
            .ofType(classOf[String])
    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
            .withRequiredArg
            .describedAs("whitelist")
            .ofType(classOf[String])
    val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
            .withRequiredArg
            .describedAs("blacklist")
            .ofType(classOf[String])
    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
            "Multiple URLS can be given to allow fail-over.")
            .withRequiredArg
            .describedAs("urls")
            .ofType(classOf[String])

    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
            .withRequiredArg
            .describedAs("config file")
            .ofType(classOf[String])
    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
            .withRequiredArg
            .describedAs("class")
            .ofType(classOf[String])
            .defaultsTo(classOf[DefaultMessageFormatter].getName)
    val messageFormatterArgOpt = parser.accepts("property")
            .withRequiredArg
            .describedAs("prop")
            .ofType(classOf[String])
    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
    val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
            "start with the earliest message present in the log rather than the latest message.")
    val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
            .withRequiredArg
            .describedAs("num_messages")
            .ofType(classOf[java.lang.Integer])
    val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
            "skip it instead of halt.")
    val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
    val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
            "set, the csv metrics will be outputed here")
      .withRequiredArg
      .describedAs("metrics dictory")
      .ofType(classOf[java.lang.String])

    if(args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
      
    var groupIdPassed = true
    val options: OptionSet = tryParse(parser, args)
    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
    val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
    if (topicOrFilterOpt.size != 1)
      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
    val topicArg = options.valueOf(topicOrFilterOpt.head)
    val filterSpec = if (options.has(blacklistOpt))
      new Blacklist(topicArg)
    else
      new Whitelist(topicArg)

    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
    if (csvMetricsReporterEnabled) {
      val csvReporterProps = new Properties()
      csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
      csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
      if (options.has(metricsDirectoryOpt))
        csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
      else
        csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics")
      csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true")
      val verifiableProps = new VerifiableProperties(csvReporterProps)
      KafkaMetricsReporter.startReporters(verifiableProps)
    }



    val consumerProps = if (options.has(consumerConfigOpt))
      Utils.loadProps(options.valueOf(consumerConfigOpt))
    else
      new Properties()

    if(!consumerProps.containsKey("group.id")) {
      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
      groupIdPassed=false
    }
    consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
    consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))

    if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
       checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
      System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
      System.exit(1)
    }

    if(options.has(deleteConsumerOffsetsOpt))
      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))

    val config = new ConsumerConfig(consumerProps)
    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
    val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
    val connector = Consumer.create(config)

    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run() {
        connector.shutdown()
        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
        if(!groupIdPassed)
          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
      }
    })

    var numMessages = 0L
    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
    formatter.init(formatterArgs)
    try {
      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
      val iter = if(maxMessages >= 0)
        stream.slice(0, maxMessages)
      else
        stream

      for(messageAndTopic <- iter) {
        try {
          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
          numMessages += 1
        } catch {
          case e: Throwable =>
            if (skipMessageOnError)
              error("Error processing message, skipping this message: ", e)
            else
              throw e
        }
        if(System.out.checkError()) {
          // This means no one is listening to our output stream any more, time to shutdown
          System.err.println("Unable to write to standard out, closing consumer.")
          System.err.println("Consumed %d messages".format(numMessages))
          formatter.close()
          connector.shutdown()
          System.exit(1)
        }
      }
    } catch {
      case e: Throwable => error("Error processing message, stopping consumer: ", e)
    }
    System.err.println("Consumed %d messages".format(numMessages))
    System.out.flush()
    formatter.close()
    connector.shutdown()
  }

  def tryParse(parser: OptionParser, args: Array[String]) = {
    try {
      parser.parse(args : _*)
    } catch {
      case e: OptionException => {
        Utils.croak(e.getMessage)
        null
      }
    }
  }

  def checkZkPathExists(zkUrl: String, path: String): Boolean = {
    try {
      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer);
      zk.exists(path)
    } catch {
      case _: Throwable => false
    }
  }
}

trait MessageFormatter {
  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
  def init(props: Properties) {}
  def close() {}
}

class DefaultMessageFormatter extends MessageFormatter {
  var printKey = false
  var keySeparator = "\t".getBytes
  var lineSeparator = "\n".getBytes

  override def init(props: Properties) {
    if(props.containsKey("print.key"))
      printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
    if(props.containsKey("key.separator"))
      keySeparator = props.getProperty("key.separator").getBytes
    if(props.containsKey("line.separator"))
      lineSeparator = props.getProperty("line.separator").getBytes
  }

  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
    if(printKey) {
      output.write(if (key == null) "null".getBytes() else key)
      output.write(keySeparator)
    }
    output.write(if (value == null) "null".getBytes() else value)
    output.write(lineSeparator)
  }
}

class NoOpMessageFormatter extends MessageFormatter {
  override def init(props: Properties) {}
  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
}

class ChecksumMessageFormatter extends MessageFormatter {
  private var topicStr: String = _

  override def init(props: Properties) {
    topicStr = props.getProperty("topic")
    if (topicStr != null)
      topicStr = topicStr + ":"
    else
      topicStr = ""
  }

  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
    val chksum = new Message(value, key).checksum
    output.println(topicStr + "checksum:" + chksum)
  }
}