当前位置: 首页 > article >正文

RabbitMQ的主题模式

主题模式

image-20230810180351000

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:# 匹配零个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

生产者代码
public class TopicProducer {
	public static void main(String[] args) throws Exception {
		//1.创建连接
		ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("192.168.64.140");
		cf.setUsername("guest");
		cf.setPassword("guest");
		
		Connection nc = cf.newConnection();
		Channel cc = nc.createChannel();
		
		//2.定义交换机
		cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
		//3.发送数据
		while(true) {
			Scanner scanner = new Scanner(System.in);
			System.out.println("消息:");
			String s = scanner.nextLine();
			System.out.println("路由键:");
			String key=scanner.nextLine();
			cc.basicPublish("topic_logs", key, null, s.getBytes());
			System.out.println("=======================================");
		}
	}
}
消费者代码
public class TopicConsumer {
	public static void main(String[] args) throws Exception {
		//1.创建连接
		ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("192.168.64.140");
		cf.setUsername("guest");
		cf.setPassword("guest");
		
		Connection nc = cf.newConnection();
		Channel cc = nc.createChannel();
		//2.定义交换机、队列、绑定
		cc.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
		String queue = cc.queueDeclare().getQueue();
		System.out.print("输入绑定键:");
		String s = new Scanner(System.in).nextLine();
		String[] a = s.split("\\s+");
		for (String key : a) {
			cc.queueBind(queue, "topic_logs", key);
		}
		//3.处理消息
		DeliverCallback deliverCallback = new DeliverCallback() {
			
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				byte[] a = message.getBody();
				String s = new String(a);
				String key = message.getEnvelope().getRoutingKey();
				System.out.println(s+"--"+key);
				System.out.println("========================================");
			}
		};
		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		cc.basicConsume(queue, true,deliverCallback,cancelCallback);
	}
}

RPC模式

img

如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC。

在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。

客户端

在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果

RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四个斐波那契数是: " + result);
回调队列

使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

//定义回调队列,
//自动生成对列名,非持久,独占,自动删除
callbackQueueName = ch.queueDeclare().getQueue();

//用来设置回调队列的参数对象
BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();
//发送调用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息属性 Message Properties
AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
replyTo:通常用于指定回调队列。
correlationId:将RPC响应与请求关联起来非常有用。
关联id(correlationId)

在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。

这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。

服务端代码
public class RPCServer {
	public static void main(String[] args) throws Exception {
		//1.接受客户端发送的调用信息(正整数n)
		//2.执行算法求第n个斐波那契数的结果
		//3.向客户端发送计算结果
		ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("192.168.64.140");
		cf.setPort(5672);
		cf.setUsername("guest");
		cf.setPassword("guest");
		Connection nc = cf.newConnection();
		Channel cc = nc.createChannel();
		
		cc.queueDeclare("rpc_queue", false, false, false, null);
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				//从message中取出消息:n,返回队列的队列名,关联id
				String s = new String(message.getBody());
				String replyTo = message.getProperties().getReplyTo();//返回队列的队列名
				String cid = message.getProperties().getCorrelationId();
				long fbnq = fbnq(Integer.parseInt(s));
				BasicProperties props = new BasicProperties.Builder().correlationId(cid).build();
				cc.basicPublish("", replyTo, props, (""+fbnq).getBytes());
			}
		};
		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		cc.basicConsume("rpc_queue", true,deliverCallback,cancelCallback);
	}
	
	//服务:接受一个整数值n,求第n个斐波那契数
	//1 1 2 3 5 8 13 21 34 55 89 144 ......
	//递归求斐波那契数,递归的效率是非常低的
	//递归效率低,可以用来模拟服务器端的耗时运算
	static long fbnq(int n) {
		if(n==1 || n==2) {
			return 1;
		}
		return fbnq(n-1)+fbnq(n-2);
	}
}
客户端代码
public class RPCClient {
	static BlockingQueue<Long> q=new ArrayBlockingQueue<Long>(10);
	public static void main(String[] args) throws Exception {
		System.out.print("输入求第几个斐波那契数:");
		int n = new Scanner(System.in).nextInt();
		long fbnq=fbnq(n);
	    System.out.println(fbnq);
	}
	//异步调用服务器,从服务器获取结果
	private static long fbnq(int n) throws Exception {
		ConnectionFactory cf = new ConnectionFactory();
		cf.setHost("192.168.64.140");
		cf.setPort(5672);
		cf.setUsername("admin");
		cf.setPassword("admin");
		Connection nc = cf.newConnection();
		Channel cc = nc.createChannel();
		//定义发送调用消息的队列
		cc.queueDeclare("rpc_queue", false, false, false, null);
		cc.queuePurge("rpc_queue");
		//返回队列
		String replyTo = cc.queueDeclare().getQueue();
		//关联id
		String cid = UUID.randomUUID().toString();
		 BasicProperties props = new BasicProperties.Builder()
				 .replyTo(replyTo)
				 .correlationId(cid)
				 .build();
		cc.basicPublish("", "rpc_queue", props, (""+n).getBytes());
		//模拟执行其他运算,不等待计算结果
		System.out.println("调用消息已发送");
		System.out.println("模拟执行其他运算,不立即等待计算结果");
		//获取计算结果
		DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
			    //处理数据之前,先比对关联id
				if(cid.equals(message.getProperties().getCorrelationId())) {
					String s=new String(message.getBody());
					long fbnq = Integer.parseInt(s);
					q.offer(fbnq);
					ch.getConnection().close();
				}
			}
		};
		CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		cc.basicConsume(replyTo, true,deliverCallback,cancelCallback);
		return q.take();
	}
}

上一篇文章:https://blog.csdn.net/Z0412_J0103/article/details/143355002icon-default.png?t=O83Ahttps://blog.csdn.net/Z0412_J0103/article/details/143355002下一篇文章: 


http://www.kler.cn/a/377358.html

相关文章:

  • 05LangChain实战课 - 提示工程与FewShotPromptTemplate的应用
  • 基于SpringBoot的免税商品优选购物商城的设计与实现
  • 高效集成金蝶云星空销售出库单的解决方案
  • 高亚科技签约酸动力,助力研发管理数字化升级
  • 嵌入式Linux入门具备:C语言基础与基本驱动学习(2):Linux GIibc IO基础
  • C++ | Leetcode C++题解之第540题有序数组中的单一元素
  • ensp中acl的使用
  • Vue页面带参数跳转
  • UE5 材质篇 0 创建一个材质
  • 如何在社媒平台上使用代理IP来保护帐号安全
  • solidity selfdestruct合约销毁
  • C语言专题
  • CSS元素类型(二)
  • 单个相机矫正畸变
  • 【图解版】力扣第121题:买卖股票的最佳时机
  • 使用贪心策略求解糖果罐调整次数
  • C# 单个函数实现各进制数间转换
  • 设计模式 - 简单工厂模式
  • 使用官网tar包制作OpenSSL及OpenSSH rpm包进行升级安装(OpenSSH_9.9p1, without OpenSSL未解决)
  • 在平衡中追寻高度:探秘AVL树的自我调节之美
  • 基础算法——排序算法(冒泡排序,选择排序,堆排序,插入排序,希尔排序,归并排序,快速排序,计数排序,桶排序,基数排序,Java排序)
  • 【已解决】element-plus配置主题色后,sass兼容问题。set-color-mix-level() is...in Dart Sass 3
  • 分布式光伏系统开发数字化解决方案
  • ASRPRO 记事本2
  • Linux——— 信号
  • Flutter加载本地HTML的优雅解决方案:轻松实现富文本展示