package kafka.api
import java.nio.ByteBuffer
import collection.mutable.HashMap
import collection.immutable.Map
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.ApiUtils._
/**
 * Companion of the `StopReplicaResponse` case class; holds the wire-format decoder.
 */
object StopReplicaResponse {

  /**
   * Decodes a `StopReplicaResponse` from `buffer`, advancing the buffer position.
   *
   * Fields are consumed in this order:
   *  - correlation id (int)
   *  - top-level error code (short)
   *  - number of per-partition entries (int)
   *  - for each entry: topic (via `readShortString`), partition (int), error code (short)
   *
   * @param buffer the buffer positioned at the start of the response body
   * @return the decoded response
   */
  def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
    val correlationId = buffer.getInt
    val errorCode = buffer.getShort
    val entryCount = buffer.getInt

    // Accumulate into a mutable map first, then freeze it for the response.
    val partitionErrors = new HashMap[TopicAndPartition, Short]()
    (0 until entryCount).foreach { _ =>
      val topic = readShortString(buffer)
      val partition = buffer.getInt
      val partitionErrorCode = buffer.getShort()
      partitionErrors(TopicAndPartition(topic, partition)) = partitionErrorCode
    }

    new StopReplicaResponse(correlationId, partitionErrors.toMap, errorCode)
  }
}
/**
 * Response carrying one error code per topic-partition plus a top-level error code.
 *
 * Serialized layout (see `writeTo`): correlation id (int), error code (short),
 * entry count (int), then for each entry the topic (short string), the
 * partition (int) and that partition's error code (short).
 *
 * @param correlationId id written first in the serialized form
 * @param responseMap   error code for each topic-partition
 * @param errorCode     top-level error code; defaults to `ErrorMapping.NoError`
 */
case class StopReplicaResponse(correlationId: Int,
                               responseMap: Map[TopicAndPartition, Short],
                               errorCode: Short = ErrorMapping.NoError)
  extends RequestOrResponse() {

  /**
   * Number of bytes `writeTo` will put into a buffer.
   *
   * @return the serialized size in bytes
   */
  def sizeInBytes(): Int = {
    val headerSize =
      4 + /* correlation id */
      2 + /* error code */
      4   /* number of entries */

    // Fold over keysIterator rather than keys.map(...).sum: `keys` may be backed
    // by a Set, in which case mapping to sizes would collapse equal values.
    val entriesSize = responseMap.keysIterator.foldLeft(0) { (total, topicAndPartition) =>
      // The topic is written as a 2-byte length prefix followed by its encoded
      // bytes, so the size must count bytes, not UTF-16 chars.
      // NOTE(review): assumes writeShortString encodes as UTF-8 - confirm against ApiUtils.
      val topicSize = 2 + topicAndPartition.topic.getBytes("UTF-8").length
      total +
        topicSize +
        4 + /* partition */
        2   /* partition error code */
    }

    headerSize + entriesSize
  }

  /**
   * Serializes this response into `buffer` at its current position.
   *
   * @param buffer destination buffer; callers are expected to provide at least
   *               `sizeInBytes()` remaining bytes
   */
  def writeTo(buffer: ByteBuffer): Unit = {
    buffer.putInt(correlationId)
    buffer.putShort(errorCode)
    buffer.putInt(responseMap.size)
    // Named partitionErrorCode so the class-level errorCode is not shadowed.
    for ((topicAndPartition, partitionErrorCode) <- responseMap) {
      writeShortString(buffer, topicAndPartition.topic)
      buffer.putInt(topicAndPartition.partition)
      buffer.putShort(partitionErrorCode)
    }
  }

  /**
   * Textual description of this response.
   *
   * @param details currently ignored; the case-class `toString` is returned either way
   * @return the result of `toString`
   */
  override def describe(details: Boolean): String = toString
}