package kafka.message
import java.nio._
import scala.math._
import kafka.utils._
/**
 * Constants describing the byte layout of a message buffer, as written by the
 * constructors of the `Message` class:
 *
 *   crc (4 bytes) | magic (1 byte) | attributes (1 byte) | key size (4 bytes) |
 *   key (variable) | value size (4 bytes) | value (variable)
 *
 * All offsets are relative to the start of the message buffer.
 */
object Message {

  /** Offset of the checksum field; it is the first field of the message. */
  val CrcOffset: Int = 0
  /** Width in bytes of the checksum field. */
  val CrcLength: Int = 4

  /** Offset of the magic byte, which directly follows the checksum. */
  val MagicOffset: Int = CrcOffset + CrcLength
  /** Width in bytes of the magic field. */
  val MagicLength: Int = 1

  /** Offset of the attributes byte, which directly follows the magic byte. */
  val AttributesOffset: Int = MagicOffset + MagicLength
  /** Width in bytes of the attributes field. */
  val AttributesLength: Int = 1

  /** Offset of the 4-byte key-size field. */
  val KeySizeOffset: Int = AttributesOffset + AttributesLength
  /** Width in bytes of the key-size field. */
  val KeySizeLength: Int = 4

  /** Offset at which the key bytes (if any) begin. */
  val KeyOffset: Int = KeySizeOffset + KeySizeLength

  /** Width in bytes of the value-size field that precedes the payload. */
  val ValueSizeLength: Int = 4

  /** Number of bytes in a message that are neither key bytes nor payload bytes. */
  val MessageOverhead: Int = KeyOffset + ValueSizeLength

  /**
   * Size of the fixed-width fields of a message, i.e. the size of a message
   * that carries neither key bytes nor payload bytes.
   */
  val MinHeaderSize: Int =
    CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength

  /** Magic value written into newly constructed messages. */
  val CurrentMagicValue: Byte = 0

  /** Mask applied to the attributes byte to obtain the compression codec bits. */
  val CompressionCodeMask: Int = 0x07

  /** Codec value for no compression (presumably the id of NoCompressionCodec; confirm against CompressionCodec). */
  val NoCompression: Int = 0
}
/**
 * A message stored in a byte buffer with the following layout:
 *
 *   crc (4 bytes) | magic (1 byte) | attributes (1 byte) | key size (4 bytes) |
 *   key (variable) | payload size (4 bytes) | payload (variable)
 *
 * A key size or payload size of -1 marks the corresponding section as absent.
 *
 * @param buffer the buffer holding the serialized message
 */
class Message(val buffer: ByteBuffer) {

  import kafka.message.Message._

  /**
   * Serializes a new message into a freshly allocated buffer.
   *
   * @param bytes         array holding the payload, or null for a message without payload
   * @param key           the key bytes, or null for a message without key
   * @param codec         compression codec recorded in the attributes byte
   * @param payloadOffset index in `bytes` at which the payload starts
   * @param payloadSize   number of payload bytes to copy; a negative value means
   *                      "everything from payloadOffset to the end of bytes"
   */
  def this(bytes: Array[Byte],
           key: Array[Byte],
           codec: CompressionCodec,
           payloadOffset: Int,
           payloadSize: Int) = {
    // MessageOverhead covers every fixed-width field (crc, magic, attributes, both size fields).
    this(ByteBuffer.allocate(
      Message.MessageOverhead +
        (if (key == null) 0 else key.length) +
        (if (bytes == null) 0
         else if (payloadSize >= 0) payloadSize
         else bytes.length - payloadOffset)))

    // Leave the crc slot empty for now; it is filled in once everything else is written.
    buffer.position(MagicOffset)
    buffer.put(CurrentMagicValue)

    // Only a positive codec id sets bits in the attributes byte.
    val attributeBits: Byte =
      if (codec.codec > 0) (CompressionCodeMask & codec.codec).toByte
      else 0.toByte
    buffer.put(attributeBits)

    // Key section: length prefix followed by the key bytes, or -1 when there is no key.
    if (key != null) {
      buffer.putInt(key.length)
      buffer.put(key, 0, key.length)
    } else {
      buffer.putInt(-1)
    }

    // Payload section: length prefix followed by the payload bytes, or -1 when there is no payload.
    val valueLength =
      if (bytes == null) -1
      else if (payloadSize >= 0) payloadSize
      else bytes.length - payloadOffset
    buffer.putInt(valueLength)
    if (bytes != null)
      buffer.put(bytes, payloadOffset, valueLength)
    buffer.rewind()

    // Everything after the crc slot is in place, so the checksum can be stored now.
    Utils.writeUnsignedInt(buffer, CrcOffset, computeChecksum())
  }

