package kafka.consumer
import kafka.utils.Logging
import java.util.regex.{PatternSyntaxException, Pattern}
import kafka.common.Topic
sealed abstract class TopicFilter(rawRegex: String) extends Logging {
val regex = rawRegex
.trim
.replace(',', '|')
.replace(" ", "")
.replaceAll("""^["']+""","")
.replaceAll("""["']+$""","")
try {
Pattern.compile(regex)
}
catch {
case e: PatternSyntaxException =>
throw new RuntimeException(regex + " is an invalid regex.")
}
override def toString = regex
def isTopicAllowed(topic: String, excludeInternalTopics: Boolean): Boolean
}
case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
val allowed = topic.matches(regex) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
allowed
}
}
case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
val allowed = (!topic.matches(regex)) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
debug("%s %s".format(
topic, if (allowed) "allowed" else "filtered"))
allowed
}
}