package kafka.cluster
import kafka.common._
import kafka.admin.AdminUtils
import kafka.utils._
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.Utils.{inReadLock,inWriteLock}
import scala.collection.immutable.Set
import com.yammer.metrics.core.Gauge
/**
 * Data structure that represents a single topic partition hosted on this broker.
 *
 * Tracks the assigned replicas (AR), the in-sync replica set (ISR), the current leader,
 * the leader epoch and the ZK version of the leader/ISR path, and mediates leadership
 * transitions, ISR expansion/shrinkage, high-watermark movement and leader-side appends.
 *
 * Concurrency: `leaderIsrUpdateLock` is taken in write mode for any change to
 * leader/ISR/replica-assignment state and in read mode for appends; `leaderEpoch`,
 * `leaderReplicaIdOpt` and `inSyncReplicas` are additionally volatile for lock-free reads.
 */
class Partition(val topic: String,
                val partitionId: Int,
                time: Time,
                replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
  private val localBrokerId = replicaManager.config.brokerId
  private val logManager = replicaManager.logManager
  private val zkClient = replicaManager.zkClient
  // All replicas currently assigned to this partition, keyed by broker id.
  private val assignedReplicaMap = new Pool[Int, Replica]
  // Read lock: leader-side appends. Write lock: leadership changes, ISR updates, delete.
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  private var zkVersion: Int = LeaderAndIsr.initialZKVersion
  @volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
  // Broker id of the current leader, if known; None until a make-leader/follower request arrives.
  @volatile var leaderReplicaIdOpt: Option[Int] = None
  @volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
  // Epoch of the controller that last changed leader/ISR state for this partition;
  // used by updateIsr to fence ZK writes from stale controllers.
  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
  this.logIdent = "Partition [%s,%d] on broker %d: ".format(topic, partitionId, localBrokerId)

  private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)

  // Gauge reporting 1 while this broker leads the partition with ISR smaller than AR.
  newGauge("UnderReplicated",
    new Gauge[Int] {
      def value = {
        if (isUnderReplicated) 1 else 0
      }
    },
    Map("topic" -> topic, "partition" -> partitionId.toString)
  )

  /**
   * True iff this broker is the leader and the ISR is strictly smaller than the assigned
   * replica set. Non-leaders always report false (they do not maintain the ISR).
   */
  def isUnderReplicated(): Boolean = {
    leaderReplicaIfLocal() match {
      case Some(_) =>
        inSyncReplicas.size < assignedReplicas.size
      case None =>
        false
    }
  }

  /**
   * Return the replica for `replicaId`, creating it if absent.
   *
   * For the local broker this also creates (or fetches) the backing log and seeds the
   * replica's high watermark from the checkpoint file, capped at the log end offset
   * (the checkpoint may be ahead of the log after an unclean shutdown).
   */
  def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
    val replicaOpt = getReplica(replicaId)
    replicaOpt match {
      case Some(replica) => replica
      case None =>
        if (isReplicaLocal(replicaId)) {
          val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
          val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
          val offsetMap = checkpoint.read
          if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
            warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
          // Never let the restored HW exceed what the log actually contains.
          val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
          val localReplica = new Replica(replicaId, this, time, offset, Some(log))
          addReplicaIfNotExists(localReplica)
        } else {
          val remoteReplica = new Replica(replicaId, this, time)
          addReplicaIfNotExists(remoteReplica)
        }
        // Either branch above registered a replica, so this get is safe.
        getReplica(replicaId).get
    }
  }

  /** Look up the replica for `replicaId` (defaults to the local broker), if assigned. */
  def getReplica(replicaId: Int = localBrokerId): Option[Replica] = {
    val replica = assignedReplicaMap.get(replicaId)
    if (replica == null)
      None
    else
      Some(replica)
  }

  /** The local replica, but only if this broker is currently the leader; None otherwise. */
  def leaderReplicaIfLocal(): Option[Replica] = {
    leaderReplicaIdOpt match {
      case Some(leaderReplicaId) =>
        if (leaderReplicaId == localBrokerId)
          getReplica(localBrokerId)
        else
          None
      case None => None
    }
  }

  /** Register `replica` unless a replica for that broker id is already present. */
  def addReplicaIfNotExists(replica: Replica) = {
    assignedReplicaMap.putIfNotExists(replica.brokerId, replica)
  }

  /** Snapshot of all currently assigned replicas. */
  def assignedReplicas(): Set[Replica] = {
    assignedReplicaMap.values.toSet
  }

  /** Drop the replica for `replicaId` from the assignment map (no-op if absent). */
  def removeReplica(replicaId: Int) {
    assignedReplicaMap.remove(replicaId)
  }

  /**
   * Tear down this partition: clear replica/leader state and delete the local log.
   * A failure to delete the log is treated as fatal and halts the broker, since
   * continuing with a partially deleted log would leave inconsistent state on disk.
   */
  def delete() {
    // Block any concurrent leader/ISR change or append while tearing down.
    inWriteLock(leaderIsrUpdateLock) {
      assignedReplicaMap.clear()
      inSyncReplicas = Set.empty[Replica]
      leaderReplicaIdOpt = None
      try {
        logManager.deleteLog(TopicAndPartition(topic, partitionId))
      } catch {
        case e: IOException =>
          fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
          Runtime.getRuntime().halt(1)
      }
    }
  }

  /** Current leader epoch as last set by a make-leader/make-follower transition. */
  def getLeaderEpoch(): Int = {
    return this.leaderEpoch
  }

  /**
   * Make the local replica the leader for this partition, per the controller's request.
   *
   * Records the controller epoch, reconciles the assigned replica set and ISR with the
   * request, converts the leader's HW to full offset metadata, resets followers' cached
   * LEO so the HW only advances once followers re-fetch, and possibly increments the HW.
   * Returns true (the caller treats every make-leader as a leadership change).
   */
  def makeLeader(controllerId: Int,
                 partitionStateInfo: PartitionStateInfo, correlationId: Int,
                 offsetManager: OffsetManager): Boolean = {
    inWriteLock(leaderIsrUpdateLock) {
      val allReplicas = partitionStateInfo.allReplicas
      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
      // Record the epoch of the controller that made this decision (fences stale controllers).
      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
      // Add replicas that are new, then remove ones the controller no longer assigns here.
      allReplicas.foreach(replica => getOrCreateReplica(replica))
      val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
      inSyncReplicas = newInSyncReplicas
      leaderEpoch = leaderAndIsr.leaderEpoch
      zkVersion = leaderAndIsr.zkVersion
      leaderReplicaIdOpt = Some(localBrokerId)
      // We just registered the local replica above, so this get is safe.
      val newLeaderReplica = getReplica().get
      // Construct the high watermark metadata for the new leader replica.
      newLeaderReplica.convertHWToLocalOffsetMetadata()
      // Reset remote replicas' LEO so the HW waits for fresh fetches from followers.
      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
      maybeIncrementLeaderHW(newLeaderReplica)
      // Becoming leader of an offsets-topic partition requires loading its offsets into the cache.
      if (topic == OffsetManager.OffsetsTopicName)
        offsetManager.loadOffsetsFromLog(partitionId)
      true
    }
  }

  /**
   * Make the local replica a follower of the leader named in the controller's request.
   *
   * Reconciles the assigned replica set, clears the ISR (only leaders track it), updates
   * epoch/zkVersion, and evicts cached offsets if we were previously leading an
   * offsets-topic partition. Returns false when the leader is unchanged (so the caller
   * can skip restarting the fetcher), true on an actual leader change.
   */
  def makeFollower(controllerId: Int,
                   partitionStateInfo: PartitionStateInfo,
                   correlationId: Int, offsetManager: OffsetManager): Boolean = {
    inWriteLock(leaderIsrUpdateLock) {
      val allReplicas = partitionStateInfo.allReplicas
      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
      val newLeaderBrokerId: Int = leaderAndIsr.leader
      // Record the epoch of the controller that made this decision (fences stale controllers).
      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
      // Add replicas that are new, then remove ones the controller no longer assigns here.
      allReplicas.foreach(r => getOrCreateReplica(r))
      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
      // Followers do not maintain the ISR; only the leader does.
      inSyncReplicas = Set.empty[Replica]
      leaderEpoch = leaderAndIsr.leaderEpoch
      zkVersion = leaderAndIsr.zkVersion
      // If we were leading an offsets-topic partition, drop its cached offsets on demotion.
      leaderReplicaIdOpt.foreach { leaderReplica =>
        if (topic == OffsetManager.OffsetsTopicName &&
            leaderReplica == localBrokerId)
          offsetManager.clearOffsetsInPartition(partitionId)
      }
      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
        // Same leader as before: state refreshed, but no leadership change to report.
        false
      }
      else {
        leaderReplicaIdOpt = Some(newLeaderBrokerId)
        true
      }
    }
  }

  /**
   * Called on the leader when a follower fetch advances that follower's LEO.
   * Expands the ISR with `replicaId` if it is assigned, not yet in sync, and has caught
   * up to the leader's high watermark; then tries to advance the HW.
   */
  def updateLeaderHWAndMaybeExpandIsr(replicaId: Int) {
    inWriteLock(leaderIsrUpdateLock) {
      // Only the leader maintains the ISR and HW.
      leaderReplicaIfLocal() match {
        case Some(leaderReplica) =>
          val replica = getReplica(replicaId).get
          val leaderHW = leaderReplica.highWatermark
          if (!inSyncReplicas.contains(replica) &&
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
              replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
            val newInSyncReplicas = inSyncReplicas + replica
            info("Expanding ISR for partition [%s,%d] from %s to %s"
              .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
            updateIsr(newInSyncReplicas)
            replicaManager.isrExpandRate.mark()
          }
          maybeIncrementLeaderHW(leaderReplica)
        case None => // nothing to do if we are no longer the leader
      }
    }
  }

  /**
   * Check whether enough replicas have acknowledged `requiredOffset` for a produce
   * request with the given `requiredAcks`.
   *
   * Returns (satisfied, errorCode):
   *  - acks == -1: satisfied once the HW reaches the offset; errors with
   *    NotEnoughReplicasAfterAppend if the ISR has meanwhile shrunk below min.insync.replicas.
   *  - acks > 0: satisfied once that many ISR members (the local leader counts
   *    unconditionally) have caught up to the offset.
   *  - Not the leader: (false, NotLeaderForPartition).
   */
  def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // Snapshot the ISR so the count and the min-ISR check see the same set.
        val curInSyncReplicas = inSyncReplicas
        val numAcks = curInSyncReplicas.count(r => {
          if (!r.isLocal)
            r.logEndOffset.messageOffset >= requiredOffset
          else
            true // the local (leader) replica has the message by definition
        })
        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
        trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
        if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
          if (minIsr <= curInSyncReplicas.size) {
            (true, ErrorMapping.NoError)
          } else {
            // The request is complete (HW advanced) but durability fell below the configured minimum.
            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
          }
        } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
          (true, ErrorMapping.NoError)
        } else
          (false, ErrorMapping.NoError)
      case None =>
        (false, ErrorMapping.NotLeaderForPartitionCode)
    }
  }

  /**
   * Advance the leader's high watermark to the minimum LEO across the ISR, if that
   * minimum is ahead of the current HW. On advance, unblock delayed fetch and produce
   * requests that may now be satisfiable. Callers must hold leaderIsrUpdateLock.
   */
  private def maybeIncrementLeaderHW(leaderReplica: Replica) {
    val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
    // The HW can never exceed what the slowest in-sync replica has replicated.
    val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
    val oldHighWatermark = leaderReplica.highWatermark
    if(oldHighWatermark.precedes(newHighWatermark)) {
      leaderReplica.highWatermark = newHighWatermark
      debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
      // Some delayed operations may be unblocked after the HW changed.
      val requestKey = new TopicAndPartition(this.topic, this.partitionId)
      replicaManager.unblockDelayedFetchRequests(requestKey)
      replicaManager.unblockDelayedProduceRequests(requestKey)
    } else {
      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
    }
  }

  /**
   * Shrink the ISR by evicting stuck or slow followers (see getOutOfSyncReplicas),
   * then try to advance the HW since the slowest member may have been removed.
   * No-op when this broker is not the leader.
   */
  def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
    inWriteLock(leaderIsrUpdateLock) {
      leaderReplicaIfLocal() match {
        case Some(leaderReplica) =>
          val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
          if(outOfSyncReplicas.size > 0) {
            val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
            // The leader is never out of sync with itself, so the ISR cannot become empty.
            assert(newInSyncReplicas.size > 0)
            info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
              inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
            updateIsr(newInSyncReplicas)
            // We may be able to increment the high watermark since the replica may already be in the ISR.
            maybeIncrementLeaderHW(leaderReplica)
            replicaManager.isrShrinkRate.mark()
          }
        case None => // do nothing if no longer leader
      }
    }
  }

  /**
   * Identify ISR members (excluding the leader itself) that are out of sync:
   *  - "stuck": no LEO update for longer than keepInSyncTimeMs, or
   *  - "slow": more than keepInSyncMessages behind the leader's LEO
   *    (only counted once the follower's LEO is known, i.e. >= 0).
   */
  def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
    val leaderLogEndOffset = leaderReplica.logEndOffset
    val candidateReplicas = inSyncReplicas - leaderReplica
    // Case 1: replicas that have not fetched (updated their LEO) recently.
    val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
    if(stuckReplicas.size > 0)
      debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
    // Case 2: replicas lagging the leader's log end offset by too many messages.
    val slowReplicas = candidateReplicas.filter(r =>
      r.logEndOffset.messageOffset >= 0 &&
      leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
    if(slowReplicas.size > 0)
      debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
    stuckReplicas ++ slowReplicas
  }

  /**
   * Append `messages` to the leader's log, assigning offsets.
   *
   * Throws NotEnoughReplicasException when acks=-1 and the ISR is already below
   * min.insync.replicas (fail fast before appending), and
   * NotLeaderForPartitionException when this broker is not the leader.
   * Returns the log append info.
   */
  def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
    // Read lock: appends may proceed concurrently, but not during a leadership/ISR change.
    inReadLock(leaderIsrUpdateLock) {
      val leaderReplicaOpt = leaderReplicaIfLocal()
      leaderReplicaOpt match {
        case Some(leaderReplica) =>
          val log = leaderReplica.log.get
          val minIsr = log.config.minInSyncReplicas
          val inSyncSize = inSyncReplicas.size

          // Avoid writing to the leader if there are not enough insync replicas to make it safe.
          if (inSyncSize < minIsr && requiredAcks == -1) {
            // FIX: the original passed (minIsr, inSyncSize) here, swapping the two counts
            // in the rendered message relative to the "is [...] below required minimum [...]" text.
            throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
              .format(topic, partitionId, inSyncSize, minIsr))
          }

          val info = log.append(messages, assignOffsets = true)
          // Probably unblock some follower fetch requests since log end offset has been updated.
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // We may need to increment high watermark since ISR could be down to 1.
          maybeIncrementLeaderHW(leaderReplica)
          info
        case None =>
          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
            .format(topic, partitionId, localBrokerId))
      }
    }
  }

  /**
   * Persist a new ISR to ZooKeeper (conditioned on the cached zkVersion) and, on
   * success, update the cached ISR and zkVersion. A version conflict means another
   * writer (e.g. the controller) got there first; the update is skipped.
   */
  private def updateIsr(newIsr: Set[Replica]) {
    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
    val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
      newLeaderAndIsr, controllerEpoch, zkVersion)
    if(updateSucceeded) {
      inSyncReplicas = newIsr
      zkVersion = newVersion
      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
    } else {
      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
    }
  }

  // Partitions are identified by (topic, partitionId); replica/leader state is excluded
  // from equality so a Partition can be used as a stable map key.
  override def equals(that: Any): Boolean = {
    if(!(that.isInstanceOf[Partition]))
      return false
    val other = that.asInstanceOf[Partition]
    if(topic.equals(other.topic) && partitionId == other.partitionId)
      return true
    false
  }

  override def hashCode(): Int = {
    31 + topic.hashCode() + 17*partitionId
  }

  override def toString(): String = {
    val partitionString = new StringBuilder
    partitionString.append("Topic: " + topic)
    partitionString.append("; Partition: " + partitionId)
    partitionString.append("; Leader: " + leaderReplicaIdOpt)
    partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
    partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
    partitionString.toString()
  }
}