当前位置: 首页 > article >正文

Flink SQL 支持 kafka 开启 kerberos 权限控制.

一. 背景.

最近在验证kafka 开启kerberos的情况下, flink任务的支持情况.
但是验证的时候发现一个互斥的情况. 在读取数据的时候, 在开启kafka gruop id的权限控制的时候, flink sql 即使设置了gruop id , 竟然还能读取数据.
这个和预期不符. 所以才较真验证了一下.

二. kafka消费topic数据姿势

消费kafka的数据的时候首先要构造KafkaConsumer客户端, 然后KafkaConsumer客户端有两种方式读取topic 中的数据.

  • 使用 subscribe 是最常见的,因为它支持动态分区再均衡和消费者组的管理,适合多数场景。
  • 使用 assign 适合需要精确控制分区消费的特定场景,但不支持自动再均衡,因此需要开发者手动管理分区分配和调整。

2.1. subscribe 方法

  1. 目的:主要用于订阅一个或多个主题。消费者会自动分配这些主题的分区。
  2. 使用场景:适合使用消费者组(Consumer Group)的场景。Kafka 会自动进行分区的再均衡(rebalancing),确保同一消费者组内不会有多个消费者消费同一分区。
  3. 自动分配:使用 subscribe 时,Kafka 会自动为消费者分配它所订阅主题下的分区。
  4. 再均衡监听器:可以通过实现 ConsumerRebalanceListener 接口来自定义在分区再均衡时的行为。
  5. 动态性:如果新的分区被添加到主题中,消费者将自动开始消费新的分区。
  6. API 示例:
    List<String> topics = Arrays.asList("topic1", "topic2");
    consumer.subscribe(topics);
    

2.2. assign 方法

  1. 目的:用于手动分配消费者要消费的具体分区。
  2. 使用场景:适合需要对某些特定分区进行精确控制的场景。例如,需要单独处理特定分区时。
  3. 手动分配:通过 assign 方法,开发者显式指定消费者应该消费哪些分区。
  4. 无再均衡:使用 assign 时,Kafka 不会执行分区再均衡。消费者组的概念在这种模式下不适用。
  5. 静态性:如果主题增加了新的分区,消费者不会自动开始消费这些新分区,除非显式地调用 assign 方法来分配新的分区。
  6. API 示例:
    List<TopicPartition> partitions = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1));
    consumer.assign(partitions);
    

三. 使用java client 验证.

3.1. 总结

  1. 无论subscribe 和assign 都需要授权topic .
  2. subscribe 方法需要指定group id , 所以需要group id 授权.
  3. assign 方法 group id 不是必填项, 不指定group id 的时候, group id 不生效, 指定了之后group id , 权限控制就会生效.

3.2. subscribe 方法

        
        
    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf", "tmp/krb5.conf");

        Properties props = new Properties();
        // group.id,指定了消费者所属群组
        props.put("bootstrap.servers", "master01:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "kafka-group-01");
        props.put("auto.offset.reset","earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +
                "useKeyTab=true " +
                "keyTab=\"/opt/iceberg/gydev_easyops-cluster/sloth.keytab\" " +
                "storeKey=true " +
                "useTicketCache=false " +
                "serviceName=\"kafka\" " +
                "principal=\"sloth/ALL@BDMS.COM\";");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("sloth-validate-01"));
        
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        for (ConsumerRecord<String, String> record : records) {
            LOG.info("KafkaConsumerDemoSubscribe#ConsumerRecord  -> KEY : {} , VALUE : {} ", record.key(),record.value());
         }

    }
    

3.3. assign 方法示例


    public static void main(String[] args) {
        System.setProperty("java.security.krb5.conf", "tmp/krb5.conf");

        Properties props = new Properties();
        // group.id,指定了消费者所属群组
        props.put("bootstrap.servers", "easydata-dev13.gy.ntes:9092,easydata-dev14.gy.ntes:9092,easydata-dev12.gy.ntes:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("group.id", "sloth");
        props.put("auto.offset.reset","earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required " +
                "useKeyTab=true " +
                "keyTab=\"/opt/iceberg/gydev_easyops-cluster/sloth.keytab\" " +
                "storeKey=true " +
                "useTicketCache=false " +
                "serviceName=\"kafka\" " +
                "principal=\"sloth/ALL@BDMS.COM\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        String topic = "sloth-validate-01";
        topic= "sloth_kerberos";
        consumer.assign(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

        for (ConsumerRecord<String, String> record : records) {
             LOG.info("KafkaConsumerDemoAssign#ConsumerRecord  -> KEY : {} , VALUE : {} ", record.key(),record.value());
         }
         
    }

四. FLINK SQL 任务验证…

flink 官方文档:
FLINK 使用assign构建KafkaConsumer , scan.startup.mode 配置项决定了 Kafka consumer 的启动模式。

序号参数含义kafka gruop id 是否必填
1group-offsets (默认)从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
2earliest-offset从可能的最早偏移量开始
3latest-offset从最末尾偏移量开始
4timestamp从用户为每个 partition 指定的时间戳开始
4specific-offsets从用户为每个 partition 指定的偏移量开始
  • 只有使用scan.startup.mode group-offsets flink任务运行的时候才会报gruop id 相关的权限异常.

异常信息:

Caused by: org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: kafka-validate-group-xx


http://www.kler.cn/a/449061.html

相关文章:

  • Day13 苍穹外卖项目 工作台功能实现、Apache POI、导出数据到Excel表格
  • JVM系列(十三) -常用调优工具介绍
  • 差分矩阵(Difference Matrix)与累计和矩阵(Running Sum Matrix)的概念与应用:中英双语
  • Linux 中的 cat 命令:使用、原理与源码解析
  • NLP 中文拼写检测开源-01-基于贝叶斯公式的拼写检查器 CSC
  • 本机如何连接虚拟机MYSQL
  • MySQL 数据库连接数查询、配置
  • GraalVM完全指南:云原生时代下使用GraalVM将Spring Boot 3应用转换为高效Linux可执行文件
  • 【微信小程序】2|轮播图 | 我的咖啡店-综合实训
  • 服务器建立-错误:pyenv环境建立后python版本不对
  • 如何解决 ‘adb‘ 不是内部或外部命令,也不是可运行的程序或批处理文件的问题
  • 观成科技:轻量级内网穿透工具natpass加密流量分析
  • Qt中的异步相关类
  • JDK11下载安装和配置超详细过程
  • c++介绍
  • Vue3之状态管理Vuex
  • 选择屏幕的用法
  • Lua脚本在FreeSWITCH中的应用
  • VBA技术资料MF243:利用第三方软件复制PDF数据到EXCEL
  • Nginx 负载均衡的实现
  • 智能体实战(六顶思考帽)一、六顶思考帽智能体实现(基于柳丁思考帽理论,让大模型在不同角度对问题进行思考并给出答案)
  • Highcharts 饼图:数据可视化利器
  • 谷歌集群数据集:负载均衡云服务测试数据
  • 自动驾驶控制算法-横向误差微分方程LQR前馈控制
  • 深度学习在语音识别中的应用
  • 【Spring】控制反转(IoC)与依赖注入(DI)—依赖注入的分类与方式