package kafka.network
import java.net.InetSocketAddress
import java.nio.channels._
import kafka.utils.{nonthreadsafe, Logging}
import kafka.api.RequestOrResponse
/** Constants shared by users of the [[BlockingChannel]] class. */
object BlockingChannel {

  /**
   * Sentinel buffer size. It is non-positive, so passing it as a read or write buffer size
   * makes `BlockingChannel.connect()` skip the corresponding socket buffer setter.
   */
  val UseDefaultBufferSize: Int = -1
}
/**
 * A blocking socket channel used to send a request and read back a response.
 *
 * The class is marked `@nonthreadsafe`: `connect()` and `disconnect()` serialize on an
 * internal lock, but `send`, `receive` and `isConnected` do not take that lock.
 *
 * @param host            host name or address to connect to
 * @param port            TCP port to connect to
 * @param readBufferSize  requested SO_RCVBUF; a value <= 0 leaves the socket's setting untouched
 * @param writeBufferSize requested SO_SNDBUF; a value <= 0 leaves the socket's setting untouched
 * @param readTimeoutMs   SO_TIMEOUT in milliseconds; also used as the connect timeout
 */
@nonthreadsafe
class BlockingChannel(val host: String,
                      val port: Int,
                      val readBufferSize: Int,
                      val writeBufferSize: Int,
                      val readTimeoutMs: Int) extends Logging {

  private var connected = false
  private var channel: SocketChannel = null
  private var readChannel: ReadableByteChannel = null
  private var writeChannel: GatheringByteChannel = null
  private val lock = new Object()
  // The connect timeout is deliberately tied to the read timeout.
  private val connectTimeoutMs = readTimeoutMs

  /**
   * Opens and configures the socket if the channel is not already connected.
   *
   * This method does not throw when the connection attempt fails: any failure is logged at
   * debug level, all partially-created resources are released and the channel is left
   * disconnected. Callers detect failure through `isConnected`, or through the
   * `ClosedChannelException` raised by a later `send`/`receive`.
   */
  def connect(): Unit = lock synchronized {
    if (!connected) {
      try {
        channel = SocketChannel.open()
        // Non-positive sizes mean "do not override the socket's buffer size".
        if (readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if (writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        channel.socket.setSoTimeout(readTimeoutMs)
        channel.socket.setKeepAlive(true)
        channel.socket.setTcpNoDelay(true)
        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

        writeChannel = channel
        // Reads go through a channel wrapped around the socket's InputStream rather than the
        // SocketChannel itself. NOTE(review): presumably because SO_TIMEOUT is only honored
        // by stream reads, not by SocketChannel.read - confirm before changing.
        readChannel = Channels.newChannel(channel.socket().getInputStream)
        connected = true

        // Report effective vs. requested values; the effective ones are read back from the socket.
        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
        debug(msg.format(channel.socket.getSoTimeout,
                         readTimeoutMs,
                         channel.socket.getReceiveBufferSize,
                         readBufferSize,
                         channel.socket.getSendBufferSize,
                         writeBufferSize,
                         connectTimeoutMs))
      } catch {
        case e: Throwable =>
          // Best-effort contract: release whatever was opened and do not propagate.
          // Clean up first so that a logging problem can never leave a half-open socket,
          // then record the cause so the failure is not completely silent.
          disconnect()
          debug("Failed to connect to %s:%d: %s".format(host, port, e))
      }
    }
  }

  /**
   * Closes the channel, its socket and the read channel, ignoring errors raised while
   * closing, and marks this instance as disconnected. Safe to call when not connected.
   */
  def disconnect(): Unit = lock synchronized {
    if (channel != null) {
      swallow(channel.close())
      swallow(channel.socket.close())
      channel = null
      writeChannel = null
    }
    // Closed separately from the socket channel because it wraps the socket's input stream.
    if (readChannel != null) {
      swallow(readChannel.close())
      readChannel = null
    }
    connected = false
  }

  /** @return true if the last `connect()` succeeded and `disconnect()` has not run since */
  def isConnected: Boolean = connected

  /**
   * Writes the whole request to the socket.
   *
   * @param request the request or response to send
   * @return the value returned by `BoundedByteBufferSend.writeCompletely`
   *         (presumably the number of bytes written - confirm against that class)
   * @throws ClosedChannelException if the channel is not connected
   */
  def send(request: RequestOrResponse): Int = {
    if (!connected)
      throw new ClosedChannelException()

    val send = new BoundedByteBufferSend(request)
    send.writeCompletely(writeChannel)
  }

  /**
   * Reads one complete response from the socket.
   *
   * @return the `BoundedByteBufferReceive` after `readCompletely` has finished on it
   * @throws ClosedChannelException if the channel is not connected
   */
  def receive(): Receive = {
    if (!connected)
      throw new ClosedChannelException()

    val response = new BoundedByteBufferReceive()
    response.readCompletely(readChannel)
    response
  }
}