package kafka.log
import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.atomic._
import kafka.utils._
import kafka.message._
import kafka.common.KafkaException
import java.util.concurrent.TimeUnit
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
/**
 * An on-disk message set backed by a file channel. The set may cover the complete
 * file or be a "slice" view over a byte range [start, end) of the underlying file.
 *
 * @param file The file backing this message set
 * @param channel The open channel used for all reads and writes
 * @param start The lower bound (inclusive), in bytes, of the readable window into the file
 * @param end The upper bound (exclusive), in bytes, of the readable window into the file
 * @param isSlice True if this instance is a sub-range view of a larger set; slices use the
 *                fixed bounds for their size and do not reposition the channel
 */
@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {

  /* the number of readable bytes: fixed by the bounds for a slice, otherwise derived
   * from the current file size capped at `end` */
  private val _size =
    if(isSlice)
      new AtomicInteger(end - start)
    else
      new AtomicInteger(math.min(channel.size().toInt, end) - start)

  /* if this is not a slice, position the channel at the end of the file so appends land there */
  if (!isSlice)
    channel.position(channel.size)

  /** Create a message set covering the full contents of an already-open channel. */
  def this(file: File, channel: FileChannel) =
    this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)

  /** Create a message set by opening the given file in mutable mode. */
  def this(file: File) =
    this(file, Utils.openChannel(file, mutable = true))

  /** Create a message set by opening the given file with the requested mutability. */
  def this(file: File, mutable: Boolean) = this(file, Utils.openChannel(file, mutable))

  /** Create a slice view over the byte range [start, end) of an already-open channel. */
  def this(file: File, channel: FileChannel, start: Int, end: Int) =
    this(file, channel, start, end, isSlice = true)

  /**
   * Return a slice of this message set beginning at the given byte position (relative to
   * this set's start) and spanning at most `size` bytes. No bytes are copied; the slice
   * shares the underlying channel.
   *
   * NOTE(review): the `end` bound is clamped with `sizeInBytes()`, which is relative to
   * `start`; for a slice whose `start > 0` this looks like it under-clamps — confirm
   * callers only slice from an unsliced set.
   *
   * @param position The byte offset, relative to `start`, at which the slice begins
   * @param size The maximum number of bytes in the slice
   * @throws IllegalArgumentException if position or size is negative
   */
  def read(position: Int, size: Int): FileMessageSet = {
    if(position < 0)
      throw new IllegalArgumentException("Invalid position: " + position)
    if(size < 0)
      throw new IllegalArgumentException("Invalid size: " + size)
    new FileMessageSet(file,
                       channel,
                       start = this.start + position,
                       end = math.min(this.start + position + size, sizeInBytes()))
  }

  /**
   * Scan forward from `startingPosition` for the first message whose offset is >= the
   * target offset, reading one log-entry header (8-byte offset + 4-byte size) at a time.
   *
   * @param targetOffset The offset being sought
   * @param startingPosition The file position at which to begin the scan
   * @return The offset and position of the first qualifying message, or null if none exists
   *         before the end of this set
   * @throws IllegalStateException if a header read is short or a message size is corrupt
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()
      channel.read(buffer, position)
      // a partially-filled buffer means the file ended mid-header, which should not happen
      if(buffer.hasRemaining)
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      if(offset >= targetOffset)
        return OffsetPosition(offset, position)
      val messageSize = buffer.getInt()
      if(messageSize < Message.MessageOverhead)
        throw new IllegalStateException("Invalid message size: " + messageSize)
      // skip over this entry: header plus message payload
      position += MessageSet.LogOverhead + messageSize
    }
    null
  }

  /**
   * Write up to `size` bytes of this message set, starting at `writePosition` (relative to
   * `start`), to the destination channel using zero-copy transfer.
   *
   * @param destChannel The channel to write to
   * @param writePosition The byte position within this set at which to begin the transfer
   * @param size The maximum number of bytes to transfer
   * @return The number of bytes actually transferred
   * @throws KafkaException if the underlying file shrank since this set was sized,
   *                        indicating a concurrent truncation during the write
   */
  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
    // guard against a concurrent truncate: re-derive the size and compare with our cached value
    val newSize = math.min(channel.size().toInt, end) - start
    if (newSize < _size.get()) {
      throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
                               .format(file.getAbsolutePath, _size.get(), newSize))
    }
    val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
    trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
          + " bytes requested for transfer : " + math.min(size, sizeInBytes))
    bytesTransferred
  }

  /** Iterate over the messages in this set with no message-size limit. */
  override def iterator() = iterator(Int.MaxValue)

  /**
   * Iterate over the messages in this set, failing fast on any message whose size exceeds
   * the given limit.
   *
   * @param maxMessageSize The largest message size to accept
   * @throws InvalidMessageException (from the returned iterator) if a message is larger
   *                                 than maxMessageSize
   */
  def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
    new IteratorTemplate[MessageAndOffset] {
      var location = start
      // reusable buffer for the per-entry header: 8-byte offset + 4-byte size
      // (use the named constant rather than a literal 12, matching searchFor)
      val sizeOffsetBuffer = ByteBuffer.allocate(MessageSet.LogOverhead)
      override def makeNext(): MessageAndOffset = {
        if(location >= end)
          return allDone()
        // read the header; a short read means we ran off the end of the valid data
        sizeOffsetBuffer.rewind()
        channel.read(sizeOffsetBuffer, location)
        if(sizeOffsetBuffer.hasRemaining)
          return allDone()
        sizeOffsetBuffer.rewind()
        val offset = sizeOffsetBuffer.getLong()
        val size = sizeOffsetBuffer.getInt()
        if(size < Message.MinHeaderSize)
          return allDone()
        if(size > maxMessageSize)
          throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
        // read the message payload that follows the header
        val buffer = ByteBuffer.allocate(size)
        channel.read(buffer, location + MessageSet.LogOverhead)
        if(buffer.hasRemaining)
          return allDone()
        buffer.rewind()
        // advance past header + payload for the next call
        location += size + MessageSet.LogOverhead
        new MessageAndOffset(new Message(buffer), offset)
      }
    }
  }

  /** The number of bytes currently in this message set. */
  def sizeInBytes(): Int = _size.get()

  /**
   * Append the given messages at the channel's current position (the end of the file for a
   * non-slice set) and grow the cached size by the number of bytes written.
   */
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
    _size.getAndAdd(written)
  }

  /** Force any buffered writes (data and metadata) to disk. */
  def flush() = {
    channel.force(true)
  }

  /** Flush pending writes and close the underlying channel. */
  def close() {
    flush()
    channel.close()
  }

  /**
   * Close the channel (best-effort, errors swallowed) and delete the backing file.
   *
   * @return true if the file was deleted
   */
  def delete(): Boolean = {
    Utils.swallow(channel.close())
    file.delete()
  }

  /**
   * Truncate this message set to the given size, repositioning the channel at the new end.
   *
   * @param targetSize The desired size in bytes
   * @return The number of bytes removed
   * @throws KafkaException if targetSize is negative or larger than the current size
   */
  def truncateTo(targetSize: Int): Int = {
    val originalSize = sizeInBytes
    if(targetSize > originalSize || targetSize < 0)
      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
                               " size of this log segment is " + originalSize + " bytes.")
    channel.truncate(targetSize)
    channel.position(targetSize)
    _size.set(targetSize)
    originalSize - targetSize
  }

  /**
   * Fill the given buffer from this set starting at the given position relative to `start`,
   * then flip it for reading.
   *
   * @return The same buffer, flipped and ready for consumption
   */
  def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
    channel.read(buffer, relativePosition + this.start)
    buffer.flip()
    buffer
  }

  /**
   * Rename the backing file to the given name.
   *
   * Note: the `file` reference is updated to the new name even when the rename fails, so
   * callers must check the returned flag.
   *
   * @return true if the rename succeeded
   */
  def renameTo(f: File): Boolean = {
    val success = this.file.renameTo(f)
    this.file = f
    success
  }

}
/** Holder for the metrics timer that tracks log-flush rate (per second) and latency (ms). */
object LogFlushStats extends KafkaMetricsGroup {
  val logFlushTimer: KafkaTimer =
    new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}