package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Logging
import kafka.common.{ErrorMapping, TopicAndPartition}
/**
 * Companion of the [[OffsetCommitResponse]] case class: carries the version
 * constant and the decoder that rebuilds a response from a byte buffer.
 */
object OffsetCommitResponse extends Logging {
  val CurrentVersion: Short = 1

  /**
   * Decodes an [[OffsetCommitResponse]] from `buffer`, advancing the buffer's position.
   *
   * Values are consumed in this order: correlation id (int), topic count (int),
   * then for each topic a short string (the topic name), a partition count (int)
   * and, for each partition, a partition id (int) followed by an error code (short).
   *
   * @param buffer source of the encoded response; read sequentially from its current position
   * @return the decoded response; if the same topic/partition pair appears more than once,
   *         the entry read last is the one kept in the resulting map
   */
  def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
    val correlationId = buffer.getInt
    val numTopics = buffer.getInt
    // The reads below must stay strictly sequential: each topic header is
    // immediately followed by that topic's partition entries.
    val statusEntries = (1 to numTopics).flatMap { _ =>
      val topicName = ApiUtils.readShortString(buffer)
      val numPartitions = buffer.getInt
      for (_ <- 1 to numPartitions) yield {
        val partition = buffer.getInt
        val errorCode = buffer.getShort
        TopicAndPartition(topicName, partition) -> errorCode
      }
    }
    OffsetCommitResponse(statusEntries.toMap, correlationId)
  }
}
/**
 * Response carrying one error code per topic/partition, plus the correlation id
 * that is written at the start of the encoded form.
 *
 * @param commitStatus  error code for each topic/partition; a code equal to
 *                      `ErrorMapping.NoError` is treated as success by [[hasError]]
 * @param correlationId correlation id written first by [[writeTo]]; defaults to 0
 */
case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
                                correlationId: Int = 0)
    extends RequestOrResponse() {

  /** `commitStatus` entries bucketed by topic name; computed once on first access. */
  lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)

  /** True when at least one entry has an error code other than `ErrorMapping.NoError`. */
  def hasError = commitStatus.values.exists(code => code != ErrorMapping.NoError)

  /**
   * Encodes this response into `buffer`: correlation id (int), topic count (int),
   * then for each topic its name as a short string, its partition count (int) and,
   * for each partition, the partition id (int) followed by the error code (short).
   *
   * @param buffer destination buffer; written sequentially from its current position
   */
  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putInt(correlationId)
    buffer.putInt(commitStatusGroupedByTopic.size)
    for ((topic, partitionErrors) <- commitStatusGroupedByTopic) {
      ApiUtils.writeShortString(buffer, topic)
      buffer.putInt(partitionErrors.size)
      for ((topicAndPartition, errorCode) <- partitionErrors) {
        buffer.putInt(topicAndPartition.partition)
        buffer.putShort(errorCode)
      }
    }
  }

  /**
   * Size computed to mirror the layout produced by [[writeTo]]: two leading ints,
   * then per topic the short-string length of its name, an int for the partition
   * count and (int + short) for every partition entry.
   */
  override def sizeInBytes = {
    val correlationIdSize = 4
    val topicCountSize = 4
    val partitionCountSize = 4
    val partitionEntrySize = 4 + 2 // partition id (int) + error code (short)
    val topicsSize = commitStatusGroupedByTopic.iterator.map { case (topic, partitionErrors) =>
      ApiUtils.shortStringLength(topic) +
        partitionCountSize +
        partitionErrors.size * partitionEntrySize
    }.sum
    correlationIdSize + topicCountSize + topicsSize
  }

  /** Returns `toString`; the `details` flag is not consulted. */
  override def describe(details: Boolean): String = toString
}