仿 RabbitMQ 的消息队列2(实战项目)
六,数据库设计
紧接上一篇博客,我们将对数据库的 增,删,查已经写完了,下面我们就建一个管理数据库的类:DataBaseManager
通过这个类来封装针对数据库的操作
//对于所有的数据进行整合处理
public class DataManager {
//由于,我们并不打算将DataManager交给spring管理,所以,就不能直接使用@Autowired注解,而是手动的获取MetaMapper bean对象
private MetaMapper metaMapper;
public void init(){
//手动获取bean对象:
metaMapper = MqApplication.context.getBean(MetaMapper.class);
//如果此时数据库已经存在,不用初始化:
if(checkIsExits()){
System.out.println("[DataManager] 数据已经存在!!!");
}else {
//这里其实是没有 创建 meta.db文件的,原因如下:
File file = new File("./data");
file.mkdirs();
//如果没有初始化过,那就先将表建好
createTable();
//然后填入一些默认数据:
fillDefaultData();
System.out.println("[DataManager] 数据库初始化完成!!!");
}
}
public void deleteDM(){
File file = new File("data/meta.db");
boolean ret1 = file.delete();
if(ret1){
System.out.println("[DataManager] 删除数据库文件成功!");
}else {
System.out.println("[DataManager] 删除数据库文件失败!");
}
File file2 = new File("./data");
boolean ret2 = file2.delete();
if(ret2) {
System.out.println("[DataManager] 删除数据库目录/data 成功!");
}else {
System.out.println("[DataManager] 删除数据库目录/data 失败!");
}
}
private boolean checkIsExits() {
File file = new File("data/meta.db");
if(file.exists()){
return true;
}
return false;
}
//这个方法用来建表 ,
//建库操作并不需要手动创建 mata.db 文件
//首次执行这里的数据库操作的时候,就会自动的创建出meta.db文件来 (MyBatis 帮我们完成的)
private void createTable() {
metaMapper.createExchangeTable();
metaMapper.createMESGQueueTable();
metaMapper.createBindingTable();
System.out.println("[DataManager] 创建数据库表完成");
}
//默认只是初始化一个交换机
private void fillDefaultData() {
Exchange exchange = new Exchange();
exchange.setName("");
exchange.setType(ExchangeType.DIRECT);
exchange.setDurable(true);
exchange.setAutoDelete(false);
metaMapper.insertExchangeTable(exchange);
}
//insert操作:
public void insertExchange(Exchange exchange){
metaMapper.insertExchangeTable(exchange);
}
public void insertMESGQueue(MESGQueue mesgQueue){
metaMapper.insertMESGQueueTable(mesgQueue);
}
public void insertBinding(Binding binding){
metaMapper.insertBindingTable(binding);
}
//deleta操作:
public void deleteExchange(String exchangeName){
metaMapper.deleteExchangeTable(exchangeName);
}
public void deleteMESGQueue(String MESGQueueName){
metaMapper.deleteMESGQueueTable(MESGQueueName);
}
public void deleteBinding(Binding binding){
metaMapper.deleteBindingTable(binding);
}
public List<Exchange> selectExchange(){
return metaMapper.selectExchangeTable();
}
public List<MESGQueue> selectMESGQueue(){
return metaMapper.selectMESGQueueTable();
}
public List<Binding> selectBinding(){
return metaMapper.selectBindingTable();
}
}
手动获取bean对象需要的context 必须要先初始化:所以我们在启动类里获取。
@SpringBootApplication
public class MqApplication {
//获取这个东西是,为了手动获取bean对象而准备的
public static ConfigurableApplicationContext context;
public static void main(String[] args) {
context = SpringApplication.run(MqApplication.class, args);
}
}
测试 DataManager
使⽤ Spring ⾃带的单元测试, 针对上述代码进⾏测试验证.
在 test ⽬录中, 创建 DataManagerTest
准备工作,为了让一组测试数据不影响另一组测试数据,要进行准备和首尾两个工作如下的setAll和setUp,setDown。
• @SpringBootTest 注解表⽰该类是⼀个测试类.
• @BeforeAll 在所有测试执⾏之前执⾏. 此处先删除之前的数据库, 避免⼲扰.
• @BeforeEach 每个测试⽤例之前执⾏. ⼀般⽤来做准备⼯作. 此处进⾏数据库初始化, 以及针对Spring 服务的初始化.
• @AfterEach 每个测试⽤例之后执⾏. ⼀般⽤来做收尾⼯作. 此处需要先关闭 Spring 服务, 再删除数据库.
@SpringBootTest
public class DataManagerTest {
//为了不让 一组测试数据影响另一测试数据,我们会进行 准备和收尾两个工作:
private static DataManager dataManager = new DataManager();
@BeforeAll
public static void setAll(){
dataManager.deleteDM();
}
@BeforeEach
public void setUp(){
//由于要想执行init方法,需要context参数,这一句就是将程序运行起来,给context赋值
MqApplication.context = SpringApplication.run(MqApplication.class);
dataManager.init();
}
@AfterEach
public void setDown(){
//这个close操作一方面:获取context对象会占用8080端口,close操作会将8080端口 释放掉。
// 另一方面是在Windows环境中如果打开的文件没有关闭,此时删除文件是失败的。
//此时的context对象获取了metaMapper,而metaMapper又打开了数据库文件menta.db
//要知道在windows环境下,删除一个已经打开的文件是会失败的。所以,此时要先关闭掉context对象。
MqApplication.context.close();
dataManager.deleteDM();
}
}
编写测试用例:
• @Test 注解表⽰⼀个测试⽤例.
• Assertions 是断⾔, ⽤来断定执⾏结果.
• 每个⽤例执⾏之前, 都会⾃动调⽤到 setUp, 每次⽤例执⾏结束之后, 都会⾃动调⽤ tearDown
我们使用的是Assertions.assertEquals(X,y)断言,X表示一个常量,y是我们要判断的值,如果X == y, 断言就正确,反之报错。
//测试 插入交换机操作是否正确:
public Exchange createExchange(String name){
Exchange exchange = new Exchange();
exchange.setName(name);
exchange.setType(ExchangeType.TOPIC);
exchange.setDurable(true);
exchange.setAutoDelete(false);
Map<String,Object> argument = new HashMap<>();
argument.put("aaa",1);
argument.put("bbb",2);
exchange.setArguments(argument);
return exchange;
}
//测试exchange insert,和select ,没有问题
@Test
public void insertExchangeTest(){
//先执行插入操作:
Exchange exchange = createExchange("111");
dataManager.insertExchange(exchange);
List<Exchange> list = dataManager.selectExchange();
Assertions.assertEquals("111",list.get(1).getName());
Assertions.assertEquals(2,list.size());
Assertions.assertEquals(ExchangeType.TOPIC,list.get(1).getType());
Assertions.assertEquals(true,list.get(1).isDurable());
Assertions.assertEquals(false,list.get(1).isAutoDelete());
Assertions.assertEquals(1,list.get(1).getArguments("aaa"));
Assertions.assertEquals(2,list.get(1).getArguments("bbb"));
}
//测试exchange delete,没有问题:
@Test
public void deleteExchangeTest(){
Exchange exchange = createExchange("111");
dataManager.insertExchange(exchange);
List<Exchange> list = dataManager.selectExchange();
Assertions.assertEquals("111",list.get(1).getName());
Assertions.assertEquals(2,list.size());
Assertions.assertEquals(ExchangeType.TOPIC,list.get(1).getType());
dataManager.deleteExchange("111");
List<Exchange> list2 = dataManager.selectExchange();
Assertions.assertEquals("",list2.get(0).getName());
Assertions.assertEquals(1,list2.size());
Assertions.assertEquals(ExchangeType.DIRECT,list2.get(0).getType());
}
private MESGQueue createMESGQueue(){
MESGQueue mesgQueue = new MESGQueue();
mesgQueue.setName("queue");
mesgQueue.setDurable(true);
mesgQueue.setExclusive(false);
mesgQueue.setAutoDelete(false);
Map<String, Object> map = new HashMap<>();
map.put("aaa",111);
map.put("bbb",222);
mesgQueue.setArguments(map);
return mesgQueue;
}
//测试 MESGQueue 插入,和查询 没有问题:
@Test
public void insertMESGQueueTest(){
MESGQueue mesgQueue = createMESGQueue();
dataManager.insertMESGQueue(mesgQueue);
List<MESGQueue> list = dataManager.selectMESGQueue();
Assertions.assertEquals("queue",list.get(0).getName());
Assertions.assertEquals(true,list.get(0).isDurable());
Assertions.assertEquals(false,list.get(0).isAutoDelete());
Assertions.assertEquals(false,list.get(0).isExclusive());
Assertions.assertEquals(111,list.get(0).getArguments("aaa"));
Assertions.assertEquals(222,list.get(0).getArguments("bbb"));
}
//测试MESGQueue 删除:没有问题:
@Test
public void deleteMESGQueueTest(){
MESGQueue mesgQueue = createMESGQueue();
dataManager.insertMESGQueue(mesgQueue);
List<MESGQueue> list = dataManager.selectMESGQueue();
Assertions.assertEquals(1,list.size());
Assertions.assertEquals("queue",list.get(0).getName());
dataManager.deleteMESGQueue("queue");
List<MESGQueue> list2 = dataManager.selectMESGQueue();
Assertions.assertEquals(0,list2.size());
}
//测试 binding 插入,查询,没有问题
@Test
public void insertBindingTest(){
Binding binding = new Binding();
binding.setExchangeName("exchange");
binding.setQueueName("queue");
binding.setBindingKey("bindingKey");
dataManager.insertBinding(binding);
List<Binding> list = dataManager.selectBinding();
Assertions.assertEquals(1,list.size());
Assertions.assertEquals("exchange",list.get(0).getExchangeName());
Assertions.assertEquals("queue",list.get(0).getQueueName());
Assertions.assertEquals("bindingKey",list.get(0).getBindingKey());
}
//测试binding 删除,没有问题:
@Test
public void deleteBindingTest(){
Binding binding = new Binding();
binding.setExchangeName("exchange");
binding.setQueueName("queue");
binding.setBindingKey("bindingKey");
dataManager.insertBinding(binding);
List<Binding> list = dataManager.selectBinding();
Assertions.assertEquals(1,list.size());
Assertions.assertEquals("exchange",list.get(0).getExchangeName());
Assertions.assertEquals("queue",list.get(0).getQueueName());
Assertions.assertEquals("bindingKey",list.get(0).getBindingKey());
dataManager.deleteBinding(binding);
List<Binding> list2 = dataManager.selectBinding();
Assertions.assertEquals(0,list2.size());
}
测试结果:没有问题
测试也测试完了,此时就要想到我们之前遗留的一个问题:那个getter 和setter方法。我们知道:Myabatis帮我们自动管理数据库的时候,会自动调用getter和setter方法(具体来说:比如我们插入一个对象,此时传入的是一个对象,该对象含有各种要插入的属性。传入的不是直接的参数,此时Mybatis就会调用对象里的getter和setter方法手动的获取各种插入的属性),此时的getter和setter方法说白了就是因为数据库而写的,就像那句话:“就是因为这盘醋才包的饺子”。但是,你想一想,如果你想在代码之间直接获取到各种参数呢?而不是在代码与数据库之间。我们其实更希望返回的是一个字符串,如果按照之前的写法,对于arguments,我们设置的是一个map,返回的是一个map,可见性太差了,所以就要将getter和setter方法重载一下了(所有含有arguments的都要重载),这也是缺失的那部分代码。
Exchange类
public String getArguments() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(this.arguments);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void setArguments(String arguments) {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
- MESGQueue类
public String getArguments() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(this.arguments);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void setArguments(String arguments) {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String, Object>>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
七. 消息存储设计
消息需要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储.
原因如下:
- 对于消息的操作并不需要复杂的 增删改查 .
- 对于⽂件的操作效率⽐数据库会⾼很多.
我们给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue
该⽬录中包含两个固定名字的⽂件.
- queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
- queue_data.txt ⽂件格式:
使⽤⼆进制⽅式存储.
每个消息分成两个部分:- 前四个字节, 表⽰ Message 对象的⻓度(字节数)
- 后⾯若⼲字节, 表⽰ Message 内容.
- 消息和消息之间⾸尾相连.
- queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
- queue_stat.txt ⽂件格式:
使⽤⽂本⽅式存储.
⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数), 使⽤ \t 分割.
第⼀列表⽰当前总的消息数⽬. 第⼆列表⽰有效消息数⽬.
形如:
2000\t1500
创建MessageFileManager类
public class MessageFileManager {
//表示消息统计文件
static public class Stat{
public int totalCount;//总消息数量
public int validCount;//有效消息数量
}
//约定消息文件所在的路径:
//这个方法用于获取指定消息文件 所在的路径:
private String getQueueDir(String queueName){
return "./data/"+queueName;
}
//获取 指定消息数据文件 所在的路径:
//一般文本文件已.txt结尾,此处也能用,就不改了。
private String getQueueDataDir(String queueName){
return getQueueDir(queueName)+"/queue_data.txt";
}
//获取 指定消息数据统计文件 所在路径:
private String getQueueStatDir(String queueName){
return getQueueDir(queueName)+"/queue_stat.txt";
}
private Stat readStat(String queueName){
//由于上面定义的是文本文件,所以直接可以使用Scanner来访问。
//这里有一处细节,如果我们将流对象写到try()括号里,等到结束的时候,会自动的关闭流,就不用我们手动关闭了。
//流对象一定要记得关闭
try (InputStream inputStream = new FileInputStream(getQueueStatDir(queueName))){
Scanner sc = new Scanner(inputStream);
Stat stat = new Stat();
stat.totalCount = sc.nextInt();
stat.validCount = sc.nextInt();
return stat;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void writeStat(String queueName,Stat stat){
try (OutputStream outputStream = new FileOutputStream(getQueueStatDir(queueName))){
//当我们每次打开文件的时候,如果不在这个fileOutputStream第二个参数加true,就会在打开前将文件清空
//加上了之后,就成了拼接了,此时我们先不加,在后面用到的时候再说。
// OutputStream outputStream = new FileOutputStream(getQueueStatDir(queueName),true);
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.write(stat.totalCount+"/t"+stat.validCount);
printWriter.flush();//保证数据从缓冲区刷新到 硬盘 中。
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}