package kafka.api
import kafka.common.KafkaException
import java.nio.ByteBuffer
object RequestKeys {
val ProduceKey: Short = 0
val FetchKey: Short = 1
val OffsetsKey: Short = 2
val MetadataKey: Short = 3
val LeaderAndIsrKey: Short = 4
val StopReplicaKey: Short = 5
val UpdateMetadataKey: Short = 6
val ControlledShutdownKey: Short = 7
val OffsetCommitKey: Short = 8
val OffsetFetchKey: Short = 9
val ConsumerMetadataKey: Short = 10
val JoinGroupKey: Short = 11
val HeartbeatKey: Short = 12
val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
FetchKey -> ("Fetch", FetchRequest.readFrom),
OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom),
JoinGroupKey -> ("JoinGroup", JoinGroupRequestAndHeader.readFrom),
HeartbeatKey -> ("Heartbeat", HeartbeatRequestAndHeader.readFrom)
)
def nameForKey(key: Short): String = {
keyToNameAndDeserializerMap.get(key) match {
case Some(nameAndSerializer) => nameAndSerializer._1
case None => throw new KafkaException("Wrong request type %d".format(key))
}
}
def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = {
keyToNameAndDeserializerMap.get(key) match {
case Some(nameAndSerializer) => nameAndSerializer._2
case None => throw new KafkaException("Wrong request type %d".format(key))
}
}
}