当前位置: 首页 > article >正文

通过多线程的方式每次发送10条MQ消息

背景:传入一个List<person>,不知道list中有多少条数据。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.Message;
import org.apache.rocketmq.client.producer.SendResult;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadMessageSender {

    public static void main(String[] args) {
        // 设置生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("your_producer_group");
        // 设置NameServer地址,多个地址用分号分隔
        producer.setNamesrvAddr("your_namesrv_address");

        // 启动生产者实例
        try {
            producer.start();

            // 创建消息集合
            List<Person> personList = // 从某处获取您的 Person 数据

            // 创建线程池
            ExecutorService executorService = Executors.newFixedThreadPool(5); // 这里使用固定大小为5的线程池,您可以根据需要进行调整

            // 每10条数据为一批,提交到线程池处理
            for (int i = 0; i < personList.size(); i += 10) {
                List<Person> subList = personList.subList(i, Math.min(i + 10, personList.size()));

                // 提交任务到线程池
                executorService.submit(() -> sendMessages(subList, producer));
            }

            // 关闭线程池
            executorService.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭生产者实例
            producer.shutdown();
        }
    }

    private static void sendMessages(List<Person> subList, DefaultMQProducer producer) {
        try {
            // 创建消息集合
            List<Message> messages = new ArrayList<>();

            // 构造消息
            for (Person person : subList) {
                // 将 Person 对象转换为字符串,作为消息内容
                String messageContent = person.getName() + "," + person.getAge();
                Message message = new Message("your_topic", "your_tag", messageContent.getBytes());
                messages.add(message);
            }

            // 发送消息
            SendResult sendResult = producer.send(messages);
            System.out.println("Thread " + Thread.currentThread().getId() + " Send Result: " + sendResult);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在上述示例中,我们使用了Java的ExecutorService线程池来管理线程。每个线程负责处理10条Person对象,将它们转换为RocketMQ消息并发送。这样,多个线程可以并行处理不同的批次,提高了消息发送的效率。


http://www.kler.cn/a/134877.html

相关文章:

  • 【Mode Management】AUTOSAR架构下唤醒源检测函数EcuM_CheckWakeup详解
  • 使用支付宝沙箱完成商品下单
  • 服务号消息折叠折射出的腾讯傲慢:上云会不会也一样?
  • PostgreSQL 开启密码验证插件
  • 【测试框架篇】单元测试框架pytest(1):环境安装和配置
  • Vue7种组件之间通信方式
  • 用向量数据库Milvus Cloud搭建GPT大模型+私有知识库的定制AI助手——PPT大纲助手
  • 企业怎样申请SSL证书?
  • vue动态配置路由
  • 应用软件安全编程--21密钥长度应该足够长
  • 网络协议入门 笔记一
  • 数据结构八种内部排序算法c++实现
  • Mac开发指南
  • MySQL 的执行原理(四)
  • 通过U盘重装Win10教程图解
  • 如何看待阿里云发布的全球首个容器计算服务 ACS?
  • LeetCode【32】最长的有效括号
  • 系列七、GC垃圾回收【四大垃圾算法-标记压缩算法】
  • Prompt提示词——什么是CRISPE框架?QCIPSPE框架?
  • 通达信的ebk文件
  • IDA的各个视图的含义,View-A、Hex View-1等
  • 大数据基础设施搭建 - MySQL
  • 合并两个有序链表(冒泡排序实现)
  • 【MySql密码爆破脚本】用于其他爆破工具无法使用的情况下
  • 概念解析 | 网络安全数字孪生(Digital Twin of Cyber Security, DTCS)技术
  • 力扣刷题:1. 两数之和