package kafka.log
import kafka.common._
import kafka.message._
import kafka.utils._
import kafka.metrics.KafkaMetricsGroup
import scala.collection._
import scala.math
import java.nio._
import java.util.Date
import java.io.File
import java.lang.IllegalStateException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge
/**
 * Owns a pool of background cleaner threads. Each thread repeatedly asks the
 * [[LogCleanerManager]] for a log to clean, runs a [[Cleaner]] over it and reports
 * the outcome back to the manager.
 *
 * @param config  cleaner settings (thread count, buffer sizes, I/O rate limit, back-off, ...)
 * @param logDirs the log directories handed to the cleaner manager
 * @param logs    the pool of logs handed to the cleaner manager
 * @param time    clock used by the throttler and the per-thread cleaners
 */
class LogCleaner(val config: CleanerConfig,
                 val logDirs: Array[File],
                 val logs: Pool[TopicAndPartition, Log],
                 time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  /* chooses which log to clean next and receives abort/pause/resume/checkpoint requests */
  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)

  /* a single throttler instance is passed to every cleaner, so the I/O rate limit is shared */
  private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
                                        checkIntervalMs = 300,
                                        throttleDown = true,
                                        "cleaner-io",
                                        "bytes",
                                        time = time)

  /* one cleaner thread per configured thread id */
  private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))

  /**
   * Largest value of `metric` over the most recent stats of every cleaner thread,
   * truncated to an Int. Returns 0 when there are no cleaner threads, because
   * calling `max` on an empty collection throws.
   */
  private def maxAcrossCleaners(metric: CleanerStats => Double): Int =
    if (cleaners.isEmpty) 0 else cleaners.map(c => metric(c.lastStats)).max.toInt

  /* highest offset-map utilization (as a percentage) over the last run of each thread */
  newGauge("max-buffer-utilization-percent",
           new Gauge[Int] {
             def value: Int = maxAcrossCleaners(s => 100 * s.bufferUtilization)
           })

  /* bytes written as a percentage of bytes read, summed over the last run of each thread */
  newGauge("cleaner-recopy-percent",
           new Gauge[Int] {
             def value: Int = {
               val stats = cleaners.map(_.lastStats)
               // max(..., 1) avoids dividing by zero when nothing has been read yet
               val recopyRate = stats.map(_.bytesWritten).sum.toDouble / math.max(stats.map(_.bytesRead).sum, 1)
               (100 * recopyRate).toInt
             }
           })

  /* longest elapsed time (in seconds) over the last run of each thread */
  newGauge("max-clean-time-secs",
           new Gauge[Int] {
             def value: Int = maxAcrossCleaners(_.elapsedSecs)
           })

  /** Start every cleaner thread. */
  def startup(): Unit = {
    info("Starting the log cleaner")
    cleaners.foreach(_.start())
  }

  /** Shut down every cleaner thread, one after another. */
  def shutdown(): Unit = {
    info("Shutting down the log cleaner.")
    cleaners.foreach(_.shutdown())
  }

  /** Forward an abort request for the given partition to the cleaner manager. */
  def abortCleaning(topicAndPartition: TopicAndPartition): Unit = {
    cleanerManager.abortCleaning(topicAndPartition)
  }

  /** Ask the cleaner manager to update the checkpoints for `dataDir` without supplying a new entry. */
  def updateCheckpoints(dataDir: File): Unit = {
    cleanerManager.updateCheckpoints(dataDir, update=None)
  }

  /** Forward an abort-and-pause request for the given partition to the cleaner manager. */
  def abortAndPauseCleaning(topicAndPartition: TopicAndPartition): Unit = {
    cleanerManager.abortAndPauseCleaning(topicAndPartition)
  }

  /** Forward a resume request for the given partition to the cleaner manager. */
  def resumeCleaning(topicAndPartition: TopicAndPartition): Unit = {
    cleanerManager.resumeCleaning(topicAndPartition)
  }

  /**
   * Block until the cleaner checkpoint recorded for the partition has reached `offset`,
   * or until roughly `timeout` milliseconds have passed. A warning is logged on timeout.
   *
   * NOTE(review): assumes `allCleanerCheckpoints` is a map from partition to checkpoint
   * offset - confirm against LogCleanerManager.
   *
   * @param topic   topic of the partition to wait for
   * @param part    partition id
   * @param offset  the checkpoint offset that must be reached
   * @param timeout maximum time to wait, in milliseconds
   */
  def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
    val topicAndPartition = TopicAndPartition(topic, part)
    def isCleaned = cleanerManager.allCleanerCheckpoints.get(topicAndPartition).exists(_ >= offset)
    // count down the sleeps ourselves rather than reading a clock, so the bound
    // does not depend on which Time implementation was injected
    var remainingWaitMs = timeout
    while(!isCleaned && remainingWaitMs > 0) {
      val sleepMs = math.min(10L, remainingWaitMs)
      Thread.sleep(sleepMs)
      remainingWaitMs -= sleepMs
    }
    if(!isCleaned)
      warn("Timed out after %d ms waiting for %s to be cleaned up to offset %d.".format(timeout, topicAndPartition, offset))
  }

  /**
   * A background thread that alternates between cleaning the log picked by the
   * cleaner manager and backing off when there is nothing to clean.
   */
  private class CleanerThread(threadId: Int)
    extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {

    override val loggerName = classOf[LogCleaner].getName

    // the per-thread share of the dedupe buffer is clamped to Int.MaxValue below
    if(config.dedupeBufferSize / config.numThreads > Int.MaxValue)
      warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...")

    val cleaner = new Cleaner(id = threadId,
                              offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
                                                              hashAlgorithm = config.hashAlgorithm),
                              // the I/O budget is split across threads, then across the read and write buffers
                              ioBufferSize = config.ioBufferSize / config.numThreads / 2,
                              maxIoBufferSize = config.maxMessageSize,
                              dupBufferLoadFactor = config.dedupeBufferLoadFactor,
                              throttler = throttler,
                              time = time,
                              checkDone = checkDone)

    /* stats of the most recently completed cleaning; read by the gauges on other threads */
    @volatile var lastStats: CleanerStats = new CleanerStats()

    /* counted down on shutdown so a thread that is backing off wakes up immediately */
    private val backOffWaitLatch = new CountDownLatch(1)

    /**
     * Callback handed to the cleaner: throws ThreadShutdownException once this thread
     * is no longer running, otherwise lets the manager check for an abort request.
     */
    private def checkDone(topicAndPartition: TopicAndPartition): Unit = {
      if (!isRunning.get())
        throw new ThreadShutdownException
      cleanerManager.checkCleaningAborted(topicAndPartition)
    }

    /** One iteration of the thread loop. */
    override def doWork(): Unit = {
      cleanOrSleep()
    }

    /** Initiate shutdown, release the back-off latch and wait for the thread to finish. */
    override def shutdown() = {
      initiateShutdown()
      backOffWaitLatch.countDown()
      awaitShutdown()
    }

    /**
     * Clean the log offered by the manager, or wait up to `config.backOffMs` if none is offered.
     */
    private def cleanOrSleep(): Unit = {
      cleanerManager.grabFilthiestLog() match {
        case None =>
          // nothing to clean: wait for the back-off period or until shutdown releases the latch
          backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
        case Some(cleanable) =>
          // if cleaning fails or is aborted, the manager is told the original first dirty offset
          var endOffset = cleanable.firstDirtyOffset
          try {
            endOffset = cleaner.clean(cleanable)
            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
          } catch {
            // an abort is not logged or rethrown here; the finally block still notifies the manager
            case pe: LogCleaningAbortedException =>
          } finally {
            cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
          }
      }
    }

    /**
     * Publish `stats` as this thread's latest stats and log a summary of the cleaning.
     *
     * @param id    id of the cleaner that did the work
     * @param name  name of the cleaned log
     * @param from  first offset of the dirty section
     * @param to    end offset returned by the cleaner
     * @param stats the stats collected during the cleaning
     */
    def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats): Unit = {
      this.lastStats = stats
      // NOTE(review): Tuple2.swap returns a new tuple and the result is discarded here,
      // so this statement does not exchange the cleaner's two stats objects.
      cleaner.statsUnderlying.swap
      def mb(bytes: Double) = bytes / (1024*1024)
      val message =
        "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
        "\t%,.1f MB of log processed in %,.1f seconds (%,.1f MB/sec).%n".format(mb(stats.bytesRead),
                                                                                stats.elapsedSecs,
                                                                                mb(stats.bytesRead/stats.elapsedSecs)) +
        "\tIndexed %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.mapBytesRead),
                                                                                           stats.elapsedIndexSecs,
                                                                                           mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
                                                                                           100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) +
        "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
        "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
                                                                                           stats.elapsedSecs - stats.elapsedIndexSecs,
                                                                                           mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) +
        "\tStart size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesRead), stats.messagesRead) +
        "\tEnd size: %,.1f MB (%,d messages)%n".format(mb(stats.bytesWritten), stats.messagesWritten) +
        "\t%.1f%% size reduction (%.1f%% fewer messages)%n".format(100.0 * (1.0 - stats.bytesWritten.toDouble/stats.bytesRead),
                                                                   100.0 * (1.0 - stats.messagesWritten.toDouble/stats.messagesRead))
      info(message)
    }
  }
}
/**
 * Does the actual work of cleaning one log: builds a key -> offset map over the dirty
 * section, then rewrites segments, dropping every message for which the map holds a
 * higher offset for the same key (and, past the delete horizon, null-payload messages).
 *
 * @param id                  identifier used in log messages
 * @param offsetMap           the map filled with the offset seen for each key
 * @param ioBufferSize        initial size of the read buffer and of the write buffer
 * @param maxIoBufferSize     upper bound the buffers may grow to
 * @param dupBufferLoadFactor utilization of the offset map beyond which no further segment is indexed
 * @param throttler           rate limiter informed of every read and write
 * @param time                clock used for stats and for the cleaned segment
 * @param checkDone           callback invoked regularly; expected to throw when cleaning must stop
 */