  /** Creates a message from the whole of `bytes` with the given key and codec. */
  def this(bytes: Array[Byte], key: Array[Byte], codec: CompressionCodec) =
    this(bytes, key, codec, 0, -1)

  /** Creates a message without key from the whole of `bytes` with the given codec. */
  def this(bytes: Array[Byte], codec: CompressionCodec) =
    this(bytes, null, codec)

  /** Creates a message from the whole of `bytes` with the given key, using NoCompressionCodec. */
  def this(bytes: Array[Byte], key: Array[Byte]) =
    this(bytes, key, NoCompressionCodec)

  /** Creates a message without key from the whole of `bytes`, using NoCompressionCodec. */
  def this(bytes: Array[Byte]) =
    this(bytes, null, NoCompressionCodec)

  /**
   * Computes the checksum over every byte from the magic byte up to the buffer limit.
   * Reads the buffer's backing array, so the buffer must be array-backed.
   */
  def computeChecksum(): Long = {
    val backing = buffer.array()
    val from = buffer.arrayOffset() + MagicOffset
    val count = buffer.limit() - MagicOffset
    Utils.crc32(backing, from, count)
  }

  /** The checksum stored in the message. */
  def checksum: Long = Utils.readUnsignedInt(buffer, CrcOffset)

  /** True if the stored checksum equals the checksum computed from the message contents. */
  def isValid: Boolean = checksum == computeChecksum()

  /** Throws an InvalidMessageException if the stored and computed checksums differ. */
  def ensureValid(): Unit = {
    if (!isValid) {
      throw new InvalidMessageException(
        "Message is corrupt (stored crc = " + checksum + ", computed crc = " + computeChecksum() + ")")
    }
  }

  /** The limit of the underlying buffer. */
  def size: Int = buffer.limit()

  /** The value of the key-size field; negative when the message has no key. */
  def keySize: Int = buffer.getInt(Message.KeySizeOffset)

  /** True if the key-size field is non-negative. */
  def hasKey: Boolean = keySize >= 0

  /** Offset of the payload-size field, which sits right after the key bytes (if any). */
  private def payloadSizeOffset = {
    val keyBytes = max(0, keySize)
    Message.KeyOffset + keyBytes
  }

  /** The value of the payload-size field; negative when the message has no payload. */
  def payloadSize: Int = buffer.getInt(payloadSizeOffset)

  /** True if the payload-size field is negative. */
  def isNull(): Boolean = payloadSize < 0

  /** The magic byte of the message. */
  def magic: Byte = buffer.get(MagicOffset)

  /** The attributes byte of the message. */
  def attributes: Byte = buffer.get(AttributesOffset)

  /** The compression codec encoded in the masked bits of the attributes byte. */
  def compressionCodec: CompressionCodec = {
    val codecBits = buffer.get(AttributesOffset) & CompressionCodeMask
    CompressionCodec.getCompressionCodec(codecBits)
  }

  /** A buffer over the payload bytes, or null if the payload size is negative. */
  def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)

  /** A buffer over the key bytes, or null if the key size is negative. */
  def key: ByteBuffer = sliceDelimited(KeySizeOffset)

  /**
   * Reads the 4-byte size stored at `start` and returns a buffer over the bytes
   * that follow it, limited to that size. Returns null when the size is negative.
   * The result is derived via duplicate/slice, so the position and limit of
   * `buffer` itself are left untouched.
   */
  private def sliceDelimited(start: Int): ByteBuffer = {
    val length = buffer.getInt(start)
    if (length >= 0) {
      val view = buffer.duplicate()
      view.position(start + 4)
      val section = view.slice()
      section.limit(length)
      section.rewind()
      section
    } else {
      null
    }
  }

  /** Renders the magic byte, attributes, stored checksum, key and payload of the message. */
  override def toString(): String = {
    val template = "Message(magic = %d, attributes = %d, crc = %d, key = %s, payload = %s)"
    template.format(magic, attributes, checksum, key, payload)
  }

  /** Two messages are equal when their underlying buffers are equal. */
  override def equals(any: Any): Boolean = any match {
    case other: Message => this.buffer.equals(other.buffer)
    case _ => false
  }

  /** Hash code of the underlying buffer, consistent with `equals`. */
  override def hashCode(): Int = buffer.hashCode()
}