package kafka.tools
import java.io.BufferedReader
import java.io.FileReader
import joptsimple._
import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils}
import org.I0Itec.zkclient.ZkClient
object ImportZkOffsets extends Logging {
def main(args: Array[String]) {
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zkconnect", "ZooKeeper connect string.")
.withRequiredArg()
.defaultsTo("localhost:2181")
.ofType(classOf[String])
val inFileOpt = parser.accepts("input-file", "Input file")
.withRequiredArg()
.ofType(classOf[String])
parser.accepts("help", "Print this message.")
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "Import offsets to zookeeper from files.")
val options = parser.parse(args : _*)
if (options.has("help")) {
parser.printHelpOn(System.out)
System.exit(0)
}
CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val partitionOffsetFile = options.valueOf(inFileOpt)
val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile)
updateZkOffsets(zkClient, partitionOffsets)
}
private def getPartitionOffsetsFromFile(filename: String):Map[String,String] = {
val fr = new FileReader(filename)
val br = new BufferedReader(fr)
var partOffsetsMap: Map[String,String] = Map()
var s: String = br.readLine()
while ( s != null && s.length() >= 1) {
val tokens = s.split(":")
partOffsetsMap += tokens(0) -> tokens(1)
debug("adding node path [" + s + "]")
s = br.readLine()
}
return partOffsetsMap
}
private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {
for ((partition, offset) <- partitionOffsets) {
debug("updating [" + partition + "] with offset [" + offset + "]")
try {
ZkUtils.updatePersistentPath(zkClient, partition, offset.toString)
} catch {
case e: Throwable => e.printStackTrace()
}
}
}
}