package kafka.log
import kafka.message._
import kafka.common._
import kafka.utils._
import kafka.server.{LogOffsetMetadata, FetchDataInfo}
import scala.math._
import java.io.File
/**
 * A segment of the log. Each segment is backed by two files: a message set
 * (the `log`, a [[FileMessageSet]]) holding the actual messages, and an
 * [[OffsetIndex]] mapping logical offsets to physical byte positions in the
 * log file. All offsets in this segment are >= `baseOffset`.
 *
 * Mutation methods are marked @nonthreadsafe: callers must provide external
 * synchronization when appending/truncating.
 *
 * @param log the message set containing this segment's messages
 * @param index the sparse offset index for this segment
 * @param baseOffset the lowest offset that can appear in this segment
 * @param indexIntervalBytes the approximate number of appended bytes between index entries
 * @param rollJitterMs jitter for segment rolling; not read inside this class —
 *                     presumably consumed by the owning Log when deciding when
 *                     to roll (TODO confirm against caller)
 * @param time clock used to stamp the creation time
 */
@nonthreadsafe
class LogSegment(val log: FileMessageSet,
val index: OffsetIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
time: Time) extends Logging {
// Creation timestamp; reset in truncateTo() when the segment becomes empty so
// that age-based bookkeeping restarts for the now-empty segment.
var created = time.milliseconds
// Bytes appended since the last index entry was written; once this exceeds
// indexIntervalBytes, append() adds a new sparse index entry and resets it.
private var bytesSinceLastIndexEntry = 0
/**
 * Convenience constructor that creates the backing log and index files in
 * `dir` using the standard file-naming scheme for segment `startOffset`.
 */
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
startOffset,
indexIntervalBytes,
rollJitterMs,
time)
/** The size of this segment's log file in bytes. */
def size: Long = log.sizeInBytes()
/**
 * Append the given messages, which must start at `offset`, to the end of the
 * log. Roughly every indexIntervalBytes of appended data, an index entry is
 * written pointing at the first offset of the batch being appended.
 *
 * Note the ordering: the index entry is written BEFORE log.append, so it
 * records the physical position at which `offset` will begin.
 *
 * @param offset the logical offset of the first message in `messages`
 * @param messages the messages to append; empty sets are ignored
 */
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
// Only index occasionally (sparse index): one entry per ~indexIntervalBytes.
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(offset, log.sizeInBytes())
this.bytesSinceLastIndexEntry = 0
}
log.append(messages)
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
/**
 * Translate a logical offset to a physical position in the log file.
 * Uses the index to jump to the closest preceding entry, then scans the log
 * forward from there (starting no earlier than `startingFilePosition`, a
 * lower-bound optimization for repeated lookups).
 *
 * Returns null when no message with an offset >= `offset` exists in this
 * segment — presumably the contract of FileMessageSet.searchFor; callers
 * below (read, truncateTo) all handle a null result. TODO confirm.
 */
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
val mapping = index.lookup(offset)
log.searchFor(offset, max(mapping.position, startingFilePosition))
}
/**
 * Read a message set from this segment.
 *
 * @param startOffset the first offset to include in the read
 * @param maxOffset an optional exclusive upper bound on offsets to read
 * @param maxSize the maximum number of bytes to read
 * @return the fetched data with its offset metadata; an empty message set
 *         when maxSize is 0; null when startOffset is beyond the last offset
 *         in this segment
 * @throws IllegalArgumentException if maxSize is negative, or if maxOffset
 *         is present and less than startOffset
 */
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
val logSize = log.sizeInBytes
val startPosition = translateOffset(startOffset)
// null from translateOffset means the requested offset is past the end.
if(startPosition == null)
return null
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
// A zero-byte read still returns valid offset metadata.
if(maxSize == 0)
return FetchDataInfo(offsetMetadata, MessageSet.Empty)
// Determine how many bytes to read: bounded by maxSize and, when maxOffset
// is given, by the physical position of that offset (or end of log if the
// offset is beyond the last message).
val length =
maxOffset match {
case None =>
maxSize
case Some(offset) => {
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
logSize
else
mapping.position
min(endPosition - startPosition.position, maxSize)
}
}
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
/**
 * Run recovery on this segment: validate every message, rebuild the index
 * from scratch, and truncate the log past the last valid message.
 *
 * ensureValid() presumably throws InvalidMessageException on a corrupt
 * message (that is the exception caught below); everything from the first
 * invalid message onward is discarded.
 *
 * @param maxMessageSize the maximum message size the iterator will accept
 * @return the number of bytes truncated from the log
 */
