package kafka.log
import java.io.File
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import kafka.utils.{Logging, Pool}
import kafka.server.OffsetCheckpoint
import collection.mutable
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.Utils._
import java.util.concurrent.TimeUnit
import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
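
/* the states a partition's log cleaning can be in; see LogCleanerManager below for the transitions */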
private[log] sealed trait LogCleaningState
private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case object LogCleaningPaused extends LogCleaningState
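
/**
 * Manage the state of each partition being cleaned.
 * If a partition is to be cleaned, it enters the LogCleaningInProgress state.
 * While a partition is being cleaned, it can be requested to be aborted and paused. The partition then first enters
 * the LogCleaningAborted state. Once the cleaning task actually aborts, the partition enters the LogCleaningPaused
 * state. While a partition is in the LogCleaningPaused state it won't be scheduled for cleaning again until cleaning
 * is requested to be resumed.
 *
 * A cleaner thread would typically drive this class as in the minimal sketch below, where clean() is a hypothetical
 * stand-in for the actual compaction work; grabFilthiestLog() and doneCleaning() are the real entry points:
 * {{{
 * manager.grabFilthiestLog() match {
 *   case Some(cleanable) =>
 *     val endOffset = clean(cleanable) // hypothetical: compact the log, returning the first uncleaned offset
 *     manager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
 *   case None => // nothing is eligible this round; back off and retry
 * }
 * }}}
 *
 * @param logDirs the directories in which logs are stored
 * @param logs the pool of logs, keyed by topic and partition
 */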
private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
override val loggerName = classOf[LogCleaner].getName
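
  /* the name of the checkpoint file kept in each log directory */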
private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
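
  /* the offset checkpoints holding the last cleaned point for each log */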
private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
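
  /* the set of logs currently being cleaned */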
private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
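
  /* a global lock used to control all access to the in-progress set and the offset checkpoints */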
private val lock = new ReentrantLock
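
  /* for coordinating the pausing and the cleaning of a partition */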
private val pausedCleaningCond = lock.newCondition()
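
  /* a gauge for tracking the cleanable ratio of the dirtiest log, exposed as the max-dirty-percent metric */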
@volatile private var dirtiestLogCleanableRatio = 0.0
newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
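
  /**
   * @return the last cleaned offset for each partition, aggregated across all log directories
   */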
def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
checkpoints.values.flatMap(_.read()).toMap
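
  /**
   * Choose the log to clean next and add it to the in-progress set. We recompute this
   * on each call from the full set of logs so that logs added to the pool after startup
   * are also considered.
   */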
def grabFilthiestLog(): Option[LogToClean] = {
inLock(lock) {
val lastClean = allCleanerCheckpoints()
      // consider only logs configured for compaction that are not already being cleaned
      val dirtyLogs = logs.filter(l => l._2.config.compact)
                          .filterNot(l => inProgress.contains(l._1))
                          // if a log has no checkpoint yet, treat it as dirty from its first segment
                          .map(l => LogToClean(l._1, l._2,
                                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
                          // skip any empty logs
                          .filter(l => l.totalBytes > 0)
      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
      // a log must also meet the minimum threshold for its dirty byte ratio to be cleaned
      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio)
      if (cleanableLogs.isEmpty) {
None
} else {
val filthiest = cleanableLogs.max
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
Some(filthiest)
}
}
}
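
  /**
   * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
   * the partition is aborted.
   */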
def abortCleaning(topicAndPartition: TopicAndPartition) {
inLock(lock) {
abortAndPauseCleaning(topicAndPartition)
resumeCleaning(topicAndPartition)
      info("Cleaning for partition %s has been aborted".format(topicAndPartition))
}
}
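
  /**
   * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
   * This call blocks until the cleaning of the partition is aborted and paused.
   * 1. If the partition is not in progress, mark it as paused.
   * 2. Otherwise, first mark the state of the partition as aborted.
   * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it
   *    throws a LogCleaningAbortedException to stop the cleaning task.
   * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused.
   * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused.
   */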
def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) {
inLock(lock) {
      inProgress.get(topicAndPartition) match {
        case None =>
          // no cleaning is in progress; pause any future cleaning directly
          inProgress.put(topicAndPartition, LogCleaningPaused)
        case Some(LogCleaningInProgress) =>
          // ask the cleaner thread to abort; doneCleaning() will then move the partition to the paused state
          inProgress.put(topicAndPartition, LogCleaningAborted)
        case Some(s) =>
          throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
            .format(topicAndPartition, s))
      }
while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
      info("Cleaning for partition %s has been aborted and paused".format(topicAndPartition))
}
}
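
  /**
   * Resume the cleaning of a paused partition.
   */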
def resumeCleaning(topicAndPartition: TopicAndPartition) {
inLock(lock) {
      inProgress.get(topicAndPartition) match {
        case None =>
          throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
            .format(topicAndPartition))
        case Some(LogCleaningPaused) =>
          inProgress.remove(topicAndPartition)
        case Some(s) =>
          throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
            .format(topicAndPartition, s))
      }
}
    info("Compaction for partition %s has been resumed".format(topicAndPartition))
}
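
  /**
   * Check if the cleaning for a partition is in a particular state. The caller is expected to hold the lock
   * while making this call.
   */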
  def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean =
    inProgress.get(topicAndPartition) match {
      case Some(state) => state == expectedState
      case None => false
    }
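
  /**
   * Throw a LogCleaningAbortedException if cleaning for this partition has been requested to abort.
   * The cleaner thread is expected to call this periodically so that an in-progress cleaning can be stopped.
   */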
def checkCleaningAborted(topicAndPartition: TopicAndPartition) {
inLock(lock) {
if (isCleaningInState(topicAndPartition, LogCleaningAborted))
throw new LogCleaningAbortedException()
}
}
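
  /**
   * Rewrite the checkpoint file for the given data directory, applying the given update (if any) and
   * dropping entries for partitions that no longer exist in the log pool.
   */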
def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
inLock(lock) {
val checkpoint = checkpoints(dataDir)
val existing = checkpoint.read().filterKeys(logs.keys) ++ update
checkpoint.write(existing)
}
}
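
  /**
   * Save out the end offset and remove the given log from the in-progress set, if it was not aborted.
   * If the cleaning was aborted, mark the partition paused and wake up any threads waiting in
   * abortAndPauseCleaning().
   */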
def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) {
inLock(lock) {
inProgress(topicAndPartition) match {
case LogCleaningInProgress =>
          updateCheckpoints(dataDir, Option((topicAndPartition, endOffset)))
inProgress.remove(topicAndPartition)
case LogCleaningAborted =>
inProgress.put(topicAndPartition, LogCleaningPaused)
pausedCleaningCond.signalAll()
case s =>
throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
}
}
}
}