package kafka.utils
import java.lang.management.ManagementFactory
import javax.management.ObjectName
/**
 * Optionally starts the MX4J HTTP adaptor so the platform MBean server can be
 * browsed over HTTP.
 *
 * MX4J is looked up purely through reflection, so there is no compile-time
 * dependency on it: when its classes are not on the classpath the loader logs
 * the fact and does nothing.
 *
 * Behaviour is driven by JVM system properties:
 *  - `kafka_mx4jenable` (default `false`): must be `true` for any loading to be attempted
 *  - `mx4jaddress` (default `0.0.0.0`): host passed to the adaptor's `setHost`
 *  - `mx4jport` (default `8082`): port passed to the adaptor's `setPort`
 */
object Mx4jLoader extends Logging {

  /**
   * Attempts to load and start the MX4J HTTP adaptor when `kafka_mx4jenable` is set.
   *
   * On success the adaptor is registered as `system:name=http` and its XSLT
   * processor as `Server:name=XSLTProcessor` in the platform MBean server.
   *
   * Failures are logged and never propagated to the caller.
   *
   * @return `true` only if the adaptor was registered and started; `false` when the
   *         feature is disabled, MX4J is not on the classpath, or start-up failed
   */
  def maybeLoad(): Boolean = {
    val props = new VerifiableProperties(System.getProperties())
    if (!props.getBoolean("kafka_mx4jenable", false)) {
      // Opt-in feature: without the flag nothing is loaded or registered.
      false
    } else {
      val address = props.getString("mx4jaddress", "0.0.0.0")
      val port = props.getInt("mx4jport", 8082)
      try {
        debug("Will try to load MX4j now, if it's in the classpath")
        val mbs = ManagementFactory.getPlatformMBeanServer()
        val processorName = new ObjectName("Server:name=XSLTProcessor")

        // Class.newInstance() is deprecated since Java 9; go through the no-arg constructor.
        val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
        val httpAdaptor = httpAdaptorClass.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
        httpAdaptorClass.getMethod("setHost", classOf[String]).invoke(httpAdaptor, address.asInstanceOf[AnyRef])
        // The cast boxes the port so it can be handed to the reflective setPort(int) call.
        httpAdaptorClass.getMethod("setPort", Integer.TYPE).invoke(httpAdaptor, port.asInstanceOf[AnyRef])

        val httpName = new ObjectName("system:name=http")
        mbs.registerMBean(httpAdaptor, httpName)

        val xsltProcessorClass = Class.forName("mx4j.tools.adaptor.http.XSLTProcessor")
        val xsltProcessor = xsltProcessorClass.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
        httpAdaptorClass
          .getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean"))
          .invoke(httpAdaptor, xsltProcessor)
        mbs.registerMBean(xsltProcessor, processorName)

        httpAdaptorClass.getMethod("start").invoke(httpAdaptor)
        info("mx4j successfuly loaded")
        true
      } catch {
        case _: ClassNotFoundException =>
          info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
          false
        // Deliberately broad: loading is best-effort, and reflective loading can fail with
        // Errors (e.g. NoClassDefFoundError) that must not escape to the caller.
        case e: Throwable =>
          warn("Could not start register mbean in JMX", e)
          false
      }
    }
  }
}