private[log] class Cleaner(val id: Int,
                           val offsetMap: OffsetMap,
                           ioBufferSize: Int,
                           maxIoBufferSize: Int,
                           dupBufferLoadFactor: Double,
                           throttler: Throttler,
                           time: Time,
                           checkDone: (TopicAndPartition) => Unit) extends Logging {

  override val loggerName = classOf[LogCleaner].getName

  this.logIdent = "Cleaner " + id + ": "

  /*
   * Two stats objects used alternately. The pair is exchanged at the start of every
   * clean() so the object filled by the previous run - which a caller may still be
   * holding - is not reset while the next run is in progress.
   */
  var statsUnderlying = (new CleanerStats(time), new CleanerStats(time))

  /** The stats object of the current (or most recently finished) cleaning. */
  def stats = statsUnderlying._1

  /* buffer used for reading segment data */
  private var readBuffer = ByteBuffer.allocate(ioBufferSize)

  /* buffer used for collecting the messages that are retained */
  private var writeBuffer = ByteBuffer.allocate(ioBufferSize)

  /**
   * Clean the given log.
   *
   * @param cleanable the log to clean together with its first dirty offset
   * @return one past the last offset covered by the offset map
   */
  private[log] def clean(cleanable: LogToClean): Long = {
    // switch to the other stats object before resetting, so the stats of the previous run stay intact
    statsUnderlying = statsUnderlying.swap
    stats.clear()
    info("Beginning cleaning of log %s.".format(cleanable.log.name))
    val log = cleanable.log

    // phase 1: index the dirty section, which ends where the active segment begins
    info("Building offset map for %s...".format(cleanable.log.name))
    val upperBoundOffset = log.activeSegment.baseOffset
    val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1
    stats.indexDone()

    // delete markers are only kept in segments modified after this point in time;
    // it is derived from the last segment before the dirty section
    val deleteHorizonMs =
      log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
        case None => 0L
        case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
      }

    // phase 2: rewrite the segments up to endOffset, group by group
    info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
      cleanSegments(log, group, offsetMap, deleteHorizonMs)

    stats.bufferUtilization = offsetMap.utilization
    stats.allDone()
    endOffset
  }

  /**
   * Clean a group of segments into one new segment and swap it into the log.
   *
   * @param log             the log the segments belong to
   * @param segments        the group of segments to merge and clean
   * @param map             the offset map used to decide which messages are redundant
   * @param deleteHorizonMs segments last modified after this time keep their delete markers
   */
  private[log] def cleanSegments(log: Log,
                                 segments: Seq[LogSegment],
                                 map: OffsetMap,
                                 deleteHorizonMs: Long): Unit = {
    // the output files are named after the first segment of the group; remove leftovers first
    val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
    logFile.delete()
    val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
    indexFile.delete()
    val messages = new FileMessageSet(logFile)
    val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)

    try {
      for (old <- segments) {
        val retainDeletes = old.lastModified > deleteHorizonMs
        info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
            .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes)
      }

      index.trimToValidSize()
      cleaned.flush()

      // the new segment takes over the modification time of the last segment it replaces
      val modified = segments.last.lastModified
      cleaned.lastModified = modified

      info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name))
      log.replaceSegments(cleaned, segments)
    } catch {
      // on abort, drop the partially written segment and let the caller see the abort
      case e: LogCleaningAbortedException =>
        cleaned.delete()
        throw e
    }
  }

  /**
   * Copy the messages of `source` that are still needed into `dest`.
   *
   * @param topicAndPartition the partition being cleaned, passed to `checkDone`
   * @param source            the segment to read from
   * @param dest              the segment to append retained messages to
   * @param map               the offset map consulted for each key
   * @param retainDeletes     whether null-payload messages are kept
   */
  private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
                             dest: LogSegment, map: OffsetMap, retainDeletes: Boolean): Unit = {
    var position = 0
    while (position < source.log.sizeInBytes) {
      checkDone(topicAndPartition)
      // read a chunk of messages into the read buffer
      readBuffer.clear()
      writeBuffer.clear()
      val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
      throttler.maybeThrottle(messages.sizeInBytes)
      var messagesRead = 0
      for (entry <- messages) {
        messagesRead += 1
        val size = MessageSet.entrySize(entry.message)
        position += size
        stats.readMessage(size)
        val key = entry.message.key
        require(key != null, "Found null key in log segment %s which is marked as dedupe.".format(source.log.file.getAbsolutePath))
        val foundOffset = map.get(key)
        // redundant: the map holds a higher offset for this key
        val redundant = foundOffset >= 0 && entry.offset < foundOffset
        // obsolete delete: a null-payload message in a segment that no longer retains deletes
        val obsoleteDelete = !retainDeletes && entry.message.isNull
        if (!redundant && !obsoleteDelete) {
          ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
          stats.recopyMessage(size)
        }
      }
      // append whatever was retained from this chunk
      if(writeBuffer.position > 0) {
        writeBuffer.flip()
        val retained = new ByteBufferMessageSet(writeBuffer)
        dest.append(retained.head.offset, retained)
        throttler.maybeThrottle(writeBuffer.limit)
      }

      // data was read but no complete message could be iterated: the buffer is too small
      if(readBuffer.limit > 0 && messagesRead == 0)
        growBuffers()
    }
    restoreBuffers()
  }

  /**
   * Double the size of both I/O buffers, up to `maxIoBufferSize`.
   *
   * @throws IllegalStateException if a buffer has already reached `maxIoBufferSize`
   */
  def growBuffers(): Unit = {
    if(readBuffer.capacity >= maxIoBufferSize || writeBuffer.capacity >= maxIoBufferSize)
      throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxIoBufferSize))
    // double in Long arithmetic: capacity * 2 would overflow Int for capacities above 1 GiB
    val newSize = math.min(this.readBuffer.capacity * 2L, maxIoBufferSize.toLong).toInt
    info("Growing cleaner I/O buffers from " + readBuffer.capacity + "bytes to " + newSize + " bytes.")
    this.readBuffer = ByteBuffer.allocate(newSize)
    this.writeBuffer = ByteBuffer.allocate(newSize)
  }

  /** Replace any buffer that has grown beyond `ioBufferSize` with one of the initial size. */
  def restoreBuffers(): Unit = {
    if(this.readBuffer.capacity > this.ioBufferSize)
      this.readBuffer = ByteBuffer.allocate(this.ioBufferSize)
    if(this.writeBuffer.capacity > this.ioBufferSize)
      this.writeBuffer = ByteBuffer.allocate(this.ioBufferSize)
  }

  /**
   * Split the segments into consecutive groups. A segment joins the current group only
   * while the group's total log size stays below `maxSize` and its total index size
   * stays below `maxIndexSize`; every group holds at least one segment.
   *
   * @param segments     the segments to group, in order
   * @param maxSize      limit on the summed log size of a group
   * @param maxIndexSize limit on the summed index size of a group
   * @return the groups, in the original segment order
   */
  private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]] = {
    var grouped = List[List[LogSegment]]()
    var segs = segments.toList
    while(!segs.isEmpty) {
      // a group always starts with the next segment, whatever its size
      var group = List(segs.head)
      var logSize = segs.head.size
      var indexSize = segs.head.index.sizeInBytes
      segs = segs.tail
      while(!segs.isEmpty &&
            logSize + segs.head.size < maxSize &&
            indexSize + segs.head.index.sizeInBytes < maxIndexSize) {
        // groups are built by prepending and reversed once complete
        group = segs.head :: group
        logSize += segs.head.size
        indexSize += segs.head.index.sizeInBytes
        segs = segs.tail
      }
      grouped ::= group.reverse
    }
    grouped.reverse
  }

  /**
   * Fill the offset map from the segments in the given offset range.
   *
   * @param log   the log to index
   * @param start the first dirty offset; must equal the base offset of the first segment in range
   * @param end   the upper bound of the range
   * @param map   the map to clear and fill
   * @return the offset of the last message that was indexed
   */
  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap): Long = {
    map.clear()
    val dirty = log.logSegments(start, end).toSeq
    info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))

    // NOTE(review): dirty.head throws if the range holds no segment - callers presumably guarantee at least one
    var offset = dirty.head.baseOffset
    require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name))
    val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
    for (segment <- dirty) {
      checkDone(log.topicAndPartition)
      // a segment is indexed if it starts at or before minStopOffset, or while the map is below the load factor
      if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor)
        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
    }
    info("Offset map for log %s complete.".format(log.name))
    offset
  }

  /**
   * Put every message of one segment into the offset map.
   *
   * @param topicAndPartition the partition being cleaned, passed to `checkDone`
   * @param segment           the segment to index
   * @param map               the map to add the entries to
   * @return the offset of the last message read, or the segment's base offset if it holds none
   */
  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = {
    var position = 0
    var offset = segment.baseOffset
    while (position < segment.log.sizeInBytes) {
      checkDone(topicAndPartition)
      readBuffer.clear()
      val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position))
      throttler.maybeThrottle(messages.sizeInBytes)
      val startPosition = position
      for (entry <- messages) {
        val message = entry.message
        require(message.hasKey)
        val size = MessageSet.entrySize(message)
        position += size
        map.put(message.key, entry.offset)
        offset = entry.offset
        stats.indexMessage(size)
      }
      // no progress means not even one message fit into the read buffer
      if(position == startPosition)
        growBuffers()
    }
    restoreBuffers()
    offset
  }
}
/**
 * Mutable counters and timestamps describing a single cleaning run.
 * All timestamps are taken from `time.milliseconds`.
 *
 * @param time the clock used for the timestamps
 */
