package kafka.server
import kafka.admin.AdminUtils
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.message.ByteBufferMessageSet
import kafka.api.{OffsetRequest, FetchResponsePartitionData}
import kafka.common.{KafkaStorageException, TopicAndPartition}
class ReplicaFetcherThread(name:String,
sourceBroker: Broker,
brokerConfig: KafkaConfig,
replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name,
clientId = name,
sourceBroker = sourceBroker,
socketTimeout = brokerConfig.replicaSocketTimeoutMs,
socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes,
fetchSize = brokerConfig.replicaFetchMaxBytes,
fetcherBrokerId = brokerConfig.brokerId,
maxWait = brokerConfig.replicaFetchWaitMaxMs,
minBytes = brokerConfig.replicaFetchMinBytes,
isInterruptible = false) {
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
try {
val topic = topicAndPartition.topic
val partitionId = topicAndPartition.partition
val replica = replicaMgr.getReplica(topic, partitionId).get
val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
if (fetchOffset != replica.logEndOffset.messageOffset)
throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
.format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
replica.log.get.append(messageSet, assignOffsets = false)
trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
.format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition))
val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw)
replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
.format(replica.brokerId, topic, partitionId, followerHighWatermark))
} catch {
case e: KafkaStorageException =>
fatal("Disk error while replicating data.", e)
Runtime.getRuntime.halt(1)
}
}
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
if (!LogConfig.fromProps(brokerConfig.props.props, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
topicAndPartition.topic)).uncleanLeaderElectionEnable) {
fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
" Current leader %d's latest offset %d is less than replica %d's latest offset %d"
.format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
Runtime.getRuntime.halt(1)
}
replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
.format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
leaderEndOffset
} else {
val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
.format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
leaderStartOffset
}
}
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
}
}