package kafka.api
import java.nio._
import kafka.api.ApiUtils._
import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.network.RequestChannel.Response
import kafka.utils.Logging
import collection.Set
object StopReplicaRequest extends Logging {
val CurrentVersion = 0.shortValue
val DefaultClientId = ""
val DefaultAckTimeout = 100
def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
val controllerId = buffer.getInt
val controllerEpoch = buffer.getInt
val deletePartitions = buffer.get match {
case 1 => true
case 0 => false
case x =>
throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
}
val topicPartitionPairCount = buffer.getInt
val topicPartitionPairSet = new collection.mutable.HashSet[TopicAndPartition]()
(1 to topicPartitionPairCount) foreach { _ =>
topicPartitionPairSet.add(TopicAndPartition(readShortString(buffer), buffer.getInt))
}
StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
deletePartitions, topicPartitionPairSet.toSet)
}
}
case class StopReplicaRequest(versionId: Short,
correlationId: Int,
clientId: String,
controllerId: Int,
controllerEpoch: Int,
deletePartitions: Boolean,
partitions: Set[TopicAndPartition])
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
controllerId, controllerEpoch, deletePartitions, partitions)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
buffer.putInt(controllerId)
buffer.putInt(controllerEpoch)
buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
buffer.putInt(partitions.size)
for (topicAndPartition <- partitions) {
writeShortString(buffer, topicAndPartition.topic)
buffer.putInt(topicAndPartition.partition)
}
}
def sizeInBytes(): Int = {
var size =
2 +
4 +
ApiUtils.shortStringLength(clientId) +
4 +
4 +
1 +
4
for (topicAndPartition <- partitions){
size += (ApiUtils.shortStringLength(topicAndPartition.topic)) +
4
}
size
}
override def toString(): String = {
describe(true)
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val responseMap = partitions.map {
case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}.toMap
val errorResponse = StopReplicaResponse(correlationId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
override def describe(details: Boolean): String = {
val stopReplicaRequest = new StringBuilder
stopReplicaRequest.append("Name: " + this.getClass.getSimpleName)
stopReplicaRequest.append("; Version: " + versionId)
stopReplicaRequest.append("; CorrelationId: " + correlationId)
stopReplicaRequest.append("; ClientId: " + clientId)
stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
stopReplicaRequest.append("; ControllerId: " + controllerId)
stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
if(details)
stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
stopReplicaRequest.toString()
}
}