package kafka.log
import java.io._
import java.util.concurrent.TimeUnit
import kafka.utils._
import scala.collection._
import kafka.common.{TopicAndPartition, KafkaException}
import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future}
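/**
 * The entry point to the kafka log management subsystem. The log manager is responsible for log creation,
 * retrieval, and cleaning. All read and write operations are delegated to the individual log instances.
 *
 * The log manager maintains logs in one or more directories. New logs are created in the data directory
 * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
 * size or I/O rate.
 *
 * Background threads handle log retention, flushing, and recovery-point checkpointing.
 *
 * A rough lifecycle sketch (the constructor arguments, topic name, and partition below are only illustrative):
 * {{{
 *   val logManager = new LogManager(...)
 *   logManager.startup()
 *   val log = logManager.createLog(TopicAndPartition("my-topic", 0), logManager.defaultConfig)
 *   // ... appends and reads go through the returned Log ...
 *   logManager.shutdown()
 * }}}
 */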
@threadsafe
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]()
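// Validate and lock each configured data directory, then recover and load all existing logs.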
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
loadLogs()
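// The log cleaner performs log compaction; it is only instantiated when the cleaner is enabled in the cleaner config.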
val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
new LogCleaner(cleanerConfig, logDirs, logs, time = time)
else
null
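/**
 * Create and check validity of the given directories, specifically:
 * 1. Ensure that there are no duplicates in the directory list
 * 2. Create each directory if it doesn't exist
 * 3. Check that each path is a readable directory
 */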
private def createAndValidateLogDirs(dirs: Seq[File]) {
if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
for(dir <- dirs) {
if(!dir.exists) {
info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
val created = dir.mkdirs()
if(!created)
throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
}
if(!dir.isDirectory || !dir.canRead)
throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
}
}
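/**
 * Lock all the given directories
 */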
private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
dirs.map { dir =>
val lock = new FileLock(new File(dir, LockFile))
if(!lock.tryLock())
throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath +
". A Kafka instance in another process or thread is using this directory.")
lock
}
}
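/**
 * Recover and load all logs in the given data directories
 */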
private def loadLogs(): Unit = {
info("Loading logs.")
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
for (dir <- this.logDirs) {
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
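// A clean shutdown marker means the logs in this directory need no recovery;
// otherwise mark the broker as recovering from an unclean shutdown.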
if (cleanShutdownFile.exists) {
debug(
"Found clean shutdown file. " +
"Skipping recovery for all logs in data directory: " +
dir.getAbsolutePath)
} else {
brokerState.newState(RecoveringFromUncleanShutdown)
}
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
val jobsForDir = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory
} yield {
Utils.runnable {
debug("Loading log '" + logDir.getName + "'")
val topicPartition = Log.parseTopicPartitionName(logDir.getName)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
val previous = this.logs.put(topicPartition, current)
if (previous != null) {
throw new IllegalArgumentException(
"Duplicate log directories found: %s, %s!".format(
current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
}
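// Key this directory's load jobs by its clean shutdown marker so the marker can be deleted once loading completes.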
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
}
try {
for ((cleanShutdownFile, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
cleanShutdownFile.delete()
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during logs loading: " + e.getCause)
throw e.getCause
}
} finally {
threadPools.foreach(_.shutdown())
}
info("Logs loading complete.")
}
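/**
 * Start the background threads to flush logs and do log cleanup
 */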
def startup() {
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
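/**
 * Close all the logs
 */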
def shutdown() {
info("Shutting down.")
val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
if (cleaner != null) {
Utils.swallow(cleaner.shutdown())
}
for (dir <- this.logDirs) {
debug("Flushing and closing logs at " + dir)
val pool = Executors.newFixedThreadPool(ioThreads)
threadPools.append(pool)
val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
val jobsForDir = logsInDir map { log =>
Utils.runnable {
log.flush()
log.close()
}
}
jobs(dir) = jobsForDir.map(pool.submit).toSeq
}
try {
for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
debug("Updating recovery points at " + dir)
checkpointLogsInDir(dir)
debug("Writing clean shutdown marker at " + dir)
Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
}
} catch {
case e: ExecutionException => {
error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
throw e.getCause
}
} finally {
threadPools.foreach(_.shutdown())
dirLocks.foreach(_.destroy())
}
info("Shutdown complete.")
}
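/**
 * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
 *
 * @param partitionAndOffsets Partition logs that need to be truncated
 */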
def truncateTo(partitionAndOffsets: Map[TopicAndPartition, Long]) {
for ((topicAndPartition, truncateOffset) <- partitionAndOffsets) {
val log = logs.get(topicAndPartition)
if (log != null) {
val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset)
if (needToStopCleaner && cleaner != null)
cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateTo(truncateOffset)
if (needToStopCleaner && cleaner != null)
cleaner.resumeCleaning(topicAndPartition)
}
}
checkpointRecoveryPointOffsets()
}
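/**
 * Delete all data in a partition and start the log at the new offset
 *
 * @param newOffset The new offset to start the log with
 */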
def truncateFullyAndStartAt(topicAndPartition: TopicAndPartition, newOffset: Long) {
val log = logs.get(topicAndPartition)
if (log != null) {
if (cleaner != null)
cleaner.abortAndPauseCleaning(topicAndPartition)
log.truncateFullyAndStartAt(newOffset)
if (cleaner != null)
cleaner.resumeCleaning(topicAndPartition)
}
checkpointRecoveryPointOffsets()
}
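/**
 * Write out the current recovery point for all logs to a text file in the log directory
 * to avoid recovering the whole log on startup.
 */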
def checkpointRecoveryPointOffsets() {
this.logDirs.foreach(checkpointLogsInDir)
}
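/**
 * Make a checkpoint for all logs in the provided directory.
 */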
private def checkpointLogsInDir(dir: File): Unit = {
val recoveryPoints = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined) {
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
}
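/**
 * Get the log if it exists, otherwise return None
 */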
def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
val log = logs.get(topicAndPartition)
if (log == null)
None
else
Some(log)
}
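/**
 * Create a log for the given topic and partition.
 * If the log already exists, just return the existing log.
 */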
def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
logCreationOrDeletionLock synchronized {
var log = logs.get(topicAndPartition)
if(log != null)
return log
val dataDir = nextLogDir()
val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
dir.mkdirs()
log = new Log(dir,
config,
recoveryPoint = 0L,
scheduler,
time)
logs.put(topicAndPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicAndPartition.topic,
topicAndPartition.partition,
dataDir.getAbsolutePath,
{import JavaConversions._; config.toProps.mkString(", ")}))
log
}
}
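/**
 * Delete a log: remove it from the pool, abort any in-progress cleaning, and delete its data.
 */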
def deleteLog(topicAndPartition: TopicAndPartition) {
var removedLog: Log = null
logCreationOrDeletionLock synchronized {
removedLog = logs.remove(topicAndPartition)
}
if (removedLog != null) {
if (cleaner != null) {
cleaner.abortCleaning(topicAndPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
removedLog.delete()
info("Deleted log for partition [%s,%d] in %s."
.format(topicAndPartition.topic,
topicAndPartition.partition,
removedLog.dir.getAbsolutePath))
}
}
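/**
 * Choose the next directory in which to create a log. Currently this is done
 * by calculating the number of partitions in each directory and then choosing the
 * data directory with the fewest partitions.
 */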
private def nextLogDir(): File = {
if(logDirs.size == 1) {
logDirs(0)
} else {
val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
val dirCounts = (zeros ++ logCounts).toBuffer
val leastLoaded = dirCounts.sortBy(_._2).head
new File(leastLoaded._1)
}
}
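/**
 * Runs through the log removing segments older than the configured retention time
 */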
private def cleanupExpiredSegments(log: Log): Int = {
val startMs = time.milliseconds
log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}
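/**
 * Runs through the log removing segments until the size of the log
 * is no larger than the configured retention size
 */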
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
return 0
var diff = log.size - log.config.retentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
true
} else {
false
}
}
log.deleteOldSegments(shouldDelete)
}
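/**
 * Delete any eligible segments from all logs that are not configured for compaction.
 */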
def cleanupLogs() {
debug("Beginning log cleanup...")
var total = 0
val startMs = time.milliseconds
for(log <- allLogs; if !log.config.compact) {
debug("Garbage collecting '" + log.name + "'")
total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
}
debug("Log cleanup completed. " + total + " files deleted in " +
(time.milliseconds - startMs) / 1000 + " seconds")
}
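/**
 * Get all the partition logs
 */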
def allLogs(): Iterable[Log] = logs.values
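/**
 * Get a map of TopicAndPartition => Log
 */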
def logsByTopicPartition = logs.toMap
private def logsByDir = {
this.logsByTopicPartition.groupBy {
case (_, log) => log.dir.getParent
}
}
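/**
 * Flush any log which has exceeded its flush interval and has unwritten messages.
 */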
private def flushDirtyLogs() = {
debug("Checking for dirty logs to flush...")
for ((topicAndPartition, log) <- logs) {
try {
val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs +
" last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush)
if(timeSinceLastFlush >= log.config.flushMs)
log.flush()
} catch {
case e: Throwable =>
error("Error flushing topic " + topicAndPartition.topic, e)
}
}
}
}