/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.tools

import joptsimple._
import kafka.utils._
import kafka.consumer._
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
import scala.collection.JavaConversions._
import kafka.common.TopicAndPartition

/**
 * Command line program to dump out messages to standard out using the simple consumer
 */
object SimpleConsumerShell extends Logging {

  def UseLeaderReplica = -1

  def main(args: Array[String]): Unit = {

    val parser = new OptionParser
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
                           .withRequiredArg
                           .describedAs("hostname:port,...,hostname:port")
                           .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
                           .withRequiredArg
                           .describedAs("topic")
                           .ofType(classOf[String])
    val partitionIdOpt = parser.accepts("partition", "The partition to consume from.")
                           .withRequiredArg
                           .describedAs("partition")
                           .ofType(classOf[java.lang.Integer])
                           .defaultsTo(0)
    val replicaIdOpt = parser.accepts("replica", "The replica id to consume from, default -1 means leader broker.")
                           .withRequiredArg
                           .describedAs("replica id")
                           .ofType(classOf[java.lang.Integer])
                           .defaultsTo(UseLeaderReplica)
    val offsetOpt = parser.accepts("offset", "The offset id to consume from, default to -2 which means from beginning; while value -1 means from end")
                           .withRequiredArg
                           .describedAs("consume offset")
                           .ofType(classOf[java.lang.Long])
                           .defaultsTo(OffsetRequest.EarliestTime)
    val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
                           .withRequiredArg
                           .describedAs("clientId")
                           .ofType(classOf[String])
                           .defaultsTo("SimpleConsumerShell")
    val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
                           .withRequiredArg
                           .describedAs("fetchsize")
                           .ofType(classOf[java.lang.Integer])
                           .defaultsTo(1024 * 1024)
    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
                           .withRequiredArg
                           .describedAs("class")
                           .ofType(classOf[String])
                           .defaultsTo(classOf[DefaultMessageFormatter].getName)
    val messageFormatterArgOpt = parser.accepts("property")
                           .withRequiredArg
                           .describedAs("prop")
                           .ofType(classOf[String])
    val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
                           .withRequiredArg
                           .describedAs("ms")
                           .ofType(classOf[java.lang.Integer])
                           .defaultsTo(1000)
    val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume")
                           .withRequiredArg
                           .describedAs("max-messages")
                           .ofType(classOf[java.lang.Integer])
                           .defaultsTo(Integer.MAX_VALUE)
    val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
        "skip it instead of halt.")
    val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
        "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
        
    if(args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")

    val options = parser.parse(args : _*)
    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt)

    val topic = options.valueOf(topicOpt)
    val partitionId = options.valueOf(partitionIdOpt).intValue()
    val replicaId = options.valueOf(replicaIdOpt).intValue()
    var startingOffset = options.valueOf(offsetOpt).longValue
    val fetchSize = options.valueOf(fetchSizeOpt).intValue
    val clientId = options.valueOf(clientIdOpt).toString
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
    val maxMessages = options.valueOf(maxMessagesOpt).intValue

    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
    val printOffsets = if(options.has(printOffsetOpt)) true else false
    val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)

    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))

    val fetchRequestBuilder = new FetchRequestBuilder()
                       .clientId(clientId)
                       .replicaId(Request.DebuggingConsumerId)
                       .maxWait(maxWaitMs)
                       .minBytes(ConsumerConfig.MinFetchBytes)

    // getting topic metadata
    info("Getting topic metatdata...")
    val brokerList = options.valueOf(brokerListOpt)
    ToolsUtils.validatePortOrDie(parser,brokerList)
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
      System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
      System.exit(1)
    }

    // validating partition id
    val partitionsMetadata = topicsMetadata(0).partitionsMetadata
    val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
    if(!partitionMetadataOpt.isDefined) {
      System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
      System.exit(1)
    }

    // validating replica id and initializing target broker
    var fetchTargetBroker: Broker = null
    var replicaOpt: Option[Broker] = null
    if(replicaId == UseLeaderReplica) {
      replicaOpt = partitionMetadataOpt.get.leader
      if(!replicaOpt.isDefined) {
        System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))
        System.exit(1)
      }
    }
    else {
      val replicasForPartition = partitionMetadataOpt.get.replicas
      replicaOpt = replicasForPartition.find(r => r.id == replicaId)
      if(!replicaOpt.isDefined) {
        System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId))
        System.exit(1)
      }
    }
    fetchTargetBroker = replicaOpt.get

    // initializing starting offset
    if(startingOffset < OffsetRequest.EarliestTime) {
      System.err.println("Invalid starting offset: %d".format(startingOffset))
      System.exit(1)
    }
    if (startingOffset < 0) {
      val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
                                              ConsumerConfig.SocketBufferSize, clientId)
      try {
        startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
                                                               Request.DebuggingConsumerId)
      } catch {
        case t: Throwable =>
          System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
          System.exit(1)
      } finally {
        if (simpleConsumer != null)
          simpleConsumer.close()
      }
    }

    // initializing formatter
    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
    formatter.init(formatterArgs)

    val replicaString = if(replicaId > 0) "leader" else "replica"
    info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
                 .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64*1024, clientId)
    val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
      def run() {
        var offset = startingOffset
        var numMessagesConsumed = 0
        try {
          while(numMessagesConsumed < maxMessages) {
            val fetchRequest = fetchRequestBuilder
                    .addFetch(topic, partitionId, offset, fetchSize)
                    .build()
            val fetchResponse = simpleConsumer.fetch(fetchRequest)
            val messageSet = fetchResponse.messageSet(topic, partitionId)
            if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) {
              println("Terminating. Reached the end of partition (%s, %d) at offset %d".format(topic, partitionId, offset))
              return
            }
            debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
            for(messageAndOffset <- messageSet if(numMessagesConsumed < maxMessages)) {
              try {
                offset = messageAndOffset.nextOffset
                if(printOffsets)
                  System.out.println("next offset = " + offset)
                val message = messageAndOffset.message
                val key = if(message.hasKey) Utils.readBytes(message.key) else null
                formatter.writeTo(key, if(message.isNull) null else Utils.readBytes(message.payload), System.out)
                numMessagesConsumed += 1
              } catch {
                case e: Throwable =>
                  if (skipMessageOnError)
                    error("Error processing message, skipping this message: ", e)
                  else
                    throw e
              }
              if(System.out.checkError()) {
                // This means no one is listening to our output stream any more, time to shutdown
                System.err.println("Unable to write to standard out, closing consumer.")
                formatter.close()
                simpleConsumer.close()
                System.exit(1)
              }
            }
          }
        } catch {
          case e: Throwable =>
            error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
        }finally {
          info("Consumed " + numMessagesConsumed + " messages")
        }
      }
    }, false)
    thread.start()
    thread.join()
    System.out.flush()
    formatter.close()
    simpleConsumer.close()
  }
}