package kafka.utils
import java.io._
import java.nio._
import charset.Charset
import java.nio.channels._
import java.util.concurrent.locks.{ReadWriteLock, Lock}
import java.lang.management._
import javax.management._
import scala.collection._
import scala.collection.mutable
import java.util.Properties
import kafka.common.KafkaException
import kafka.common.KafkaStorageException
object Utils extends Logging {
/**
 * Wrap a by-name block of code as a Runnable. The block is re-evaluated
 * every time run() is invoked.
 */
def runnable(fun: => Unit): Runnable =
  new Runnable {
    override def run(): Unit = fun
  }
/** Create an unnamed daemon thread running the given Runnable (not started). */
def daemonThread(runnable: Runnable): Thread =
  newThread(runnable, true)

/** Create a named daemon thread running the given Runnable (not started). */
def daemonThread(name: String, runnable: Runnable): Thread =
  newThread(name, runnable, true)

/**
 * Create a named daemon thread that executes the given function.
 * Fix: previously this passed the Function0 value itself to runnable's
 * by-name parameter, so run() evaluated (and discarded) the function object
 * without ever invoking it — the thread body silently did nothing.
 * Passing the invocation `fun()` by-name defers execution until run().
 */
def daemonThread(name: String, fun: () => Unit): Thread =
  daemonThread(name, runnable(fun()))
/**
 * Create a named thread (not started) with an uncaught-exception handler
 * that logs the failure instead of letting it vanish silently.
 *
 * @param name The thread name
 * @param runnable The work to run
 * @param daemon Whether the thread should be a daemon
 */
def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
  val t = new Thread(runnable, name)
  t.setDaemon(daemon)
  t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    def uncaughtException(thread: Thread, cause: Throwable) {
      error("Uncaught exception in thread '" + thread.getName + "':", cause)
    }
  })
  t
}
/**
 * Create an unnamed thread (not started) with an uncaught-exception handler
 * that logs the failure.
 *
 * @param runnable The work to run
 * @param daemon Whether the thread should be a daemon
 */
def newThread(runnable: Runnable, daemon: Boolean): Thread = {
  val t = new Thread(runnable)
  t.setDaemon(daemon)
  t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    def uncaughtException(thread: Thread, cause: Throwable) {
      error("Uncaught exception in thread '" + thread.getName + "':", cause)
    }
  })
  t
}
/** Read bytes [0, limit) of the buffer into a new array (position unchanged). */
def readBytes(buffer: ByteBuffer): Array[Byte] = readBytes(buffer, 0, buffer.limit)

/**
 * Read `size` bytes starting at absolute index `offset` into a new array.
 * The buffer's position is never moved.
 *
 * Fix: for buffers without a backing array the old code read from the
 * current position (mark/get/reset), silently ignoring `offset` — it only
 * behaved correctly when position happened to equal offset. Absolute gets
 * make both branches agree.
 */
def readBytes(buffer: ByteBuffer, offset: Int, size: Int): Array[Byte] = {
  val dest = new Array[Byte](size)
  if(buffer.hasArray) {
    System.arraycopy(buffer.array, buffer.arrayOffset() + offset, dest, 0, size)
  } else {
    var i = 0
    while(i < size) {
      dest(i) = buffer.get(offset + i)
      i += 1
    }
  }
  dest
}
/**
 * Load a java.util.Properties instance from the given file, always closing
 * the stream even if loading fails.
 *
 * @param filename Path to the properties file
 * @return The loaded properties
 */
def loadProps(filename: String): Properties = {
  val props = new Properties()
  var input: InputStream = null
  try {
    input = new FileInputStream(filename)
    props.load(input)
  } finally {
    if(input != null)
      input.close()
  }
  props
}
/**
 * Open a FileChannel for the given file: read-write when `mutable` is true
 * (via RandomAccessFile), read-only otherwise (via FileInputStream).
 */
def openChannel(file: File, mutable: Boolean): FileChannel =
  if(mutable)
    new RandomAccessFile(file, "rw").getChannel()
  else
    new FileInputStream(file).getChannel()
