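Integrating RocketMQ into a Spring Boot Project to Implement In-Site Messaging
1. Add dependencies
First, add the RocketMQ dependency to pom.xml, alongside the web, JPA, database, and Redis starters that the example uses: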
<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Boot Starter Data JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- H2 Database (or another database of your choice) -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Redis Cache -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- RocketMQ -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    <!-- Fastjson, imported by the message listener in step 6 to parse JSON -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>
2. Configuration file
Configure the Redis and RocketMQ connection settings in application.properties:
# Redis Configuration
spring.redis.host=localhost
spring.redis.port=6379
# RocketMQ Configuration
rocketmq.name-server=localhost:9876
rocketmq.producer.group=my-producer-group
3. Data model
Define the Message entity class:
package com.example.inbox.model;

import javax.persistence.*;
import java.io.Serializable;
import java.time.LocalDateTime;

// Serializable so that cached message lists can be written to Redis
// with Spring's default JDK serialization.
@Entity
public class Message implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String senderId;
    private String receiverId;
    private String subject;
    private String body;
    private LocalDateTime timestamp;
    private boolean read;

    // Getters and setters
}
4. Repository interface
Create the MessageRepository interface:
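package com.example.inbox.repository;

import com.example.inbox.model.Message;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.List;

@Repository
public interface MessageRepository extends JpaRepository<Message, Long> {
    // Derived query called by MessageService; Spring Data generates the implementation.
    List<Message> findByReceiverId(String receiverId);
}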
5. Service layer
The service layer combines caching with sending messages to RocketMQ and storing the messages that come back from it:
package com.example.inbox.service;

import com.example.inbox.model.Message;
import com.example.inbox.repository.MessageRepository;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import java.util.List;

@Service
public class MessageService {

    @Autowired
    private MessageRepository messageRepository;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    // Cache each receiver's message list under the receiver's ID.
    @Cacheable(value = "messages", key = "#receiverId")
    public List<Message> getMessagesByReceiver(String receiverId) {
        return messageRepository.findByReceiverId(receiverId);
    }

    // Publish the message to RocketMQ; the listener stores it asynchronously.
    public void sendMessage(Message message) {
        rocketMQTemplate.convertAndSend("messageTopic", message);
    }

    // Called by the listener. The receiver's cached list is evicted here, once the
    // message is actually stored, so that the next read reloads it from the database.
    @CacheEvict(value = "messages", key = "#message.receiverId")
    public void saveMessage(Message message) {
        messageRepository.save(message);
    }
}
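To make the caching behavior concrete, here is a minimal in-memory sketch of the read-through-and-evict pattern that @Cacheable and @CacheEvict provide. It is an analogy only: plain maps stand in for Redis and the database, and the class InboxCacheSketch and its methods are hypothetical names, not Spring APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded stand-in for the service's caching behavior.
class InboxCacheSketch {
    // Plays the role of the database: receiverId -> stored message subjects.
    private final Map<String, List<String>> store = new HashMap<>();
    // Plays the role of the "messages" cache, keyed by receiverId.
    private final Map<String, List<String>> cache = new HashMap<>();
    // Counts how often the "database" is actually read.
    private int storeReads = 0;

    // Roughly what @Cacheable does: return the cached list, or load it and cache it.
    List<String> getMessagesByReceiver(String receiverId) {
        return cache.computeIfAbsent(receiverId, id -> {
            storeReads++;
            return List.copyOf(store.getOrDefault(id, List.of()));
        });
    }

    // Roughly what @CacheEvict does around a write: store the message,
    // then drop the receiver's cached entry so the next read reloads it.
    void saveMessage(String receiverId, String subject) {
        store.computeIfAbsent(receiverId, id -> new ArrayList<>()).add(subject);
        cache.remove(receiverId);
    }

    int storeReads() {
        return storeReads;
    }

    public static void main(String[] args) {
        InboxCacheSketch inbox = new InboxCacheSketch();
        inbox.saveMessage("bob", "hello");
        System.out.println(inbox.getMessagesByReceiver("bob")); // [hello], loaded from the store
        System.out.println(inbox.getMessagesByReceiver("bob")); // [hello], served from the cache
        inbox.saveMessage("bob", "second");                      // evicts bob's cached list
        System.out.println(inbox.getMessagesByReceiver("bob")); // [hello, second], reloaded
        System.out.println(inbox.storeReads());                  // 2
    }
}
```

Without the eviction step, the third read would still return the stale one-element list, which is why the cache entry has to be dropped whenever a new message for that receiver is stored.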
6. Message listener
Create a listener to handle the messages that arrive from RocketMQ:
package com.example.inbox.listener;

import com.alibaba.fastjson.JSON;
import com.example.inbox.model.Message;
import com.example.inbox.service.MessageService;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;

@Component
@RocketMQMessageListener(topic = "messageTopic", consumerGroup = "my-consumer-group")
public class MessageListener implements RocketMQListener<String> {

    @Autowired
    private MessageService messageService;

    @Override
    public void onMessage(String messageJson) {
        // Parse the JSON payload, stamp the receive time, then store the message.
        Message message = JSON.parseObject(messageJson, Message.class);
        message.setTimestamp(LocalDateTime.now());
        messageService.saveMessage(message);
    }
}
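The listener runs on a consumer thread rather than on the thread that handled the HTTP request, which is what makes sending asynchronous. The hand-off can be sketched with a BlockingQueue standing in for the messageTopic topic. This is an in-memory analogy under that assumption, not how RocketMQ works internally, and AsyncInboxSketch and its methods are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory analogy of the producer -> topic -> listener flow.
class AsyncInboxSketch {
    // Stands in for the RocketMQ topic.
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    // Stands in for the database the listener writes to.
    private final List<String> saved = new CopyOnWriteArrayList<>();

    // Producer side: returns as soon as the message is queued.
    void send(String body) {
        topic.add(body);
    }

    // Consumer side: a background thread takes `expected` messages off the
    // queue, stores each one, and counts the latch down after each store.
    CountDownLatch startConsumer(int expected) {
        CountDownLatch done = new CountDownLatch(expected);
        Thread consumer = new Thread(() -> {
            try {
                while (done.getCount() > 0) {
                    String body = topic.take();
                    saved.add(body);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        return done;
    }

    // Sends every body, waits up to five seconds for the consumer,
    // and returns a snapshot of what was stored.
    static List<String> deliver(String... bodies) {
        AsyncInboxSketch inbox = new AsyncInboxSketch();
        CountDownLatch done = inbox.startConsumer(bodies.length);
        for (String body : bodies) {
            inbox.send(body);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(inbox.saved);
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello", "world")); // [hello, world]
    }
}
```

One consequence carries over to the real system: a message is not visible in the receiver's inbox at the moment send returns, only after the consumer has processed it.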
7. Controller
Create a controller to handle HTTP requests:
package com.example.inbox.controller;

import com.example.inbox.model.Message;
import com.example.inbox.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;

@RestController
@RequestMapping("/messages")
public class MessageController {

    @Autowired
    private MessageService messageService;

    @GetMapping("/{receiverId}")
    public List<Message> getMessages(@PathVariable String receiverId) {
        return messageService.getMessagesByReceiver(receiverId);
    }

    @PostMapping
    public void sendMessage(@RequestBody Message message) {
        messageService.sendMessage(message);
    }
}
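As a rough illustration of how a client might call these two endpoints, the sketch below builds the requests with the JDK's java.net.http API (Java 11 or later). It assumes the application listens on Spring Boot's default port 8080 on localhost and that the JSON field names match the entity's field names; InboxClientSketch and its methods are hypothetical names. The requests are only built and printed, so the sketch runs without a server.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical client-side sketch for the /messages endpoints.
class InboxClientSketch {
    // Assumption: the application runs locally on Spring Boot's default port.
    static final String BASE_URL = "http://localhost:8080";

    // Builds the POST /messages request. The JSON is assembled by hand for
    // brevity and does not escape special characters in the arguments.
    static HttpRequest sendRequest(String senderId, String receiverId,
                                   String subject, String body) {
        String json = String.format(
                "{\"senderId\":\"%s\",\"receiverId\":\"%s\",\"subject\":\"%s\",\"body\":\"%s\"}",
                senderId, receiverId, subject, body);
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/messages"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Builds the GET /messages/{receiverId} request.
    static HttpRequest inboxRequest(String receiverId) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/messages/" + receiverId))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest post = sendRequest("alice", "bob", "Hi", "Welcome!");
        HttpRequest get = inboxRequest("bob");
        System.out.println(post.method() + " " + post.uri()); // POST http://localhost:8080/messages
        System.out.println(get.method() + " " + get.uri());   // GET http://localhost:8080/messages/bob
    }
}
```

With the application running, either request could be passed to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) to execute it.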
8. Application class
Make sure your Spring Boot application class carries the necessary annotations:
package com.example.inbox;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;

@SpringBootApplication
@EnableCaching
public class InboxApplication {

    public static void main(String[] args) {
        SpringApplication.run(InboxApplication.class, args);
    }
}
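Summary
With the steps above, we have integrated RocketMQ into a Spring Boot project so that the in-site messaging system processes messages asynchronously. The main steps are:
- Add the RocketMQ dependency: add the RocketMQ-related dependency to pom.xml.
- Configure RocketMQ: set the RocketMQ parameters in application.properties.
- Data model and repository: define the entity class and the repository interface.
- Service layer: integrate RocketMQ message sending and Redis caching in the service layer.
- Message listener: create a listener with the @RocketMQMessageListener annotation to handle incoming messages.
- Controller: create a RESTful API to handle HTTP requests.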