当前位置: 首页 > article >正文

【kafka】使用kafka client连接 kerberos认证的 kafka,scala版

注意keytab路径中不要使用\\,都使用/作为分隔符

使用kerberos需要配置jaas如下日志打印,两个配置至少设置一个:

[DEBUG] org.apache.kafka.common.security.JaasContext:106 --- System property 'java.security.auth.login.config' and Kafka SASL property 'sasl.jaas.config' are not set, using default JAAS configuration.
import org.slf4j.Logger
trait Logging {
  val LOG: Logger = org.slf4j.LoggerFactory.getLogger(this.getClass)
}
import org.apache.kafka.clients.producer.ProducerRecord
import org.scalatest.FunSuite
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import java.time.Duration
import scala.collection.JavaConversions._

class KafkaClientTest extends FunSuite with Logging {

  val JAAS_CONFIG_KEYTAB_TEMPLATE: String =
    s"""
       |com.sun.security.auth.module.Krb5LoginModule required
       |debug=true
       |doNotPrompt=true
       |storeKey=true
       |useKeyTab=true
       |keyTab="%s"
       |principal="%s";
       |""".stripMargin

  val topic = "tmp_test"
  val bootstrapServers = "01.xxx.com:6667,01.xxx..com:6667,01.xxx..com:6667"

  val principal = "xxx@XXXXX.COM"
  val keytab = "D:/xxx/xxx.keytab"
  val krb5conf = "D:/xxx/krb5.conf"

  def getProducerProps: Properties = {
    val properties = new Properties()
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI")
    properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    properties
  }

  def getConsumerProps: Properties = {
    val properties = new Properties()
    properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    properties.setProperty(SaslConfigs.SASL_MECHANISM, "GSSAPI")
    properties.setProperty(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tester")
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    properties
  }

  def setEnv(props: Properties) = {
    System.setProperty("java.security.krb5.conf", krb5conf)

    // 以下二者选其中之一就可以了。
    // 方式一:
    System.setProperty("java.security.auth.login.config", "D:/configs/kafka_client_jaas.conf")

    // 方式二:
    val jaasStr = JAAS_CONFIG_KEYTAB_TEMPLATE.format(keytab, principal).trim
    LOG.warn(s"format str: \n${jaasStr}")
    props.setProperty(SaslConfigs.SASL_JAAS_CONFIG, jaasStr)
  }

  test("produce") {
    val props = getConsumerProps
    setEnv(props)
    try {
      val producer = new KafkaProducer[String, String](props)
      var counter = 0;
      while (true) {
        val record = new ProducerRecord[String, String](topic, s"hello ${counter}")
        val fu = producer.send(record)
        Thread.sleep(3000L)
        if (counter % 10 == 0) {
          producer.flush()
        }
        counter += 1
      }
    } catch {
      case e: Exception =>
        throw new RuntimeException(e)
    }
  }

  test("consumer") {
    val props = getConsumerProps
    setEnv(props)
    val consumer = new KafkaConsumer(props)
    consumer.subscribe(List(topic))
    while (true) {
      val record = consumer.poll(Duration.ofSeconds(3))
      val it = record.iterator()
      while (it.hasNext) {
        LOG.info(s"${it.next().value()}")
      }
    }
  }
}

kafka_client_jaas.conf 文件内容:
文件模板参考:KAFKA_HOME/conf/kafka_client_jaas.conf

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab = true
    useTicketCache=false
    storeKey = true
    keyTab="D:/***/xxx.keytab"
    principal="xxx@XXXXX.COM"
    serviceName="kafka";
};

http://www.kler.cn/news/233182.html

相关文章:

  • 书生·浦语大模型第三课作业
  • Blender教程(基础)--试图的显示模式-22
  • TDengine用户权限管理
  • 图论:合适的环
  • 【Docker】了解Docker Desktop桌面应用程序,TA是如何管理和运行Docker容器(2)
  • Spring第三天
  • Vscode编译运行多个C++文件
  • Unity GC
  • 题目练习(生死时速2.0版)
  • C#既然数组长度不可改变,那么如何动态调整集合类型数组大小,以便添加或删除元素?
  • 学习通考试怎么搜题找答案? #学习方法#微信#其他
  • Gradle IDEA 乱码
  • 图灵之旅--二叉树堆排序
  • Android 判断通知是进度条通知
  • 生成式人工智能攻击的一年:2024
  • 【Mybatis】从0学习Mybatis(2)
  • Electron基本介绍
  • [职场] 如何通过运营面试_1 #笔记#媒体#经验分享
  • centos7.9 安装rabbitmq 3.6.15 集群
  • MySQL的DDL语言
  • idea: 无法创建Java Class文件(SpringBoot)已解决
  • 部署一个在线OCR工具
  • [office] 怎么在Excel2003菜单栏自定义一个选项卡 #其他#微信#知识分享
  • Bean 的六种作用域
  • 破除Github API接口的访问次数限制
  • Android 车载应用开发之车载操作系统
  • Flink cdc3.0动态变更表结构——源码解析
  • Spring 开发 pom.xml 配置文件(通用配置)
  • C++类和对象(7)
  • 【k8s系列】(202402) 证书apiserver_client_certificate_expiration_seconds