RocketMQ安装与使用
什么是消息中间件
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)例如:寄快递
消息中间件使用场景
异步处理
场景说明:用户注册后,需要发注册邮件和注册短信
将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务全部完成后,返回给客户端
引入消息队列,改造后的架构如下
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此架构改变后,系统的吞吐量比串行提高了3倍,比并行提高了2倍
应用解耦
场景说明:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统通过调用库存系统的接口来对库存进行操作
解耦合后:
订单系统:假如在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现订单系统与库存系统的应用解耦
常见消息中间件比较
特性MQ | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
生产者消费者模式 | 支持 | 支持 | 支持 | 支持 |
发布订阅模式 | 支持 | 支持 | 支持 | 支持 |
请求回应模式 | 支持 | 支持 | 不支持 | 不支持 |
Api完备性 | 高 | 高 | 高 | 高 |
多语言支持 | 支持 | 支持 | java | 支持 |
单机吞吐量 | 万级 | 万级 | 万级 | 十万级 |
消息延迟 | 无 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
消息丢失 | 低 | 低 | 理论上不会丢失 | 理论上不会丢失 |
文档的完备性 | 高 | 高 | 较高 | 高 |
提供快速入门 | 有 | 有 | 有 | 有 |
社区活跃度 | 高 | 高 | 中 | 高 |
商业支持 | 无 | 无 | 商业云 | 商业云 |
安装
解压即安装,注意 jdk 版本为 8
启动
启动broker使用命令,可以开启自动创建topic,否则会报错
建议使用命令启动
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
autoCreateTopicEnable=true
开启自动创建topic
使用
硬编码方式发送
导入依赖无需配置
<!--MQ-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
- 创建消息生产者, 指定生产者所属的组名
- 指定Nameserver地址
- 启动生产者
- 创建消息对象,指定主题、标签和消息体
- 发送消息
- 关闭生产者
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
// 发送同步消息
public class RocketMQSendTest01 {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//1. 创建消息生产者, 指定生产者所属的组名
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
//2. 指定Nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
//3. 启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
//4. 创建消息对象,指定主题、标签和消息体
Message msg = new Message("myTopic", "myTag", ("十行代码九个错误八个警告竟敢说七日精通六天学会五湖四海也不见如此三心二意之程序简直一等下流" + i).getBytes());
//5. 发送消息
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
//6. 关闭生产者
producer.shutdown();
}
}
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。只会等待MQ发送状态
相比于同步消息,就是在发送的时候new一个SendCallback类重写onSuccess以及onException方法
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
//发送异步消息
//异步发送比较浪费性能,经常会失败,所以发送多几次并且让线程休眠几秒
public class RocketMQSendTest02 {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("myTopic", "myTag2", ("十行代码九个错误八个警告竟敢说七日精通六天学会五湖四海也不见如此三心二意之程序简直一等下流" + i).getBytes());
// 发送消息时候new一个类
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败:" + throwable);
}
});
TimeUnit.SECONDS.sleep(3);
}
producer.shutdown();
}
}
发送单向消息
单向发送消息 类似UPD 只管发不管能不能收到,这种方式主要用在不特别关心发送结果的场景,例如日志发送。
相比于同步消息发送时候没有返回值
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit;
// 单向发送消息 类似UPD 只管发不管能不能收到
public class RocketMQSendTest03 {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("myTopic", "myTag3", ("我是单向发送,类似于UPD" + i).getBytes());
// 发送单向消息,没有任何返回结果
producer.send(message);
TimeUnit.SECONDS.sleep(3);
}
producer.shutdown();
}
}
硬编码方式接收
消息接收步骤:
- 创建消息消费者, 指定消费者所属的组名
- 指定Nameserver地址
- 指定消费者订阅的主题和标签
- 设置回调函数,编写处理消息的方法
- 启动消息消费者
package com.ape.rocketmq.test;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
// 接收消息
public class RocketMQReceiveTest01 {
public static void main(String[] args) throws Exception {
// 创建消费者 指定所属组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer-group");
// 指定NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 指定订阅生产者的主题和标签
consumer.subscribe("myTopic", "*");
//CLUSTERING-clustering 集群
//BROADCASTING-broadcasting 广播
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置回调方法 编写消息处理方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(new String(list.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5. 启动消息消费者
consumer.start();
System.out.println("消费者1已经启动……");
}
}
负载均衡模式(默认模式)
不设置接收模式默认就是负载均衡,轮询
也就是不写consumer.setMessageModel(MessageModel.BROADCASTING);
广播模式
消费者采用广播的方式消费消息,每个消费者(订阅同一个主题的)都能接收到消息,并且每个消费者消费的消息都是相同的
consumer.setMessageModel(MessageModel.BROADCASTING);
集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
与spingboot集成
业务场景:下单成功之后,向下单用户发送短信
添加依赖与配置文件
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
生产者配置文件
server:
port: 8091
rocketmq:
name-server: localhost:9876
producer:
group: shop-order
消费者配置文件
server:
port: 8071
rocketmq:
name-server: localhost:9876
项目地址E:\Codes\Idea_java_works\apesource\springboot\微服务\springboot_rocketmq02
结构如下
生产者
package com.apesource.shoporder.controller;
import com.apesource.shopcommon.pojo.Order;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class OrderController {
@Autowired(required = false)
private RocketMQTemplate template;
@RequestMapping("/order/prod/{pid}")
public Order order(Integer pid){
// 下单创建订单 根据商品id查询数据库的到信息赋值给order 这里直接新建order模拟
Order order = new Order();
order.setUid(1);
order.setUsername("测试用户");
order.setPid(pid);
order.setPname("大豫竹");
order.setPprice(2.0);
order.setNumber(1);
// 下订单 给数据库的order表新增一行数据 这里输出来模拟
System.out.println(order);
// 下单成功 给用户发短信 这里直接发送order对象来模拟
template.convertAndSend("order-topic",order);
return order;
}
}
消费者监听器
package com.apesource.shopuser.listener;
import com.apesource.shopcommon.pojo.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
//接收信息并且发送短信给用户
@Component
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
System.out.println(order);
}
}
按照逻辑前端发送请求
order服务会打印order对象,user服务也会打印order对象
order服务
user服务