package kafka.api
import java.nio.ByteBuffer
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.api.ApiUtils._
object OffsetResponse {

  /**
   * Decodes an [[OffsetResponse]] from the current position of `buffer`.
   *
   * Layout consumed, in order: correlation id (int), topic count (int), then per topic a
   * string read with `readShortString` and a partition count (int), then per partition a
   * partition id (int), an error code (short), an offset count (int) and that many longs.
   *
   * @param buffer source of the encoded response; its position is advanced by the reads
   * @return the decoded response, keyed by topic and partition
   */
  def readFrom(buffer: ByteBuffer): OffsetResponse = {
    val correlationId = buffer.getInt
    val topicCount = buffer.getInt
    // Reads must happen strictly in wire order, so each topic is fully consumed
    // (header and all of its partitions) before the next one is started.
    val entries = (1 to topicCount).flatMap(_ => readTopicEntries(buffer))
    OffsetResponse(correlationId, entries.toMap)
  }

  /** Reads one topic header followed by all of its partition entries. */
  private def readTopicEntries(buffer: ByteBuffer) = {
    val topic = readShortString(buffer)
    val partitionCount = buffer.getInt
    (1 to partitionCount).map(_ => readPartitionEntry(buffer, topic))
  }

  /** Reads a single partition entry (id, error code, offsets) belonging to `topic`. */
  private def readPartitionEntry(buffer: ByteBuffer, topic: String) = {
    val partition = buffer.getInt
    val error = buffer.getShort
    val offsetCount = buffer.getInt
    val offsets = (1 to offsetCount).map(_ => buffer.getLong)
    TopicAndPartition(topic, partition) -> PartitionOffsetsResponse(error, offsets)
  }
}
/**
 * Per-partition payload of an offset response.
 *
 * @param error   error code for the partition; compared against `ErrorMapping.NoError` by callers
 * @param offsets offsets returned for the partition
 */
case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long]) {

  /**
   * Renders the class name of the exception that `ErrorMapping.exceptionFor` associates with
   * the error code, followed by the offsets as a comma-separated list.
   */
  override def toString(): String = {
    // A separator is required: without it, offsets 12 and 3 would print as "123".
    "error: " + ErrorMapping.exceptionFor(error).getClass.getName + " offsets: " + offsets.mkString(",")
  }
}
/**
 * Response carrying, for each topic-partition, an error code and a list of offsets.
 *
 * @param correlationId            first int written by `writeTo`
 * @param partitionErrorAndOffsets error code and offsets keyed by topic and partition
 */
case class OffsetResponse(correlationId: Int,
                          partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
  extends RequestOrResponse() {

  /** Entries of `partitionErrorAndOffsets` bucketed by topic; computed on first access. */
  lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)

  /** True when at least one partition carries an error code other than `ErrorMapping.NoError`. */
  def hasError: Boolean = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)

  /** Number of bytes `writeTo` puts into the buffer for this response. */
  val sizeInBytes: Int = {
    val correlationIdSize = 4
    val topicCountSize = 4
    val partitionCountSize = 4
    // Fixed part of each partition entry: partition id (int) + error code (short) + offset count (int).
    val partitionHeaderSize = 4 + 2 + 4
    val offsetSize = 8

    val topicsSize = offsetsGroupedByTopic.foldLeft(0) { case (acc, (topic, partitions)) =>
      val partitionsSize =
        partitions.valuesIterator.map(p => partitionHeaderSize + p.offsets.size * offsetSize).sum
      acc + shortStringLength(topic) + partitionCountSize + partitionsSize
    }

    correlationIdSize + topicCountSize + topicsSize
  }

  /**
   * Serializes this response into `buffer` at its current position.
   *
   * Layout written, in order: correlation id (int), topic count (int), then per topic the
   * topic via `writeShortString` and a partition count (int), then per partition the
   * partition id (int), error code (short), offset count (int) and each offset (long).
   *
   * @param buffer destination buffer; its position is advanced by the writes
   */
  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putInt(correlationId)
    buffer.putInt(offsetsGroupedByTopic.size)
    offsetsGroupedByTopic.foreach { case (topic, partitions) =>
      writeShortString(buffer, topic)
      buffer.putInt(partitions.size)
      partitions.foreach { case (TopicAndPartition(_, partition), errorAndOffsets) =>
        buffer.putInt(partition)
        buffer.putShort(errorAndOffsets.error)
        buffer.putInt(errorAndOffsets.offsets.size)
        errorAndOffsets.offsets.foreach(offset => buffer.putLong(offset))
      }
    }
  }

  /** Returns this object's `toString`; the `details` flag is not consulted. */
  override def describe(details: Boolean): String = toString
}