@nonthreadsafe
def recover(maxMessageSize: Int): Int = {
index.truncate()
index.resize(index.maxIndexSize)
var validBytes = 0
var lastIndexEntry = 0
val iter = log.iterator(maxMessageSize)
try {
while(iter.hasNext) {
val entry = iter.next
entry.message.ensureValid()
// Rebuild the sparse index with the same spacing used by append().
if(validBytes - lastIndexEntry > indexIntervalBytes) {
// For compressed wrapper messages, index the offset of the FIRST
// inner message, since that is the first offset physically at this
// position.
val startOffset =
entry.message.compressionCodec match {
case NoCompressionCodec =>
entry.offset
case _ =>
ByteBufferMessageSet.decompress(entry.message).head.offset
}
index.append(startOffset, validBytes)
lastIndexEntry = validBytes
}
// Advance only after the message validated cleanly.
validBytes += MessageSet.entrySize(entry.message)
}
} catch {
case e: InvalidMessageException =>
logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
val truncated = log.sizeInBytes - validBytes
log.truncateTo(validBytes)
index.trimToValidSize()
truncated
}
override def toString() = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
/**
 * Truncate this segment so it contains no messages with offset >= `offset`.
 *
 * @return the number of log bytes truncated; 0 if `offset` is beyond the
 *         end of this segment (nothing to remove)
 */
@nonthreadsafe
def truncateTo(offset: Long): Int = {
val mapping = translateOffset(offset)
if(mapping == null)
return 0
index.truncateTo(offset)
// After truncation, resize the index back to its max size so appends can
// continue into it.
index.resize(index.maxIndexSize)
val bytesTruncated = log.truncateTo(mapping.position)
// An emptied segment is treated as newly created.
if(log.sizeInBytes == 0)
created = time.milliseconds
bytesSinceLastIndexEntry = 0
bytesTruncated
}
/**
 * The offset that would be assigned to the next message appended to this
 * segment. Computed by reading from the last indexed offset to the end of
 * the log; falls back to baseOffset when the segment is empty.
 *
 * Note: this scans from the last index entry, so it can be expensive on a
 * segment with a large un-indexed tail.
 */
@threadsafe
def nextOffset(): Long = {
val ms = read(index.lastOffset, None, log.sizeInBytes)
if(ms == null) {
baseOffset
} else {
ms.messageSet.lastOption match {
case None => baseOffset
case Some(last) => last.nextOffset
}
}
}
/** Flush the log and index for this segment to stable storage. */
@threadsafe
def flush() {
LogFlushStats.logFlushTimer.time {
log.flush()
index.flush()
}
}
/**
 * Change the suffix of both the log file and the index file (e.g. when
 * marking a segment as deleted or cleaned).
 *
 * @throws KafkaStorageException if either rename fails
 */
def changeFileSuffixes(oldSuffix: String, newSuffix: String) {
val logRenamed = log.renameTo(new File(Utils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
if(!logRenamed)
throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
val indexRenamed = index.renameTo(new File(Utils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix)))
if(!indexRenamed)
throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset))
}
/** Close the log and index, swallowing (logging, not propagating) any errors. */
def close() {
Utils.swallow(index.close)
Utils.swallow(log.close)
}
/**
 * Delete this segment's backing files.
 *
 * @throws KafkaStorageException if a file failed to delete yet still exists
 *         (delete() returning false for an already-missing file is not an error)
 */
def delete() {
val deletedLog = log.delete()
val deletedIndex = index.delete()
if(!deletedLog && log.file.exists)
throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
if(!deletedIndex && index.file.exists)
throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
}
/** The last-modified time of this segment's log file. */
def lastModified = log.file.lastModified
/** Set the last-modified time of both backing files (used for retention bookkeeping). */
def lastModified_=(ms: Long) = {
log.file.setLastModified(ms)
index.file.setLastModified(ms)
}
}