IntelliJ+SpringBoot项目实战(23)--整合RabbitMQ
一、前言
在上节课中介绍了Quartz定时作业实现定时任务处理。但是在实际的项目中,很多业务不能通过定时轮询的方式,比如订单下单付款了,通过定时去扫付款的订单去做后续业务处理的话,会非常影响性能。这个时候我们需要引入消息中间件产品进行异步处理,比如订单付款后,在付款的后台业务代码中产生一个异步消息通知,然后订阅消息的业务监听组件收到消息后,异步进行后续的处理。
关于消息中间件产品有很多,目前主要有RabbitMQ、RocketMQ、Kafka等,这三者有什么差异呢?
RabbitMQ、RocketMQ和Kafka都是流行的开源消息队列系统,每个系统都有自己的特点和用途。
-
RabbitMQ:
高度可靠,支持AMQP(高级消息队列协议);用于系统间的异步通信和数据分发。支持消息持久化和高可用性。
-
RocketMQ:
设计为高可用和高并发的消息中间件;支持分布式事务消息和顺序消息。阿里巴巴的开源项目,广泛应用于金融行业。
-
Kafka:设计为高吞吐量的分布式消息系统。适用于大数据场景下的实时数据处理。支持数据持久化和复制。
对比RabbitMQ、RocketMQ和Kafka的特性,可以从以下几个方面进行考虑:
-
可靠性:RabbitMQ和RocketMQ支持持久化消息,Kafka默认将消息持久化到磁盘。
-
吞吐量:Kafka具有很高的吞吐量,能够处理大量的数据。
-
延迟:Kafka具有较低的延迟,适合处理需要实时处理的数据。
-
分布式:RocketMQ和Kafka支持分布式部署,可以横向扩展。
-
开源协议:RabbitMQ支持AMQP,RocketMQ支持JMS,Kafka支持自有的TCP协议。
-
成本:RabbitMQ和Kafka可能需要较高的硬件成本,RocketMQ可能更容易管理。
-
社区支持:RabbitMQ和RocketMQ有较大的社区支持和商业支持。
在选择消息队列时,需要根据具体的应用场景和需求来决定。例如,对于需要高吞吐量和低延迟的实时数据处理,Kafka可能是更好的选择;而对于需要高可靠性和复杂消息队列功能的场景,RabbitMQ或许是更合适的。
因为RabbitMQ具有高的可靠性,所以我们在做业务处理的时候,可以选择RabbitMQ作为业务处理的中间件。下面介绍RabbitMQ的开发过程。
二、引入RabbitMQ依赖
在openjweb-starter下的mq-openjweb-starter中引入RabbitMQ依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后在openjweb-sys中引入依赖mq-openjweb-starter:
<dependency>
<groupId>org.openjweb</groupId>
<artifactId>mq-openjweb-starter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
三、RabbitMQ消息队列开发
现在我们需要定义关于rabbitMQ的几个配置参数,在openjweb-sys中的application-dev.xml中增加下面的配置:
mq:
rabbit:
namePrefix: openjweb-cloud
host: 127.0.0.1:5672
username: guest
password: guest
上面参数中,namePrefix是定义MQ交换机、队列的名称前缀,host是rabbitMQ主机的ip和端口,username和password 分别是rabbitMQ登录用户和密码。
然后我们在mq-openjweb-starter中定义一个属性类,用于加载上面的配置:
package org.openjweb.mq.starter.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@ConfigurationProperties(prefix = "mq.rabbit")
@Data
@Component
public class DefaultMqProperties {
/**
* rabbitMq 名称前缀(交换机、队列名称)
*/
private String namePrefix ;
/**
* mq host
*/
private String host ;
/**
* mq 用户名
*/
private String username ;
/**
* mq 密码
*/
private String password ;
}
然后我们在mq-openjweb-starter中定义一个RabbitMQ的自动配置类RabbitMqAutoConfig:
package org.openjweb.mq.starter.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(DefaultMqProperties.class)
public class RabbitMqAutoConfig {
@Autowired DefaultMqProperties defaultMqProperties;
private CachingConnectionFactory connectionFactory;
/**
* 连接工厂使用自定义配置
*
* @param connectionFactory connectionFactory
* @param defaultMqProperties starhumanMqProperties
*/
public RabbitMqAutoConfig(CachingConnectionFactory connectionFactory,
DefaultMqProperties defaultMqProperties) {
connectionFactory.setUsername(defaultMqProperties.getUsername());
connectionFactory.setPassword(defaultMqProperties.getPassword());
connectionFactory.setAddresses(defaultMqProperties.getHost());
connectionFactory.afterPropertiesSet();
this.connectionFactory = connectionFactory;
System.setProperty("spring.amqp.deserialization.trust.all","true");
//更新常量
}
/**
* 交换机
*
* @return DirectExchange
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange( defaultMqProperties.getNamePrefix()+"_exchange", true, false);
}
/**
* 延迟队列交换机
*
* @return DirectExchange
*/
@Bean
public DirectExchange lazyExchange() {
DirectExchange directExchange = new DirectExchange(defaultMqProperties.getNamePrefix()+"_lazy_exchange", true, false);
directExchange.setDelayed(true);
return directExchange;
}
/**
* admin操作日志队列
*
* @return Queue
*/
@Bean
public Queue adminLogQueue() {
return new Queue("LOG_QUEUE", true);
}
@Bean
public Binding adminLogBinding() {
return BindingBuilder.bind(adminLogQueue()).to(directExchange()).with("LOG_QUEUE");
}
/**
* 连接工厂,单一消费者,发生异常丢弃消息
*/
@Bean("factory_single_pass_err")
public SimpleRabbitListenerContainerFactory starhumanMqFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setBatchSize(1);
//跳过异常
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
return factory;
}
}
代码说明:
在上面的代码中,定义了2个交换机,directExchange()定义的交换机命名为openjweb_cloud_exchange,lazyExchange定义了一个交换机命名为openjweb_cloud_lazy_exchange,但是第二个设置了延迟加载为true,是延迟加载交换机,延迟加载需要使用延迟加载组件,后面介绍RabbitMQ安装的时候会介绍这个插件安装。
在业务开发的时候,如果定义多个消息队列,比如订单下单成功消息、支付成功消息队列等。步骤是,首先增在这个类中增加一个类型为Queue的Bean:
@Bean
public Queue adminLogQueue() {
return new Queue("LOG_QUEUE", true);
}
LOG_QUEUE 是自定义的消息队列的名字。
第二步是将这个Bean及对应的消息队列绑定到一个交换机:
@Bean
public Binding adminLogBinding() {
return BindingBuilder.bind(adminLogQueue()).to(directExchange()).with("LOG_QUEUE");
}
注意上面的写法。然后我们在openjweb-sys的模块中增加一个测试接口,用于产生消息:
package org.openjweb.sys.api;
import lombok.extern.slf4j.Slf4j;
import org.openjweb.mq.starter.config.DefaultMqProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* http://localhost:8001/demo/api/mq/test
*/
@Slf4j
@RequestMapping("/demo/api/mq")
@RestController
public class RabbitMQDemoApi {
@Autowired
DefaultMqProperties defaultMqProperties;
@RequestMapping("test")
public String test(){
String host = defaultMqProperties.getHost();//读取成功
return host;
}
@Resource
private RabbitTemplate rabbitTemplate;
//http://localhost:8001/demo/api/mq/newMsg
//http://localhost:15672/#/
@RequestMapping("newMsg")
public String newMsg(){
rabbitTemplate.convertAndSend( "openjweb-cloud_exchange", "LOG_QUEUE", "hello,i'm msg");
//String host = defaultMqProperties.getHost();//读取成功
return "success";
}
}
在上面的代码中,newMsg方法是向交换机 openjweb-cloud_exchange的LOG_QUEUE消息队列发送一个hello,i'm msg的消息,注意最后这个hello,i'm msg这个字符串参数也可以替换成其他的可序列化的对象,例如JSON,或者业务类的实例。
另外我们再开发一个消息监听类,在openjweb-sys中实现一个消息监听类MqLogConsumer:
package org.openjweb.sys.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* mq处理日志信息
*
* @author xd
*/
@Slf4j
@Component
public class MqLogConsumer {
@RabbitListener(queues = "LOG_QUEUE", containerFactory ="factory_single_pass_err")
public void adminLogBindingConsumer(String msg) {
try {
log.info("接收日志消息::::::");
log.info(msg);
} catch (Exception e) {
//修改拼团活动团队状态失败
}
}
}
运行测试:http://localhost:8001/demo/api/mq/newMsg
控制台输出信息:
这样RabbitMQ的例子就运行成功,大家可以试着再参考上面的示例再增加一个消息队列Bean,另外发送消息的时候可以发送一个类实例来替代字符串。下面是RabbitMQ后台消息队列:
下面是RabbitMQ后台的交换机:
四、RabbitMQ安装步骤
Linux环境中安装RabbitMQ需要按照下面的步骤:
(1)首先安装erlang。
(2)安装rabbitMQ。
(3)安装RabbitMQ的延迟加载组件。
注意三者的版本要匹配。
RabbitMQ的配置(vi /etc/rabbitmq/rabbitmq.config 写入以下配置信息)
[
{rabbit, [
{ tcp_listeners, [ 5672 ] }, { ssl_listeners, [ ] },
{default_user, <<"guest">>}, {default_pass, <<"guest">>}, {loopback_users, []},
{auth_mechanisms, ['EXTERNAL']} ]
},
{ rabbitmq_management, [ { listener, [
{ port, 15672 }, { ssl, false }
]
}
] }
].
在上面设置了rabbitMQ 的监听端口为5672,管理后台的端口为15672,创建的默认用户名和密码都是guest。
网上下载RabbitMQ匹配版本的延迟加载组件,其后缀是.ez,放到RabbitMQ主目录的plugins下,然后启动RabbitMQ:
systemctl start rabbitmq-server
systemctl enable rabbitmq-server
在Windows环境下安装RabbitMQ,一是可以使用hyper-V虚拟机管理软件安装Linux虚拟机,另外就是下载Windows版的RabbitMQ,Windows版本的RabbitMQ更新的不是很及时,可以使用3.7.10版本的。网上找的rabbitmq_delayed_message_exchange-3.8.0.ez 延迟加载组件可以在3.7.10下使用。erlang版本对应的是21。
网上下载延迟加载组件的地址:
Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
3.7系列的RabbitMQ使用下面的插件:
下载后,放到rabbitMQ安装目录的plugins目录下,如C:\rabbitmq\rabbitmq_server-3.7.10\plugins(实际目录根据本地安装情况做下修改)。
RabbitMQ安装后,以管理员的身份运行下面的命令:
- 输入指令激活插件:rabbitmq-plugins.bat enable rabbitmq_management
- 延迟插件rabbitmq_delayed_message_exchange-3.8.0.ez复制到plugins目录,然后运行:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- rabbitmq_server-3.7.10\etc目录将rabbitmq.config.example改为rabbitmq.config,然后找到
去掉标红处的%%,就是启用配置,配置了guest用户和guest密码和default_vhost主机/
- 重启服务器:net stop RabbitMQ && net start RabbitMQ
若出现服务启动之后闪退的情况,一般是因为启动了ActiveMq,导致Java.exe的程序占用了5672端口。关闭ActiveMq,解决此问题。
此时也可到服务中查看RabbitMQ,手动重启
若还未解决,可以查看报错日志C:\Users\Administrator\AppData\Roaming\RabbitMQ\log具体分析
登录验证:
后台登录地址:http://localhost:15672/
默认账户: guest 密码:guest
- 由于账号guest具有所有的操作权限,并且又是默认账号,出于安全因素的考虑,guest用户只能通过localhost登陆使用
将ebin目录下rabbit.app中loopback_users里的<<"guest">>删除,或者在配置文件rabbitmq.config中对该项进行配置,用以解决上述问题。
也可以新建用户管理 rabbitmq,常见用户管理命令:
- 新增用户 rabbitmqctl add_user Username Password
- 删除用户 rabbitmqctl delete_user Username
- 修改用户密码 rabbitmqctl change_password Username Newpassword
- 查看当前用户列表 rabbitmqctl list_users
本示例完整代码可从github下载:
GitHub - openjweb/cloud at masterOpenJWeb is a java bases low code platform. Contribute to openjweb/cloud development by creating an account on GitHub.https://github.com/openjweb/cloud/tree/master