当前位置: 首页 > article >正文

使用Protocol Buffers传输数据

使用 Google Protocol Buffers(ProtoBuf)与 Kafka 结合来定义和传输数据,可以确保传输数据的结构性、可扩展性和高效性。以下是一个简单的步骤指南,帮助你实现生产者和消费者。

1. 定义 ProtoBuf 消息格式

首先,你需要定义传输内容的消息格式。

示例:message.proto

syntax = "proto3";

message ExampleMessage {
  int32 id = 1;
  string name = 2;
  double value = 3;
}

2. 编译 Proto 文件

使用 protoc 编译 .proto 文件,生成相应语言的类文件。假设你使用的是 Java:

protoc --java_out=./src/main/java message.proto

这将生成一个 ExampleMessage 的 Java 类,用于序列化和反序列化数据。

3. 实现 Kafka 生产者

接下来,编写 Kafka 生产者,将 ProtoBuf 序列化的数据发送到 Kafka。

示例:Producer.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import example.protobuf.ExampleMessage; // 这是由 protoc 生成的类

import java.util.Properties;

public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

        // 创建一个 ExampleMessage 实例
        ExampleMessage message = ExampleMessage.newBuilder()
                .setId(1)
                .setName("Test")
                .setValue(10.5)
                .build();

        // 序列化消息并发送
        producer.send(new ProducerRecord<>("your_topic", message.toByteArray()));

        producer.close();
    }
}

4. 实现 Kafka 消费者

然后,编写 Kafka 消费者,接收并反序列化 ProtoBuf 数据。

示例:Consumer.java

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import example.protobuf.ExampleMessage;

import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("your_topic"));

        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                try {
                    ExampleMessage message = ExampleMessage.parseFrom(record.value());
                    System.out.println("Received message: " + message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

5. 编译和运行

确保你已经编译了 .proto 文件并将生成的类文件包含在你的项目中。然后你可以编译和运行生产者和消费者。

javac Producer.java Consumer.java -cp "path_to_kafka_clients_jar:path_to_protobuf_jar"
java Producer
java Consumer

总结

  • ProtoBuf 提供了一种高效的方式来定义和序列化消息,而 Kafka 是一种分布式流处理平台。
  • 通过将 ProtoBuf 与 Kafka 结合,可以在不同服务之间以结构化的方式传输高效的数据。
  • 你需要使用 protoc 编译 .proto 文件,并在生产者和消费者中使用生成的类来序列化和反序列化数据。

这样,生产者可以发送结构化的 ProtoBuf 消息到 Kafka,消费者可以接收并解析这些消息。


http://www.kler.cn/a/295024.html

相关文章:

  • 检查系统是否安装 OpenSSH,root 用户无法登录问题处理
  • 什么是嵌入式操作系统?
  • 安卓图片的着色教程(tint的使用)
  • 如何做好多项目进度管理
  • 数据结构C语言描述2(图文结合)--有头单链表,无头单链表(两种方法),链表反转、有序链表构建、排序等操作,考研可看
  • Oracle 第27章:Oracle APEX开发
  • 在vscode中用virtual env的方法
  • git如何灵活切换本地账号对应远程github的两个账号
  • 代码随想录:279. 完全平方数
  • 如何在Selenium中使用Chrome进行网络限速
  • ComfyUI+Krea免费利用AI制作网站萌宠IP,五步搞定制作AI萌宠
  • React 响应事件
  • 【Godot4.3】多边形的斜线填充效果基础实现
  • 在Ubuntu 20.04上安装Nginx的方法
  • 懒人笔记-opencv4.8.0篇
  • 【详解 Java 注解】
  • 一些数学经验总结——关于将原一元二次函数增加一些限制条件后最优结果的对比(主要针对公平关切相关的建模)
  • 分数阶微积分MATLAB计算
  • 将你的github仓库设置为web代理
  • Java零基础-如何在分布式系统中进行日志管理?
  • 【鸿蒙】HarmonyOS NEXT星河入门到实战1-开发环境准备
  • Vulnhub:Dr4g0n b4ll 1
  • Qt/C++开源项目 TCP客户端调试助手(源码分享+发布链接下载)
  • <class ‘pyspark.sql.dataframe.DataFrame‘>
  • Eureka原理与实践:构建高可用微服务架构的基石
  • MCU5.51单片机的最小系统