当前位置: 首页 > article >正文

kafka使用

异步发送数据

package com.shf.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * 异步发送
 */
public class CustomProducer {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.4:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

异步回调

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * 返回消息的信息
 */
public class CustomProducerCallback {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.120.20:9092,192.168.120.20:9093,192.168.120.20:9094");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("success");
                        System.out.println("主体:"+recordMetadata.topic());
                        System.out.println("分区:"+recordMetadata.partition());
                    } else {
                        System.out.println("fail");
                    }
                }
            });
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }

}

看原理图,返回参数就是RecordAccumulator中的
在这里插入图片描述

同步发送

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerSync {
    @SneakyThrows
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.120.20:9092,192.168.120.20:9093,192.168.120.20:9094");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)).get();
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }

}

原理图如下,保证生产者100%发送消息
在这里插入图片描述

分区情况

在这里插入图片描述
可以通过如果设置了key,那么分区则会通过对key进行取模得出对应的分区,自定义分区

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("shf")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

docker 创建kafka

https://www.cnblogs.com/JcHome/p/16475990.html

http://www.kler.cn/a/286696.html

相关文章:

  • 线性数据结构
  • 抛弃node和vscode,如何用记事本开发出一个完整的vue前端项目
  • “腾讯、钉钉、飞书” 会议开源平替,免费功能强大
  • Unity自学之旅05
  • Python装饰器的高级用法:动态装饰器与参数传递的深度解析
  • 电容的一些常用数值
  • 基于asp.net软件缺陷跟踪系统设计与实现
  • Java:数字验证
  • Web入门-06.HTTP协议-协议解析
  • Axure设计效率提升:实战策略与技巧
  • 五种常见的人工智能错误以及如何避免它们?
  • Websocket测试工具,在线调试 - 在线工具
  • 数学建模强化宝典(6)0-1规划
  • Python操作PDF文件
  • 惠中科技 RDS 自清洁膜层:光伏领域的卓越创新
  • 【QNX+Android虚拟化方案】109 - Android 侧添加支持 busybox telnetd 服务
  • H264码流结构讲解
  • 【Go - 10分钟,快速搭建一个简易日志回传系统】
  • python-pptx - Python 操作 PPT 幻灯片
  • Golang 开发使用 gorm 时打印 SQL 语句
  • 基于nodejs+vue+uniapp的摄影竞赛小程序
  • 【MCAL】TC397+EB-tresos之SPI配置实战 - (同步/异步)
  • python从谷歌地图获取经纬度坐标之间的导航信息
  • 【KingbaseES 人大金仓】| Docker 部署 | 详细步骤
  • (mcu) 嵌入式基础简单入门(程序架构分析)
  • Python自适应光学模态星形小波分析和像差算法