package kafka.api
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.utils.Logging
import kafka.network.{BoundedByteBufferSend, RequestChannel}
import kafka.network.RequestChannel.Response
import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
/**
 * Companion of the OffsetFetchRequest case class: holds the default version and
 * client id used by its constructor, and the decoder that rebuilds a request
 * from a ByteBuffer.
 */
object OffsetFetchRequest extends Logging {
  val CurrentVersion: Short = 1
  val DefaultClientId = ""

  /**
   * Decodes an OffsetFetchRequest by consuming `buffer` from its current position.
   *
   * Fields are read strictly in this order: version (short), correlation id (int),
   * client id and consumer group id (both via `readShortString`), the topic count (int),
   * and then for every topic its name, its partition count (int) and that many
   * partition ids (int each).
   *
   * @param buffer the buffer to read from; its position is advanced past the request
   * @return the decoded request, with one TopicAndPartition per partition id read
   */
  def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
    // Envelope fields.
    val version = buffer.getShort
    val correlation = buffer.getInt
    val client = readShortString(buffer)

    // Request payload.
    val group = readShortString(buffer)
    val numTopics = buffer.getInt

    // The reads below are order-sensitive: each topic's header must be consumed
    // immediately before its own partition ids, so the per-topic work stays inside
    // one flatMap step.
    val topicPartitions = (1 to numTopics).flatMap { _ =>
      val topicName = readShortString(buffer)
      val numPartitions = buffer.getInt
      for (_ <- 1 to numPartitions) yield TopicAndPartition(topicName, buffer.getInt)
    }

    OffsetFetchRequest(
      groupId = group,
      requestInfo = topicPartitions,
      versionId = version,
      correlationId = correlation,
      clientId = client)
  }
}
/**
 * A request for the offsets of a set of topic-partitions on behalf of a consumer group.
 *
 * @param groupId       the consumer group id written after the client id
 * @param requestInfo   the topic-partitions whose offsets are being requested
 * @param versionId     the version written at the start of the request
 * @param correlationId the correlation id, also copied into the error response built by handleError
 * @param clientId      the client id written after the correlation id
 */
case class OffsetFetchRequest(groupId: String,
                              requestInfo: Seq[TopicAndPartition],
                              versionId: Short = OffsetFetchRequest.CurrentVersion,
                              correlationId: Int = 0,
                              clientId: String = OffsetFetchRequest.DefaultClientId)
    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {

  /** The requested topic-partitions keyed by topic name; computed on first access. */
  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)

  /**
   * Serializes this request into `buffer`: version, correlation id, client id, group id,
   * the number of distinct topics, and for each topic its name, its partition count and
   * the partition ids.
   *
   * @param buffer the destination buffer; its position is advanced by the bytes written
   */
  def writeTo(buffer: ByteBuffer): Unit = {
    // Envelope fields.
    buffer.putShort(versionId)
    buffer.putInt(correlationId)
    writeShortString(buffer, clientId)

    // Request payload.
    writeShortString(buffer, groupId)
    buffer.putInt(requestInfoGroupedByTopic.size)
    requestInfoGroupedByTopic.foreach { case (topic, partitions) =>
      writeShortString(buffer, topic)
      buffer.putInt(partitions.size)
      partitions.foreach(tp => buffer.putInt(tp.partition))
    }
  }

  /**
   * The size of this request, computed field by field in the same layout that
   * writeTo produces.
   */
  override def sizeInBytes: Int = {
    val envelopeSize =
      2 + // versionId (short)
      4 + // correlationId (int)
      shortStringLength(clientId)

    // Per topic: the topic name, a 4-byte partition count, and 4 bytes per partition id.
    val topicsSize = requestInfoGroupedByTopic.iterator.map { case (topic, partitions) =>
      shortStringLength(topic) + 4 + partitions.size * 4
    }.sum

    envelopeSize +
      shortStringLength(groupId) +
      4 + // topic count (int)
      topicsSize
  }

  /**
   * Answers a failed request: builds an OffsetFetchResponse in which every requested
   * topic-partition carries an invalid offset together with the error code that
   * ErrorMapping assigns to the class of `e`, and sends it through `requestChannel`.
   *
   * @param e              the failure that occurred while handling the request
   * @param requestChannel the channel the error response is sent on
   * @param request        the channel request being answered
   */
  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
    // A def rather than a val, so that one entry is built per requested partition.
    def failedOffset = OffsetMetadataAndError(
      offset = OffsetAndMetadata.InvalidOffset,
      error = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))

    val responseMap = requestInfo.map(tp => tp -> failedOffset).toMap
    val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
    val send = new BoundedByteBufferSend(errorResponse)
    requestChannel.sendResponse(new Response(request, send))
  }

  /**
   * Renders this request as a single line of "; "-separated fields.
   *
   * @param details when true, the comma-separated list of requested topic-partitions is appended
   * @return the textual description
   */
  override def describe(details: Boolean): String = {
    val summary =
      "Name: " + this.getClass.getSimpleName +
      "; Version: " + versionId +
      "; CorrelationId: " + correlationId +
      "; ClientId: " + clientId +
      "; GroupId: " + groupId

    if (details) summary + "; RequestInfo: " + requestInfo.mkString(",")
    else summary
  }

  /** The detailed description, i.e. describe with details enabled. */
  override def toString: String = describe(details = true)
}