package kafka.utils;
import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
import java.util.Random
import scala.math._
@threadsafe
class Throttler(val desiredRatePerSec: Double,
val checkIntervalMs: Long = 100L,
val throttleDown: Boolean = true,
metricName: String = "throttler",
units: String = "entries",
val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
private val lock = new Object
private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
private var periodStartNs: Long = time.nanoseconds
private var observedSoFar: Double = 0.0
def maybeThrottle(observed: Double) {
meter.mark(observed.toLong)
lock synchronized {
observedSoFar += observed
val now = time.nanoseconds
val elapsedNs = now - periodStartNs
if(elapsedNs > checkIntervalMs * Time.NsPerMs && observedSoFar > 0) {
val rateInSecs = (observedSoFar * Time.NsPerSec) / elapsedNs
val needAdjustment = !(throttleDown ^ (rateInSecs > desiredRatePerSec))
if(needAdjustment) {
val desiredRateMs = desiredRatePerSec / Time.MsPerSec.toDouble
val elapsedMs = elapsedNs / Time.NsPerMs
val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
if(sleepTime > 0) {
trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, desiredRatePerSec, sleepTime))
time.sleep(sleepTime)
}
}
periodStartNs = now
observedSoFar = 0
}
}
}
}
object Throttler {
def main(args: Array[String]) {
val rand = new Random()
val throttler = new Throttler(100000, 100, true, time = SystemTime)
val interval = 30000
var start = System.currentTimeMillis
var total = 0
while(true) {
val value = rand.nextInt(1000)
Thread.sleep(1)
throttler.maybeThrottle(value)
total += value
val now = System.currentTimeMillis
if(now - start >= interval) {
println(total / (interval/1000.0))
start = now
total = 0
}
}
}
}