package kafka.network
import java.nio._
import java.nio.channels._
import kafka.utils._
@nonthreadsafe
private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging {
private val sizeBuffer = ByteBuffer.allocate(4)
private var contentBuffer: ByteBuffer = null
def this() = this(Int.MaxValue)
var complete: Boolean = false
def buffer: ByteBuffer = {
expectComplete()
contentBuffer
}
def readFrom(channel: ReadableByteChannel): Int = {
expectIncomplete()
var read = 0
if(sizeBuffer.remaining > 0)
read += Utils.read(channel, sizeBuffer)
if(contentBuffer == null && !sizeBuffer.hasRemaining) {
sizeBuffer.rewind()
val size = sizeBuffer.getInt()
if(size <= 0)
throw new InvalidRequestException("%d is not a valid request size.".format(size))
if(size > maxSize)
throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize))
contentBuffer = byteBufferAllocate(size)
}
if(contentBuffer != null) {
read = Utils.read(channel, contentBuffer)
if(!contentBuffer.hasRemaining) {
contentBuffer.rewind()
complete = true
}
}
read
}
private def byteBufferAllocate(size: Int): ByteBuffer = {
var buffer: ByteBuffer = null
try {
buffer = ByteBuffer.allocate(size)
} catch {
case e: OutOfMemoryError =>
error("OOME with size " + size, e)
throw e
case e2: Throwable =>
throw e2
}
buffer
}
}