/**
 * Run `action`, routing anything it throws to the supplied log function
 * instead of propagating it. Note this deliberately catches Throwable
 * (best-effort semantics); the exception's message may be null.
 */
def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
  try {
    action
  } catch {
    case t: Throwable => log(t.getMessage(), t)
  }
}
/**
 * Test whether two buffers hold equal content. Buffers are equal when their
 * positions match, their remaining counts match, and the remaining bytes
 * (i.e. [position, limit)) are pairwise equal. Neither buffer is consumed.
 *
 * Fix: the old loop compared absolute indices 0 until remaining, which
 * inspects already-consumed bytes (and misses trailing ones) whenever
 * position > 0; compare relative to each buffer's position instead.
 */
def equal(b1: ByteBuffer, b2: ByteBuffer): Boolean = {
  if(b1.position != b2.position)
    return false
  if(b1.remaining != b2.remaining)
    return false
  for(i <- 0 until b1.remaining)
    if(b1.get(b1.position + i) != b2.get(b2.position + i))
      return false
  true
}
/**
 * Decode all remaining bytes of the buffer as a String in the given
 * encoding (platform default if unspecified). Consumes the buffer:
 * position is advanced to the limit.
 */
def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = {
  val raw = new Array[Byte](buffer.remaining)
  buffer.get(raw)
  new String(raw, encoding)
}
/** Print an error message to stderr and terminate the JVM with exit code 1. */
def croak(message: String) {
  System.err.println(message)
  System.exit(1)
}
/** Recursively delete the file or directory at the given path. */
def rm(file: String): Unit = rm(new File(file))

/**
 * Recursively delete each of the given paths.
 * Fix: use foreach rather than map — the old code built and discarded a
 * Seq[Unit] purely for its side effects.
 */
def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f)))

/**
 * Recursively delete a file or directory tree. A null argument is a no-op;
 * deletion failures are silently ignored (matching File.delete semantics).
 */
def rm(file: File) {
  if(file == null) {
    return
  } else if(file.isDirectory) {
    // listFiles returns null on I/O error or if the directory vanished
    val children = file.listFiles()
    if(children != null) {
      children.foreach(rm)
    }
    file.delete()
  } else {
    file.delete()
  }
}
/**
 * Register the given mbean with the platform MBean server under `name`,
 * replacing any existing registration with that name.
 *
 * @return true on success; false (after logging the error) on failure
 */
def registerMBean(mbean: Object, name: String): Boolean = {
  try {
    val server = ManagementFactory.getPlatformMBeanServer()
    server synchronized {
      val objectName = new ObjectName(name)
      // drop a stale registration first so re-registration always succeeds
      if(server.isRegistered(objectName))
        server.unregisterMBean(objectName)
      server.registerMBean(mbean, objectName)
      true
    }
  } catch {
    case e: Exception => {
      error("Failed to register Mbean " + name, e)
      false
    }
  }
}
/**
 * Unregister the mbean with the given name from the platform MBean server,
 * if one is registered; otherwise do nothing.
 */
def unregisterMBean(name: String) {
  val server = ManagementFactory.getPlatformMBeanServer()
  server synchronized {
    val objectName = new ObjectName(name)
    if(server.isRegistered(objectName))
      server.unregisterMBean(objectName)
  }
}
/** Read a 4-byte unsigned int at the buffer's current position (advances position). */
def readUnsignedInt(buffer: ByteBuffer): Long =
  buffer.getInt() & 0xffffffffL

/** Read a 4-byte unsigned int at the given absolute index (position unchanged). */
def readUnsignedInt(buffer: ByteBuffer, index: Int): Long =
  buffer.getInt(index) & 0xffffffffL

/**
 * Write the low 32 bits of `value` as an unsigned int at the current
 * position. NOTE: the misspelled name ("writet") is kept because callers
 * depend on this public signature.
 */
def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit =
  buffer.putInt((value & 0xffffffffL).toInt)

