package kafka.server
import kafka.api._
import kafka.common.ErrorMapping
import kafka.common.TopicAndPartition
import kafka.utils.Logging
import kafka.network.RequestChannel
import scala.Some
import scala.collection.immutable.Map
import scala.collection.Seq
class DelayedProduce(override val keys: Seq[TopicAndPartition],
override val request: RequestChannel.Request,
override val delayMs: Long,
val produce: ProducerRequest,
val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
extends DelayedRequest(keys, request, delayMs) with Logging {
partitionStatus foreach { case (topicAndPartition, delayedStatus) =>
if (delayedStatus.responseStatus.error == ErrorMapping.NoError) {
delayedStatus.acksPending = true
delayedStatus.responseStatus.error = ErrorMapping.RequestTimedOutCode
} else {
delayedStatus.acksPending = false
}
trace("Initial partition status for %s is %s".format(topicAndPartition, delayedStatus))
}
def respond(offsetManager: OffsetManager): RequestOrResponse = {
val responseStatus = partitionStatus.mapValues(status => status.responseStatus)
val errorCode = responseStatus.find { case (_, status) =>
status.error != ErrorMapping.NoError
}.map(_._2.error).getOrElse(ErrorMapping.NoError)
if (errorCode == ErrorMapping.NoError) {
offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
}
val response = offsetCommitRequestOpt.map(_.responseFor(errorCode, offsetManager.config.maxMetadataSize))
.getOrElse(ProducerResponse(produce.correlationId, responseStatus))
response
}
def isSatisfied(replicaManager: ReplicaManager) = {
partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
trace("Checking producer request satisfaction for %s, acksPending = %b"
.format(topicAndPartition, fetchPartitionStatus.acksPending))
if (fetchPartitionStatus.acksPending) {
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val (hasEnough, errorCode) = partitionOpt match {
case Some(partition) =>
partition.checkEnoughReplicasReachOffset(
fetchPartitionStatus.requiredOffset,
produce.requiredAcks)
case None =>
(false, ErrorMapping.UnknownTopicOrPartitionCode)
}
if (errorCode != ErrorMapping.NoError) {
fetchPartitionStatus.acksPending = false
fetchPartitionStatus.responseStatus.error = errorCode
} else if (hasEnough) {
fetchPartitionStatus.acksPending = false
fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
}
}
}
val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
satisfied
}
}
case class DelayedProduceResponseStatus(val requiredOffset: Long,
val responseStatus: ProducerResponseStatus) {
@volatile var acksPending = false
override def toString =
"acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
acksPending, responseStatus.error, responseStatus.offset, requiredOffset)
}