【kafka】使用kafka client连接 kerberos认证的 kafka,scala版
注意keytab路径中不要使用\\,都使用/作为分隔符
使用 Kerberos 需要配置 JAAS。如果没有配置,会打印如下日志;日志中提到的两个配置(系统属性 java.security.auth.login.config 和 Kafka 属性 sasl.jaas.config)至少要设置一个:
[DEBUG] org.apache.kafka.common.security.JaasContext:106 --- System property 'java.security.auth.login.config' and Kafka SASL property 'sasl.jaas.config' are not set, using default JAAS configuration.
import org.slf4j.Logger
/**
 * Mixin that equips the implementing class with an SLF4J logger.
 *
 * The logger is created eagerly when the instance is constructed and is looked up using the
 * runtime class of that instance.
 */
trait Logging {
  val LOG: Logger = org.slf4j.LoggerFactory.getLogger(getClass)
}
import org.apache.kafka.clients.producer.ProducerRecord
import org.scalatest.FunSuite
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import java.time.Duration
import scala.collection.JavaConversions._
/**
 * Manual smoke tests that talk to a Kerberos-secured Kafka cluster (SASL_PLAINTEXT + GSSAPI)
 * using the plain Kafka Java client.
 *
 * Both tests loop until the run is interrupted, so they are meant to be started by hand
 * against a real cluster rather than executed as part of an automated build.
 */
class KafkaClientTest extends FunSuite with Logging {

  /**
   * Inline JAAS entry for a keytab-based Kerberos login.
   * The two `%s` placeholders are, in order, the keytab path and the principal.
   */
  val JAAS_CONFIG_KEYTAB_TEMPLATE: String =
    """
      |com.sun.security.auth.module.Krb5LoginModule required
      |debug=true
      |doNotPrompt=true
      |storeKey=true
      |useKeyTab=true
      |keyTab="%s"
      |principal="%s";
      |""".stripMargin

  val topic = "tmp_test"
  val bootstrapServers = "01.xxx.com:6667,01.xxx..com:6667,01.xxx..com:6667"
  val principal = "xxx@XXXXX.COM"
  // Use '/' as the path separator in these paths, even on Windows.
  val keytab = "D:/xxx/xxx.keytab"
  val krb5conf = "D:/xxx/krb5.conf"

  /** Connection and SASL/Kerberos settings shared by the producer and the consumer. */
  private def securedClientProps: Properties = {
    val properties = new Properties()
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI")
    properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
    properties
  }

  /** Builds the producer configuration: shared security settings plus String serializers. */
  def getProducerProps: Properties = {
    val properties = securedClientProps
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    properties
  }

  /**
   * Builds the consumer configuration: shared security settings, String deserializers,
   * consumer group "tester" and `auto.offset.reset=earliest`.
   */
  def getConsumerProps: Properties = {
    val properties = securedClientProps
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tester")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    properties
  }

  /**
   * Points the JVM at the Kerberos configuration and supplies the JAAS login settings.
   *
   * Side effects: sets the JVM-wide system properties `java.security.krb5.conf` and
   * `java.security.auth.login.config`, and adds `sasl.jaas.config` to `props`.
   * The result type is left inferred, as in the original definition.
   *
   * @param props client properties that receive the inline `sasl.jaas.config` entry
   */
  def setEnv(props: Properties) = {
    System.setProperty("java.security.krb5.conf", krb5conf)
    // Only one of the two JAAS mechanisms below is required; both are shown for reference.
    // NOTE(review): per the Kafka security documentation the client property sasl.jaas.config
    // is used when both are present — confirm against the client version in use.
    // Option 1: static JAAS file referenced through a JVM system property.
    System.setProperty("java.security.auth.login.config", "D:/configs/kafka_client_jaas.conf")
    // Option 2: inline JAAS entry passed as a Kafka client property.
    val jaasStr = JAAS_CONFIG_KEYTAB_TEMPLATE.format(keytab, principal).trim
    LOG.warn(s"format str: \n${jaasStr}")
    props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasStr)
  }

  test("produce") {
    // The producer needs the serializer settings, i.e. the producer properties.
    val props = getProducerProps
    setEnv(props)
    val producer = new KafkaProducer[String, String](props)
    try {
      var counter = 0
      // Runs until the test is interrupted: one message every three seconds.
      while (true) {
        producer.send(new ProducerRecord[String, String](topic, s"hello ${counter}"))
        Thread.sleep(3000L)
        // Force buffered records out on every tenth message.
        if (counter % 10 == 0) {
          producer.flush()
        }
        counter += 1
      }
    } finally {
      // Release the producer's resources however the loop ends.
      producer.close()
    }
  }

  test("consumer") {
    val props = getConsumerProps
    setEnv(props)
    // Explicit type arguments: without them Scala infers KafkaConsumer[Nothing, Nothing],
    // which makes reading record values fail at runtime.
    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(java.util.Collections.singletonList(topic))
      // Runs until the test is interrupted: poll and log every record value.
      while (true) {
        val records = consumer.poll(Duration.ofSeconds(3))
        val it = records.iterator()
        while (it.hasNext) {
          LOG.info(s"${it.next().value()}")
        }
      }
    } finally {
      // Release the consumer's resources however the loop ends.
      consumer.close()
    }
  }
}
kafka_client_jaas.conf 文件内容:
文件模板参考:KAFKA_HOME/conf/kafka_client_jaas.conf
// JAAS login entry named KafkaClient: Kerberos login from a keytab, with the ticket cache disabled.
// Use '/' as the separator in the keytab path, even on Windows.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab = true
useTicketCache=false
storeKey = true
keyTab="D:/***/xxx.keytab"
principal="xxx@XXXXX.COM"
serviceName="kafka";
};