/** Write the low 32 bits of `value` as an unsigned int at the given index. */
def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit =
  buffer.putInt(index, (value & 0xffffffffL).toInt)
/** Compute the CRC32 of the entire byte array. */
def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length)

/**
 * Compute the CRC32 of `size` bytes of the array starting at `offset`.
 */
def crc32(bytes: Array[Byte], offset: Int, size: Int): Long = {
  val checksum = new Crc32()
  checksum.update(bytes, offset, size)
  checksum.getValue()
}
/**
 * Compute a composite hash code over the arguments in the style of
 * java.util.Arrays.hashCode. Null elements are skipped (they contribute
 * nothing to the hash); a null argument list yields 0.
 *
 * Fix: the increment was inside the null check, so any null element made
 * the while loop spin forever.
 */
def hashcode(as: Any*): Int = {
  if(as == null)
    return 0
  var h = 1
  var i = 0
  while(i < as.length) {
    if(as(i) != null)
      h = 31 * h + as(i).hashCode
    i += 1
  }
  h
}
/**
 * Group the values by the key computed by `f`. Within each key the values
 * appear in reverse encounter order (each new value is prepended).
 */
def groupby[K,V](vals: Iterable[V], f: V => K): Map[K,List[V]] = {
  val grouped = new mutable.HashMap[K, List[V]]
  for(v <- vals) {
    val key = f(v)
    grouped.put(key, v :: grouped.getOrElse(key, Nil))
  }
  grouped
}
/**
 * Read from the channel into the buffer, returning the number of bytes
 * read. Throws EOFException when the channel reports end-of-stream (-1),
 * which for a socket usually means the peer closed the connection.
 */
def read(channel: ReadableByteChannel, buffer: ByteBuffer): Int = {
  val bytesRead = channel.read(buffer)
  if(bytesRead == -1)
    throw new EOFException("Received -1 when reading from channel, socket has likely been closed.")
  bytesRead
}
/**
 * Return the value unchanged, or throw KafkaException if it is null.
 * Useful as an inline precondition check.
 */
def notNull[V](v: V) = {
  if(v == null)
    throw new KafkaException("Value cannot be null.")
  v
}
/** Render a throwable's stack trace as a String. */
def stackTrace(e: Throwable): String = {
  val stringWriter = new StringWriter
  val printWriter = new PrintWriter(stringWriter)
  e.printStackTrace(printWriter)
  stringWriter.toString()
}
/**
 * Parse a comma-separated list of "key:value" pairs into a map, ignoring
 * whitespace around separators. A null or empty string yields an empty map.
 * Entries missing a ':' throw ArrayIndexOutOfBoundsException (historical
 * behavior, preserved).
 *
 * Fix: removed a mutable HashMap that was allocated and then dead on the
 * non-empty path; added a null guard.
 */
def parseCsvMap(str: String): Map[String, String] = {
  if(str == null || "".equals(str))
    Map.empty[String, String]
  else {
    val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*"))
    keyVals.map(pair => (pair(0), pair(1))).toMap
  }
}
/**
 * Parse a comma-separated list into a sequence of trimmed, non-empty
 * tokens. Null or empty input yields an empty sequence.
 */
def parseCsvList(csvList: String): Seq[String] = {
  if(csvList == null || csvList.isEmpty)
    Seq.empty[String]
  else
    csvList.split("\\s*,\\s*").filterNot(_.isEmpty)
}
/**
 * Instantiate a class by name via reflection, selecting the constructor
 * whose parameter types exactly match the runtime classes of the given
 * arguments (no subtype/boxing resolution).
 */
def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
  val clazz = Class.forName(className).asInstanceOf[Class[T]]
  val argTypes = args.map(_.getClass)
  clazz.getConstructor(argTypes: _*).newInstance(args: _*).asInstanceOf[T]
}
/** True when the string is null or has zero length. */
def nullOrEmpty(s: String): Boolean = s == null || s.isEmpty
/**
 * Create an infinite iterator that cycles over the collection forever.
 * Fix: the old implementation held a reference to the head of a Stream,
 * so every element ever produced stayed reachable — a memory leak for
 * long-lived iterators. Iterator.continually retains nothing.
 * Note: as before, an empty collection makes hasNext spin forever.
 */
