通过多线程的方式每次发送10条MQ消息
背景:传入一个List<person>,不知道list中有多少条数据。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.Message;
import org.apache.rocketmq.client.producer.SendResult;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MultiThreadMessageSender {
public static void main(String[] args) {
// 设置生产者组名
DefaultMQProducer producer = new DefaultMQProducer("your_producer_group");
// 设置NameServer地址,多个地址用分号分隔
producer.setNamesrvAddr("your_namesrv_address");
// 启动生产者实例
try {
producer.start();
// 创建消息集合
List<Person> personList = // 从某处获取您的 Person 数据
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5); // 这里使用固定大小为5的线程池,您可以根据需要进行调整
// 每10条数据为一批,提交到线程池处理
for (int i = 0; i < personList.size(); i += 10) {
List<Person> subList = personList.subList(i, Math.min(i + 10, personList.size()));
// 提交任务到线程池
executorService.submit(() -> sendMessages(subList, producer));
}
// 关闭线程池
executorService.shutdown();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭生产者实例
producer.shutdown();
}
}
private static void sendMessages(List<Person> subList, DefaultMQProducer producer) {
try {
// 创建消息集合
List<Message> messages = new ArrayList<>();
// 构造消息
for (Person person : subList) {
// 将 Person 对象转换为字符串,作为消息内容
String messageContent = person.getName() + "," + person.getAge();
Message message = new Message("your_topic", "your_tag", messageContent.getBytes());
messages.add(message);
}
// 发送消息
SendResult sendResult = producer.send(messages);
System.out.println("Thread " + Thread.currentThread().getId() + " Send Result: " + sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}
在上述示例中,我们使用了Java的ExecutorService
线程池来管理线程。每个线程负责处理10条Person
对象,将它们转换为RocketMQ消息并发送。这样,多个线程可以并行处理不同的批次,提高了消息发送的效率。