package kafka.api
import java.nio.ByteBuffer
import kafka.cluster.Broker
import kafka.common.ErrorMapping
object ConsumerMetadataResponse {
val CurrentVersion = 0
private val NoBrokerOpt = Some(Broker(id = -1, host = "", port = -1))
def readFrom(buffer: ByteBuffer) = {
val correlationId = buffer.getInt
val errorCode = buffer.getShort
val broker = Broker.readFrom(buffer)
val coordinatorOpt = if (errorCode == ErrorMapping.NoError)
Some(broker)
else
None
ConsumerMetadataResponse(coordinatorOpt, errorCode, correlationId)
}
}
case class ConsumerMetadataResponse (coordinatorOpt: Option[Broker], errorCode: Short, correlationId: Int = 0)
extends RequestOrResponse() {
def sizeInBytes =
4 +
2 +
coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).get.sizeInBytes
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
buffer.putShort(errorCode)
coordinatorOpt.orElse(ConsumerMetadataResponse.NoBrokerOpt).foreach(_.writeTo(buffer))
}
def describe(details: Boolean) = toString
}