private case class CleanerStats(time: Time = SystemTime) {
  /* timestamps of the run: start, end of the indexing phase, end of the run */
  var startTime = 0L
  var mapCompleteTime = 0L
  var endTime = 0L
  /* byte counters: read while cleaning, written back, read while indexing */
  var bytesRead = 0L
  var bytesWritten = 0L
  var mapBytesRead = 0L
  /* message counters: read while indexing, read while cleaning, written back */
  var mapMessagesRead = 0L
  var messagesRead = 0L
  var messagesWritten = 0L
  /* offset map utilization, assigned by the cleaner at the end of a run */
  var bufferUtilization = 0.0d

  // a fresh instance starts out in the same state as one that has just been cleared
  clear()

  /** Seconds between two millisecond timestamps. */
  private def secondsBetween(fromMs: Long, toMs: Long) = (toMs - fromMs)/1000.0

  /** Account for one message of `size` bytes read during cleaning. */
  def readMessage(size: Int): Unit = {
    messagesRead = messagesRead + 1
    bytesRead = bytesRead + size
  }

  /** Account for one message of `size` bytes copied to the cleaned segment. */
  def recopyMessage(size: Int): Unit = {
    messagesWritten = messagesWritten + 1
    bytesWritten = bytesWritten + size
  }

  /** Account for one message of `size` bytes read while building the offset map. */
  def indexMessage(size: Int): Unit = {
    mapMessagesRead = mapMessagesRead + 1
    mapBytesRead = mapBytesRead + size
  }

  /** Record the current time as the end of the indexing phase. */
  def indexDone(): Unit = {
    mapCompleteTime = time.milliseconds
  }

  /** Record the current time as the end of the run. */
  def allDone(): Unit = {
    endTime = time.milliseconds
  }

  /** Seconds from the start to the end of the run. */
  def elapsedSecs = secondsBetween(startTime, endTime)

  /** Seconds from the start of the run to the end of the indexing phase. */
  def elapsedIndexSecs = secondsBetween(startTime, mapCompleteTime)

  /**
   * Reset for a new run: the start time becomes the current time, the other two
   * timestamps become -1 and every counter returns to zero.
   */
  def clear(): Unit = {
    bufferUtilization = 0.0d
    messagesWritten = 0L
    messagesRead = 0L
    mapMessagesRead = 0L
    mapBytesRead = 0L
    bytesWritten = 0L
    bytesRead = 0L
    endTime = -1L
    mapCompleteTime = -1L
    startTime = time.milliseconds
  }
}
/**
 * A log that is a candidate for cleaning, ordered by the share of its bytes that is dirty.
 *
 * @param topicPartition   the partition the log belongs to
 * @param log              the log itself
 * @param firstDirtyOffset the offset at which the dirty section begins
 */
private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {

  /** Summed size of the segments the log returns for the given offset range. */
  private def segmentBytes(from: Long, to: Long) = log.logSegments(from, to).map(_.size).sum

  /* bytes in the segments before the dirty section */
  val cleanBytes = segmentBytes(-1, firstDirtyOffset-1)

  /* bytes in the segments from the first dirty offset up to the base offset of the active segment */
  val dirtyBytes = segmentBytes(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset))

  /* fraction of the counted bytes that is dirty */
  val cleanableRatio = dirtyBytes.toDouble / totalBytes

  /** Clean plus dirty bytes. */
  def totalBytes = cleanBytes + dirtyBytes

  /** Order by cleanable ratio: the sign of the difference between the two ratios. */
  override def compare(that: LogToClean): Int = {
    val ratioDelta = this.cleanableRatio - that.cleanableRatio
    math.signum(ratioDelta).toInt
  }
}