def circularIterator[T](coll: Iterable[T]) =
  Iterator.continually(coll).flatMap(_.iterator)
/**
 * Read the entire file at `path` into a String, decoding with the given
 * charset (platform default if unspecified). The file is memory-mapped
 * read-only and the stream is always closed.
 */
def readFileAsString(path: String, charset: Charset = Charset.defaultCharset()): String = {
  val in = new FileInputStream(new File(path))
  try {
    val channel = in.getChannel()
    val mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
    charset.decode(mapped).toString()
  } finally {
    in.close()
  }
}
/**
 * Absolute value that maps Int.MinValue to 0, since math.abs(Int.MinValue)
 * overflows and stays negative.
 */
def abs(n: Int) = if(n == Int.MinValue) 0 else math.abs(n)
/**
 * Replace `oldSuffix` at the end of the string with `newSuffix`.
 * @throws IllegalArgumentException if the string does not end with oldSuffix
 */
def replaceSuffix(s: String, oldSuffix: String, newSuffix: String): String = {
  if(!s.endsWith(oldSuffix))
    throw new IllegalArgumentException("Expected string to end with '%s' but string is '%s'".format(oldSuffix, s))
  s.dropRight(oldSuffix.length) + newSuffix
}
/**
 * Create a new empty file at the given path.
 * @throws KafkaStorageException if the file already exists or cannot be created
 */
def createFile(path: String): File = {
  val file = new File(path)
  if(!file.createNewFile())
    throw new KafkaStorageException("Failed to create file %s.".format(path))
  file
}
/** Serialize a Properties object to its java.util.Properties text format. */
def asString(props: Properties): String = {
  val out = new StringWriter()
  props.store(out, "")
  out.toString
}
/**
 * Parse a properties-format string into a Properties object layered over
 * the given defaults.
 */
def readProps(s: String, defaults: Properties): Properties = {
  val props = new Properties(defaults)
  props.load(new StringReader(s))
  props
}
/** Read a big-endian 4-byte int from the array starting at `offset`. */
def readInt(bytes: Array[Byte], offset: Int): Int = {
  var result = 0
  var i = 0
  while(i < 4) {
    result = (result << 8) | (bytes(offset + i) & 0xFF)
    i += 1
  }
  result
}
/**
 * Execute `fun` while holding the given lock; the lock is always released,
 * even if `fun` throws.
 * @return the result of fun
 */
def inLock[T](lock: Lock)(fun: => T): T = {
  lock.lock()
  try fun
  finally lock.unlock()
}
/** Execute `fun` while holding the read lock of the given ReadWriteLock. */
def inReadLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.readLock)(fun)
/** Execute `fun` while holding the write lock of the given ReadWriteLock. */
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun)
/**
 * Escape a string for inclusion in a JSON document: quotes, backslashes,
 * slashes and the standard short escapes get backslash forms; remaining
 * control characters (U+0000..U+001F and U+007F..U+009F) become \\uXXXX.
 */
def JSONEscapeString (s : String) : String = {
  val escaped = new StringBuilder(s.length)
  s.foreach {
    case '"' => escaped.append("\\\"")
    case '\\' => escaped.append("\\\\")
    case '/' => escaped.append("\\/")
    case '\b' => escaped.append("\\b")
    case '\f' => escaped.append("\\f")
    case '\n' => escaped.append("\\n")
    case '\r' => escaped.append("\\r")
    case '\t' => escaped.append("\\t")
    case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) =>
      escaped.append("\\u%04x".format(c: Int))
    case c => escaped.append(c)
  }
  escaped.toString
}
/**
 * Return the distinct elements that occur more than once in the input.
 */
def duplicates[T](s: Traversable[T]): Iterable[T] = {
  s.groupBy(identity)
   .collect { case (item, occurrences) if occurrences.size > 1 => item }
}
}