RabbitMQ中的RPC调用代码详解
文章目录
- 1.RPC客户端
- 2.RabbitMQ连接信息实体类
- 3.XML工具类
本文档只是为了留档,方便以后工作运维,或者分享给同事。文档内容比较简陋,命令也不是特别全,不适合小白观看。如有不懂可以私信,上班期间都是在的。
直接上代码了
1.RPC客户端
RPC客户端
/**
 * Blocking request/reply (RPC) client on top of RabbitMQ.
 *
 * <p>An instance created with {@link #RPCClient(XygMqIesConnInfo)} owns one connection, one channel
 * and one broker-named reply queue. {@link #call(String)} publishes a request to the fixed request
 * queue with {@code replyTo} and {@code correlationId} set and blocks until a reply carrying the
 * same correlation id arrives on the reply queue.
 *
 * <p>Instances are not designed for concurrent use: {@link #call(String)} is expected to be invoked
 * by one thread at a time.
 *
 * @author XHao
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "XYG.HS.MES.PRD.CNXsvr";
    private String replyQueueName;

    /**
     * Creates an unconnected client. {@link #call(String)} cannot be used on such an instance;
     * {@link #close()} is a no-op.
     */
    public RPCClient() {
    }

    /**
     * Connects to the broker described by {@code xygMqIesConnInfo} (virtual host {@code IES}),
     * opens a channel and declares a broker-named queue used for replies.
     *
     * @param xygMqIesConnInfo host, port, user name and password of the broker
     * @throws IOException      if the connection, the channel or the reply queue cannot be created
     * @throws TimeoutException if connecting to the broker times out
     */
    public RPCClient(XygMqIesConnInfo xygMqIesConnInfo) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(xygMqIesConnInfo.getHost());
        // The port is a nullable Integer; when it is absent keep the factory's default port
        // instead of failing on auto-unboxing.
        if (xygMqIesConnInfo.getPort() != null) {
            factory.setPort(xygMqIesConnInfo.getPort());
        }
        factory.setUsername(xygMqIesConnInfo.getUserName());
        factory.setPassword(xygMqIesConnInfo.getPwd());
        factory.setVirtualHost("IES");
        try {
            connection = factory.newConnection();
            System.err.println("===============创建通道===============");
            channel = connection.createChannel();
            if (channel == null) {
                // createChannel() may return null when no channel is available.
                throw new IOException("No channel available on the connection");
            }
            System.err.println("===============创建成功===============");
            // Declare a temporary reply queue whose name is generated by the broker.
            System.err.println("===============定义一个临时变量的接受队列名===============");
            replyQueueName = channel.queueDeclare().getQueue();
        } catch (IOException | TimeoutException | RuntimeException e) {
            System.err.println("报错信息==============" + e.getMessage());
            // Do not leave a half-initialised connection open, and let the caller see the real
            // failure instead of a later NullPointerException.
            closeQuietly();
            throw e;
        }
    }

    /**
     * Sends {@code message} as an RPC request and blocks until the matching reply arrives.
     *
     * <p>There is no timeout: the method waits until a reply with the request's correlation id is
     * delivered or the calling thread is interrupted.
     *
     * @param message request payload, sent as UTF-8
     * @return the reply payload decoded as UTF-8
     * @throws IOException           if publishing or subscribing fails
     * @throws InterruptedException  if the thread is interrupted while waiting for the reply
     * @throws IllegalStateException if the client was created without a connection
     */
    public String call(String message) throws IOException, InterruptedException {
        if (channel == null) {
            throw new IllegalStateException("RPCClient is not connected");
        }
        // Unique id used to recognise the reply that belongs to this request.
        final String corrId = UUID.randomUUID().toString();
        // The server answers on replyTo; the client matches the answer via correlationId.
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        System.err.println("===============发布一个消息===============");
        System.err.println("===============消息内容===============");
        System.err.println("===============" + message + "===============");
        System.err.println("===============================");
        System.err.println("===============================");
        System.err.println("===============================");
        System.err.println("===============回调队列的编号===============");
        System.err.println("===============请求时间:" + new Date());
        System.err.println("===============" + "correlationId::" + props.getCorrelationId() + "===============");
        System.err.println("===============" + "ReplyTo::" + props.getReplyTo() + "===============");
        // Default exchange: the routing key is the request queue name.
        channel.basicPublish("", requestQueueName, props, message.getBytes(StandardCharsets.UTF_8));

        // Deliveries are handled on a client library thread, so the calling thread waits on a
        // queue of capacity 1 - exactly one reply is expected.
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        System.err.println("===============获取响应消息===============");
        String consumerTag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // corrId is never null, so this is safe even when the reply has no correlation id.
                if (corrId.equals(properties.getCorrelationId())) {
                    response.offer(new String(body, StandardCharsets.UTF_8));
                }
            }
        });
        try {
            return response.take();
        } finally {
            // Without this, every call would leave another auto-ack consumer on the reply queue;
            // a later reply could be handed to such a stale consumer, be discarded because the
            // correlation id differs, and the waiting call would never return.
            try {
                channel.basicCancel(consumerTag);
            } catch (IOException | RuntimeException cancelFailure) {
                // Best effort: a reply that was already received must not be lost because the
                // cancel failed.
                System.err.println("报错信息==============" + cancelFailure.getMessage());
            }
        }
    }

    /**
     * Closes the underlying connection. Does nothing if the client was never connected.
     *
     * @throws IOException if closing the connection fails
     */
    public void close() throws IOException {
        if (connection != null) {
            connection.close();
        }
    }

    /** Closes the connection during failed construction without masking the original error. */
    private void closeQuietly() {
        try {
            close();
        } catch (IOException | RuntimeException ignored) {
            // The failure that triggered the cleanup is the one worth reporting.
        }
    }

    /**
     * Performs one RPC round trip on a fresh connection and converts the XML reply to JSON.
     *
     * @param xygMqIesConnInfo broker connection settings
     * @param msg              request payload
     * @return the reply converted by {@code XmlUtil.xmlToJson}, or {@code null} when either
     *         argument is {@code null}
     * @throws RuntimeException if the call fails, times out while connecting, or is interrupted;
     *                          the original exception is attached as the cause
     */
    public static JSONObject getResult(XygMqIesConnInfo xygMqIesConnInfo, String msg) {
        if (Objects.isNull(xygMqIesConnInfo) || Objects.isNull(msg)) {
            return null;
        }
        RPCClient client = null;
        String response;
        try {
            client = new RPCClient(xygMqIesConnInfo);
            response = client.call(msg);
            System.err.println("响应消息+" + response);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("RPC调用异常", e);
        } catch (IOException e) {
            throw new RuntimeException("RPC调用异常", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("RPC调用超时", e);
        } finally {
            if (client != null) {
                try {
                    client.close();
                } catch (IOException ignored) {
                    // The reply (or the primary failure) matters more than a failed close.
                }
            }
        }
        return XmlUtil.xmlToJson(response);
    }
}
2.RabbitMQ连接信息实体类
RabbitMQ连接信息实体类
/**
 * Connection settings for an IES RabbitMQ broker.
 *
 * <p>The class is annotated with {@code @TableName("xyg_mq_ies_conn_info")}, i.e. it is mapped to
 * that table. {@code RPCClient} reads {@code host}, {@code port}, {@code userName} and {@code pwd}
 * from it; the remaining fields are not read by the code in this document.
 *
 * @author XHao
 */
