package kafka.log
import kafka.utils._
import kafka.message._
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
import java.io.{IOException, File}
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
import java.util.concurrent.atomic._
import java.text.NumberFormat
import scala.collection.JavaConversions
import com.yammer.metrics.core.Gauge
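/**
 * An append-only message log, physically stored as a sequence of [[LogSegment]]s in `dir`.
 *
 * New messages are appended to the end of the active (highest base offset) segment; reads may hit
 * any segment. Appends, rolls, truncations and deletions are serialized on an internal lock, while
 * reads go through the concurrent segment map without locking, which is what allows the class to
 * carry the @threadsafe annotation.
 *
 * @param dir The directory in which log segments are created and stored
 * @param config The log configuration settings
 * @param recoveryPoint The offset from which to begin recovery, i.e. the first offset that may not have been flushed to disk
 * @param scheduler The scheduler used to run background jobs such as asynchronous flushes and segment deletion
 * @param time The time instance used for clock-dependent behaviour such as segment age and flush timestamps
 */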
@threadsafe
class Log(val dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
private val lock = new Object
private val lastflushedTime = new AtomicLong(time.milliseconds)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
loadSegments()
@volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
newGauge("NumLogSegments",
new Gauge[Int] {
def value = numberOfSegments
},
tags)
newGauge("LogStartOffset",
new Gauge[Long] {
def value = logStartOffset
},
tags)
newGauge("LogEndOffset",
new Gauge[Long] {
def value = logEndOffset
},
tags)
newGauge("Size",
new Gauge[Long] {
def value = size
},
tags)
def name = dir.getName()
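  /**
   * Load the log segments from the log directory, creating the directory if it does not exist.
   *
   * A first pass deletes leftover .deleted and .cleaned files and completes any interrupted .swap
   * operation (orphaned swap index files are deleted, swap log files are renamed back into place).
   * A second pass removes index files with no corresponding log file and opens a LogSegment for
   * each log file, rebuilding the index when it is missing. If no segment is found, an empty
   * segment starting at offset 0 is created; otherwise the log is recovered from the recovery
   * point, the active segment's index is resized, and every index is sanity-checked.
   */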
private def loadSegments() {
dir.mkdirs()
for(file <- dir.listFiles if file.isFile) {
if(!file.canRead)
throw new IOException("Could not read file " + file)
val filename = file.getName
if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
file.delete()
} else if(filename.endsWith(SwapFileSuffix)) {
val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
if(baseName.getPath.endsWith(IndexFileSuffix)) {
file.delete()
} else if(baseName.getPath.endsWith(LogFileSuffix)){
val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
index.delete()
val renamed = file.renameTo(baseName)
if(renamed)
info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
else
throw new KafkaException("Failed to rename file %s.".format(file.getPath))
}
}
}
for(file <- dir.listFiles if file.isFile) {
val filename = file.getName
if(filename.endsWith(IndexFileSuffix)) {
val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
if(!logFile.exists) {
warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
file.delete()
}
} else if(filename.endsWith(LogFileSuffix)) {
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
val hasIndex = Log.indexFilename(dir, start).exists
val segment = new LogSegment(dir = dir,
startOffset = start,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time)
if(!hasIndex) {
error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
segment.recover(config.maxMessageSize)
}
segments.put(start, segment)
}
}
if(logSegments.size == 0) {
segments.put(0L, new LogSegment(dir = dir,
startOffset = 0,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time))
} else {
recoverLog()
activeSegment.index.resize(config.maxIndexSize)
}
for (s <- logSegments)
s.index.sanityCheck()
}
private def updateLogEndOffset(messageOffset: Long) {
nextOffsetMetadata = new LogOffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.size.toInt)
}
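  /**
   * Recover the log following an unclean shutdown.
   *
   * If the clean-shutdown marker file is present, the recovery point is simply advanced to the log
   * end offset. Otherwise every segment at or beyond the recovery point is re-validated; when a
   * segment turns out to be corrupt it is truncated at the first bad message and all segments
   * after it are deleted.
   */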
private def recoverLog() {
if(hasCleanShutdownFile) {
this.recoveryPoint = activeSegment.nextOffset
return
}
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
while(unflushed.hasNext) {
val curr = unflushed.next
info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
val truncatedBytes =
try {
curr.recover(config.maxMessageSize)
} catch {
case e: InvalidOffsetException =>
val startOffset = curr.baseOffset
warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
"creating an empty one with starting offset " + startOffset)
curr.truncateTo(startOffset)
}
if(truncatedBytes > 0) {
warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
unflushed.foreach(deleteSegment)
}
}
}
private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
def numberOfSegments: Int = segments.size
def close() {
debug("Closing log " + name)
lock synchronized {
for(seg <- logSegments)
seg.close()
}
}
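  /**
   * Append the given message set to the active segment, rolling to a new segment if needed.
   *
   * The set is first analyzed and trimmed of trailing partial bytes. Under the lock, offsets are
   * assigned starting at the current log end offset (unless `assignOffsets` is false, in which case
   * the supplied offsets must be monotonically increasing and not lower than the log end offset),
   * per-message and per-set size limits are enforced, the messages are written to the active
   * segment, and the log end offset is advanced. A flush is triggered when the number of unflushed
   * messages reaches the configured flush interval.
   *
   * @param messages The message set to append
   * @param assignOffsets Whether this log should assign offsets to the messages or trust the ones they already carry
   * @return Information about the appended messages, including the first and last offsets
   */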
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateMessageSet(messages)
if(appendInfo.shallowCount == 0)
return appendInfo
var validMessages = trimInvalidBytes(messages, appendInfo)
try {
lock synchronized {
appendInfo.firstOffset = nextOffsetMetadata.messageOffset
if(assignOffsets) {
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
try {
validMessages = validMessages.assignOffsets(offset, appendInfo.codec)
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
appendInfo.lastOffset = offset.get - 1
} else {
if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + messages)
}
for(messageAndOffset <- validMessages.shallowIterator) {
if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
}
}
if(validMessages.sizeInBytes > config.segmentSize) {
throw new MessageSetSizeTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
.format(validMessages.sizeInBytes, config.segmentSize))
}
val segment = maybeRoll(validMessages.sizeInBytes)
segment.append(appendInfo.firstOffset, validMessages)
updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages))
if(unflushedMessages >= config.flushInterval)
flush()
appendInfo
}
} catch {
case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
}
}
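  /**
   * Summary information about an appended message set: the first and last offsets, the compression
   * codec in use, the shallow message count, the number of valid bytes, and whether the offsets
   * were monotonically increasing.
   */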
case class LogAppendInfo(var firstOffset: Long, var lastOffset: Long, codec: CompressionCodec, shallowCount: Int, validBytes: Int, offsetsMonotonic: Boolean)
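  /**
   * Validate the message set and gather statistics about it: verify each shallow message's checksum
   * and size limit, track the first and last offsets and whether they increase monotonically, count
   * the messages and valid bytes, and note the compression codec used.
   */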
private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
var shallowMessageCount = 0
var validBytesCount = 0
var firstOffset, lastOffset = -1L
var codec: CompressionCodec = NoCompressionCodec
var monotonic = true
for(messageAndOffset <- messages.shallowIterator) {
if(firstOffset < 0)
firstOffset = messageAndOffset.offset
if(lastOffset >= messageAndOffset.offset)
monotonic = false
lastOffset = messageAndOffset.offset
val m = messageAndOffset.message
val messageSize = MessageSet.entrySize(m)
if(messageSize > config.maxMessageSize) {
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
.format(messageSize, config.maxMessageSize))
}
m.ensureValid()
shallowMessageCount += 1
validBytesCount += messageSize
val messageCodec = m.compressionCodec
if(messageCodec != NoCompressionCodec)
codec = messageCodec
}
LogAppendInfo(firstOffset, lastOffset, codec, shallowMessageCount, validBytesCount, monotonic)
}
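  /**
   * Trim any trailing bytes beyond the last complete message from the message set, returning the
   * set unchanged when no trimming is needed.
   */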
private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = {
val messageSetValidBytes = info.validBytes
if(messageSetValidBytes < 0)
throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + ". Message set cannot be appended to log. Possible causes are corrupted produce requests.")
if(messageSetValidBytes == messages.sizeInBytes) {
messages
} else {
val validByteBuffer = messages.buffer.duplicate()
validByteBuffer.limit(messageSetValidBytes)
new ByteBufferMessageSet(validByteBuffer)
}
}
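  /**
   * Read up to `maxLength` bytes of messages starting at `startOffset`, optionally bounded above by
   * `maxOffset`.
   *
   * Reading exactly at the log end offset yields an empty message set; reading past it or below the
   * first segment raises an OffsetOutOfRangeException. Otherwise the segment containing the start
   * offset is located and read, falling through to subsequent segments when the located one holds
   * no messages at or after the requested offset.
   */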
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {
trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
val next = nextOffsetMetadata.messageOffset
if(startOffset == next)
return FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
var entry = segments.floorEntry(startOffset)
if(startOffset > next || entry == null)
throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
while(entry != null) {
val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
if(fetchInfo == null) {
entry = segments.higherEntry(entry.getKey)
} else {
return fetchInfo
}
}
FetchDataInfo(nextOffsetMetadata, MessageSet.Empty)
}
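  /**
   * Convert a message offset into full offset metadata (segment base offset and physical position
   * within the segment) by issuing a minimal one-byte read, or return UnknownOffsetMetadata if the
   * offset is out of range.
   */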
def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
try {
val fetchDataInfo = read(offset, 1)
fetchDataInfo.fetchOffset
} catch {
case e: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
}
}
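  /**
   * Delete the longest prefix of segments that satisfy the given predicate, never deleting an
   * empty active segment. If every segment qualifies, the log is rolled first so that an empty
   * active segment remains after the deletion.
   *
   * @return The number of segments deleted
   */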
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
val lastSegment = activeSegment
val deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0))
val numToDelete = deletable.size
if(numToDelete > 0) {
lock synchronized {
if(segments.size == numToDelete)
roll()
deletable.foreach(deleteSegment(_))
}
}
numToDelete
}
def size: Long = logSegments.map(_.size).sum
def logStartOffset: Long = logSegments.head.baseOffset
def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
def logEndOffset: Long = nextOffsetMetadata.messageOffset
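  /**
   * Roll to a new segment if appending `messagesSize` more bytes would overflow the configured
   * segment size, if the non-empty active segment has exceeded the configured segment age (less
   * its roll jitter), or if the active segment's index is full; otherwise return the current
   * active segment.
   */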
private def maybeRoll(messagesSize: Int): LogSegment = {
val segment = activeSegment
if (segment.size > config.segmentSize - messagesSize ||
segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
segment.index.isFull) {
debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
.format(name,
segment.size,
config.segmentSize,
segment.index.entries,
segment.index.maxEntries,
time.milliseconds - segment.created,
config.segmentMs - segment.rollJitterMs))
roll()
} else {
segment
}
}
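  /**
   * Roll the log over to a new active segment starting at the current log end offset. Any stale
   * files for the new segment are removed, the previous segment's index is trimmed to its final
   * size, and an asynchronous flush up to the new segment's base offset is scheduled.
   *
   * @return The newly created segment
   */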
def roll(): LogSegment = {
val start = time.nanoseconds
lock synchronized {
val newOffset = logEndOffset
val logFile = logFilename(dir, newOffset)
val indexFile = indexFilename(dir, newOffset)
for(file <- List(logFile, indexFile); if file.exists) {
warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
file.delete()
}
segments.lastEntry() match {
case null =>
case entry => entry.getValue.index.trimToValidSize()
}
val segment = new LogSegment(dir,
startOffset = newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time)
val prev = addSegment(segment)
if(prev != null)
throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0)))
segment
}
}
def unflushedMessages() = this.logEndOffset - this.recoveryPoint
def flush(): Unit = flush(this.logEndOffset)
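  /**
   * Flush all log segments holding offsets below the given offset to disk and advance the recovery
   * point. Flushing at or below the current recovery point is a no-op.
   *
   * @param offset The offset to flush up to (exclusive); all messages before it will be on disk afterwards
   */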
def flush(offset: Long) : Unit = {
if (offset <= this.recoveryPoint)
return
debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
time.milliseconds + " unflushed = " + unflushedMessages)
for(segment <- logSegments(this.recoveryPoint, offset))
segment.flush()
lock synchronized {
if(offset > this.recoveryPoint) {
this.recoveryPoint = offset
lastflushedTime.set(time.milliseconds)
}
}
}
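  /**
   * Completely delete this log: delete every segment's files, clear the segment map, and remove
   * the log directory itself.
   */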
private[log] def delete() {
lock synchronized {
logSegments.foreach(_.delete())
segments.clear()
Utils.rm(dir)
}
}
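  /**
   * Truncate the log so that it contains no offset greater than or equal to `targetOffset`.
   *
   * Segments lying entirely above the target are deleted and the remaining active segment is
   * truncated; if the target falls below the first segment, the log is cleared and restarted at
   * the target offset. Targets at or beyond the current log end offset leave the log unchanged.
   */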
private[log] def truncateTo(targetOffset: Long) {
info("Truncating log %s to offset %d.".format(name, targetOffset))
if(targetOffset < 0)
throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
if(targetOffset > logEndOffset) {
info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
return
}
lock synchronized {
if(segments.firstEntry.getValue.baseOffset > targetOffset) {
truncateFullyAndStartAt(targetOffset)
} else {
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
deletable.foreach(deleteSegment(_))
activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
}
}
}
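  /**
   * Delete every existing segment and restart the log as a single empty segment with base offset
   * `newOffset`, moving the log end offset to `newOffset` and lowering the recovery point if it
   * lies beyond it.
   */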
private[log] def truncateFullyAndStartAt(newOffset: Long) {
debug("Truncate and start log '" + name + "' to " + newOffset)
lock synchronized {
val segmentsToDelete = logSegments.toList
segmentsToDelete.foreach(deleteSegment(_))
addSegment(new LogSegment(dir,
newOffset,
indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize,
rollJitterMs = config.randomSegmentJitter,
time = time))
updateLogEndOffset(newOffset)
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
}
}
def lastFlushTime(): Long = lastflushedTime.get
def activeSegment = segments.lastEntry.getValue
def logSegments: Iterable[LogSegment] = {
import JavaConversions._
segments.values
}
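  /**
   * The segments covering the offset range starting at `from` (rounded down to the base offset of
   * the containing segment) and excluding any segment whose base offset is `to` or greater, in
   * ascending base-offset order.
   */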
def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
import JavaConversions._
lock synchronized {
val floor = segments.floorKey(from)
if(floor eq null)
segments.headMap(to).values
else
segments.subMap(floor, true, to, false).values
}
}
override def toString() = "Log(" + dir + ")"
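  /**
   * Remove the given segment from the in-memory segment map and schedule its files for
   * asynchronous deletion.
   */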
private def deleteSegment(segment: LogSegment) {
info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
lock synchronized {
segments.remove(segment.baseOffset)
asyncDeleteSegment(segment)
}
}
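  /**
   * Immediately rename the segment's files with the .deleted suffix, then schedule the actual file
   * deletion to run after the configured file-delete delay.
   */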
private def asyncDeleteSegment(segment: LogSegment) {
segment.changeFileSuffixes("", Log.DeletedFileSuffix)
def deleteSeg() {
info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
segment.delete()
}
scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}
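  /**
   * Swap a cleaned segment in for the old segments it replaces. The new segment is first renamed
   * from .cleaned to .swap and added to the map, the old segments are removed and scheduled for
   * deletion, and the .swap suffix is finally dropped. The intermediate .swap file lets
   * loadSegments() complete the operation on restart if the broker fails part way through.
   */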
private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment]) {
lock synchronized {
newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)
addSegment(newSegment)
for(seg <- oldSegments) {
if(seg.baseOffset != newSegment.baseOffset)
segments.remove(seg.baseOffset)
asyncDeleteSegment(seg)
}
newSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
}
}
def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
}
object Log {
val LogFileSuffix = ".log"
val IndexFileSuffix = ".index"
val DeletedFileSuffix = ".deleted"
val CleanedFileSuffix = ".cleaned"
val SwapFileSuffix = ".swap"
val CleanShutdownFile = ".kafka_cleanshutdown"
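  /**
   * Format an offset as a 20-digit, zero-padded file name prefix so that log files sort by offset
   * in lexicographic order, e.g. offset 42 becomes "00000000000000000042".
   */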
def filenamePrefixFromOffset(offset: Long): String = {
val nf = NumberFormat.getInstance()
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
nf.format(offset)
}
def logFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
def indexFilename(dir: File, offset: Long) =
new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
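  /**
   * Parse a topic and partition out of a log directory name of the form "topic-partition",
   * splitting on the last '-', e.g. "my-topic-0" yields TopicAndPartition("my-topic", 0).
   */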
def parseTopicPartitionName(name: String): TopicAndPartition = {
val index = name.lastIndexOf('-')
TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
}
}