@Data
@JsonInclude(JsonInclude.Include.NON_NULL)
@TableName("xyg_mq_ies_conn_info")
@ApiModel(value = "连接信息对象", description = "IES RabbitMQ连接信息表")
public class XygMqIesConnInfo {
// NOTE(review): the class does not implement Serializable, so this constant has no effect as
// written - confirm whether "implements Serializable" was intended.
private static final long serialVersionUID = 1L;
// Primary key, column "ID", IdType.AUTO.
@TableId(value = "ID", type = IdType.AUTO)
private Long id;
// Park (campus) id.
@ApiModelProperty(value = "园区ID")
private String parkId;
// Workshop code.
@ApiModelProperty(value = "车间编码")
private String workshopCode;
// Broker host address.
@ApiModelProperty(value = "主机地址")
private String host;
// Broker port; nullable because it is a boxed Integer.
@ApiModelProperty(value = "端口")
private Integer port;
// User name used to authenticate against the broker.
@ApiModelProperty(value = "用户名")
private String userName;
// Password used to authenticate against the broker.
@ApiModelProperty(value = "密码")
private String pwd;
// Queue name.
@ApiModelProperty(value = "队列名称")
private String queueName;
// Exchange name.
@ApiModelProperty(value = "交换机名称")
private String exchangeName;
// Routing key.
@ApiModelProperty(value = "路由键")
private String routingKey;
}
3.XML工具类
XML工具类
/**
* @ClassName: XmlUtil
* @Description: xml 解析与生成工具类
* @Author: XHao
* @Date: 2024/8/20 14:28
*/
public class XmlUtil {
/**
 * Recursively converts the child elements of {@code element} into entries of {@code object}.
 *
 * <ul>
 *   <li>A child without child elements becomes its text.</li>
 *   <li>A child with child elements becomes a nested (ordered) JSON object; if that object turns
 *       out empty, the child's text is used instead.</li>
 *   <li>A child with attributes becomes a JSON object holding each attribute by name plus the
 *       converted value under the key {@code "content"}.</li>
 *   <li>Children sharing a name are collected into a JSON array in document order.</li>
 * </ul>
 *
 * @param element XML element whose children are converted
 * @param object  JSON object that receives the entries
 * @return {@code object}, after all children were added
 */
private static JSONObject xmlToJson(Element element, JSONObject object) {
    List<Element> elements = element.elements();
    for (Element child : elements) {
        String name = child.getName();
        Object value = object.get(name);

        Object newValue = child.getText();
        if (!child.elements().isEmpty()) {
            JSONObject nested = xmlToJson(child, new JSONObject(true));
            if (!nested.isEmpty()) {
                newValue = nested;
            }
        }

        List<Attribute> attributes = child.attributes();
        if (!attributes.isEmpty()) {
            JSONObject attrJsonObject = new JSONObject();
            for (Attribute attribute : attributes) {
                attrJsonObject.put(attribute.getName(), attribute.getText());
            }
            // Written once, after the attributes, so the converted value still takes precedence
            // over an attribute that happens to be named "content".
            attrJsonObject.put("content", newValue);
            newValue = attrJsonObject;
        }

        if (newValue == null) {
            continue;
        }
        if (value == null) {
            object.put(name, newValue);
        } else if (value instanceof JSONArray) {
            ((JSONArray) value).add(newValue);
        } else {
            // Second occurrence of this name: promote the single value to an array.
            JSONArray array = new JSONArray();
            array.add(value);
            array.add(newValue);
            object.put(name, array);
        }
    }
    return object;
}
/**
 * Parses an XML string and converts the children of its root element to a JSON object.
 *
 * <p>The parser rejects documents containing a DOCTYPE and does not resolve external entities,
 * because the input may come from a remote peer.
 *
 * @param xmlStr XML text; may be {@code null}
 * @return the converted JSON object; an empty object when {@code xmlStr} is {@code null} or
 *         cannot be parsed
 */
public static JSONObject xmlToJson(String xmlStr) {
    JSONObject result = new JSONObject(true);
    if (xmlStr == null) {
        return result;
    }
    try {
        SAXReader xmlReader = new SAXReader();
        // XXE hardening: no DOCTYPE, no external general/parameter entities.
        xmlReader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
        xmlReader.setFeature("http://xml.org/sax/features/external-general-entities", false);
        xmlReader.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
        Document document = xmlReader.read(new StringReader(xmlStr));
        Element element = document.getRootElement();
        return xmlToJson(element, result);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
}
/**
 * Parses an XML string and converts the children of a selected element to a JSON object.
 *
 * <p>The parser rejects documents containing a DOCTYPE and does not resolve external entities.
 *
 * @param xmlString XML text; may be {@code null}
 * @param node      expression passed to {@code selectSingleNode} to pick the element to convert;
 *                  when blank, the root element is used
 * @return the converted JSON object; an empty object when {@code xmlString} is {@code null},
 *         cannot be parsed, or {@code node} does not select an element
 */
public static JSONObject xmlToJson(String xmlString, String node) {
    JSONObject result = new JSONObject(true);
    if (xmlString == null) {
        return result;
    }
    try {
        SAXReader xmlReader = new SAXReader();
        // XXE hardening: no DOCTYPE, no external general/parameter entities.
        xmlReader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
        xmlReader.setFeature("http://xml.org/sax/features/external-general-entities", false);
        xmlReader.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
        org.dom4j.Document document = xmlReader.read(new StringReader(xmlString));

        Object selected = StringUtils.isBlank(node)
                ? document.getRootElement()
                : document.selectSingleNode(node);
        if (!(selected instanceof Element)) {
            // Nothing matched, or the match is not an element (e.g. an attribute or text node).
            return result;
        }
        return xmlToJson((Element) selected, result);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
}
/**
 * Builds the request XML for {@code xmlParam} and returns it without the XML declaration.
 *
 * <p>Layout of the generated document:
 * <pre>
 * &lt;Message&gt;
 *   &lt;Header&gt; one element per HeaderEnum constant, one per header property (upper-cased name),
 *            TRANSACTIONID, listener &lt;/Header&gt;
 *   &lt;Body&gt;   one element per body property (upper-cased name) &lt;/Body&gt;
 * &lt;/Message&gt;
 * </pre>
 * Properties whose value is {@code null} are skipped; non-string values are written using
 * {@link String#valueOf(Object)}.
 *
 * @param xmlParam source of the header and body properties; must not be {@code null}
 * @return the XML text starting right after the XML declaration (whitespace that follows the
 *         declaration is kept as is)
 * @throws NullPointerException if {@code xmlParam} is {@code null}
 */
public static String createXmlString(XmlParam xmlParam) {
    Objects.requireNonNull(xmlParam, "xmlParam");
    org.dom4j.Document document = DocumentHelper.createDocument();
    document.setXMLEncoding("UTF-8");
    Element message = document.addElement("Message");

    // Header: fixed values first, then the caller-supplied ones.
    Element header = message.addElement("Header");
    for (HeaderEnum h : HeaderEnum.values()) {
        Element childNode = header.addElement(h.name());
        childNode.setText(h.getValue());
    }
    appendUpperCasedChildren(header, xmlParam.getHeader());

    // Transaction id: yyyyMMdd followed by the epoch milliseconds of the same instant.
    Date now = new Date();
    Element transactionId = header.addElement("TRANSACTIONID");
    transactionId.setText(new SimpleDateFormat("yyyyMMdd").format(now) + now.getTime());
    Element listener = header.addElement("listener");
    listener.setText("QueueListener");

    Element body = message.addElement("Body");
    appendUpperCasedChildren(body, xmlParam.getBody());

    String xml = document.asXML();
    // Drop the XML declaration, if one was written.
    if (xml.startsWith("<?xml")) {
        int declarationEnd = xml.indexOf("?>");
        if (declarationEnd >= 0) {
            xml = xml.substring(declarationEnd + 2);
        }
    }
    return xml;
}

/** Adds one child element per non-null property of {@code source}, named by the upper-cased key. */
@SuppressWarnings("unchecked")
private static void appendUpperCasedChildren(Element parent, Object source) {
    // Round-trip through JSON to obtain the properties as a map; values are not necessarily
    // strings, hence Map<String, Object>.
    Map<String, Object> properties = JSONObject.parseObject(JSONObject.toJSONString(source), Map.class);
    if (properties == null) {
        return;
    }
    for (Map.Entry<String, Object> property : properties.entrySet()) {
        Object value = property.getValue();
        if (value == null) {
            continue;
        }
        // Locale.ROOT keeps element names stable regardless of the JVM's default locale.
        Element childNode = parent.addElement(property.getKey().toUpperCase(java.util.Locale.ROOT));
        childNode.setText(String.valueOf(value));
    }
}
如果点赞多,评论多会更新详细教程,待补充。