Java项目--仿RabbitMQ的消息队列--网络通信协议设计
目录
一、引言
二、设计
三、代码
1.Request
2.Response
3.BasicArguments
4.BasicReturns
四、方法类
1.创建交换机
2.删除交换机
3.创建队列
4.删除队列
5.创建绑定
6.删除绑定
7.消息发布
8.消费消息
9.集中返回
五、实现Broker Server类
六、实现连接
1.ConnectionFactory类
2.Connection类
3.Channel类
七、总结
一、引言
本篇文章就介绍一下本次项目的最后一个大的部分,网络通信协议的设计。
二、设计
生产者和消费者都是客户端,都需要通过网络和Broker Server进行通信。
此处使用TCP协议作为通信的底层协议,并在此基础上自定义应用层协议,完成客户端对服务器端功能的远程调用。请求和响应使用相同的报文格式:4字节的type(表示要调用的功能)、4字节的length(payload的字节数),以及length个字节的payload(序列化后的参数或返回值)。
三、代码
1.Request
/**
 * One request frame sent from a client to the broker.
 *
 * <p>On the wire a frame is written as {@code type}, then {@code length},
 * then the {@code payload} bytes.
 */
public class Request {
    /** Operation code of the request. */
    private int type;
    /** Number of bytes in {@link #payload}. */
    private int length;
    /** Serialized arguments of the operation. */
    private byte[] payload;

    // ---- accessors -------------------------------------------------------

    public int getType() {
        return this.type;
    }

    public int getLength() {
        return this.length;
    }

    public byte[] getPayload() {
        return this.payload;
    }

    // ---- mutators --------------------------------------------------------

    public void setType(int type) {
        this.type = type;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public void setPayload(byte[] payload) {
        this.payload = payload;
    }
}
2.Response
/**
 * One response frame sent from the broker back to a client.
 *
 * <p>The layout mirrors a request frame: {@code type}, {@code length},
 * then the {@code payload} bytes.
 */
public class Response {
    /** Operation code this response belongs to. */
    private int type;
    /** Number of bytes in {@link #payload}. */
    private int length;
    /** Serialized return value. */
    private byte[] payload;

    // ---- accessors -------------------------------------------------------

    public int getType() {
        return this.type;
    }

    public int getLength() {
        return this.length;
    }

    public byte[] getPayload() {
        return this.payload;
    }

    // ---- mutators --------------------------------------------------------

    public void setType(int type) {
        this.type = type;
    }

    public void setLength(int length) {
        this.length = length;
    }

    public void setPayload(byte[] payload) {
        this.payload = payload;
    }
}
3.BasicArguments
/*
Request的payload
*/
public class BasicArguments implements Serializable {
protected String rid;
protected String channelId;
public String getRid() {
return rid;
}
public void setRid(String rid) {
this.rid = rid;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
}
4.BasicReturns
/**
 * Base type of every response payload.
 *
 * <p>Echoes the request id and channel id of the request it answers and
 * reports whether the operation succeeded.
 */
public class BasicReturns implements Serializable {
    /** Identifier of the request this response answers. */
    protected String rid;
    /** Identifier of the channel the response is addressed to. */
    protected String channelId;
    /** Outcome of the operation. */
    protected boolean ok;

    public String getRid() {
        return this.rid;
    }

    public String getChannelId() {
        return this.channelId;
    }

    public boolean isOk() {
        return this.ok;
    }

    public void setRid(String rid) {
        this.rid = rid;
    }

    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    public void setOk(boolean ok) {
        this.ok = ok;
    }
}
四、方法类
对于VirtualHost提供的每个方法,都要有一个类来表示该方法对应的参数(作为Request的payload)。
1.创建交换机
/**
 * Request payload for declaring an exchange.
 *
 * <p>The accessors are required by the broker, which reads the fields when it
 * handles the request, and by the client channel, which fills them in.
 */
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private ExchangeType exchangeType;
    private boolean durable;
    private boolean autoDelete;
    private Map<String,Object> arguments;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public ExchangeType getExchangeType() {
        return exchangeType;
    }

    public void setExchangeType(ExchangeType exchangeType) {
        this.exchangeType = exchangeType;
    }

    public boolean isDurable() {
        return durable;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public boolean isAutoDelete() {
        return autoDelete;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    public Map<String,Object> getArguments() {
        return arguments;
    }

    public void setArguments(Map<String,Object> arguments) {
        this.arguments = arguments;
    }
}
2.删除交换机
/**
 * Request payload for deleting an exchange.
 *
 * <p>The accessors are required by the broker and by the client channel,
 * which read and write the exchange name.
 */
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {
    private String exchangeName;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }
}
3.创建队列
/**
 * Request payload for declaring a queue.
 *
 * <p>The accessors are required by the broker, which reads the fields when it
 * handles the request, and by the client channel, which fills them in.
 */
public class QueueDeclareArguments extends BasicArguments implements Serializable {
    private String queueName;
    private boolean durable;
    private boolean exclusive;
    private boolean autoDelete;
    private Map<String,Object> arguments;

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public boolean isDurable() {
        return durable;
    }

    public void setDurable(boolean durable) {
        this.durable = durable;
    }

    public boolean isExclusive() {
        return exclusive;
    }

    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }

    public boolean isAutoDelete() {
        return autoDelete;
    }

    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }

    public Map<String,Object> getArguments() {
        return arguments;
    }

    public void setArguments(Map<String,Object> arguments) {
        this.arguments = arguments;
    }
}
4.删除队列
/**
 * Request payload for deleting a queue.
 *
 * <p>The accessors are required by the broker and by the client channel,
 * which read and write the queue name.
 */
public class QueueDeleteArguments extends BasicArguments implements Serializable {
    private String queueName;

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
}
5.创建绑定
/**
 * Request payload for binding a queue to an exchange.
 *
 * <p>The accessors are required by the broker and by the client channel,
 * which read and write the three fields.
 */
public class QueueBindArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String queueName;
    private String bindingKey;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getBindingKey() {
        return bindingKey;
    }

    public void setBindingKey(String bindingKey) {
        this.bindingKey = bindingKey;
    }
}
6.删除绑定
/**
 * Request payload for removing the binding between a queue and an exchange.
 *
 * <p>The accessors are required by the broker and by the client channel,
 * which read and write the two names.
 */
public class QueueUnBindArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String queueName;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
}
7.消息发布
/**
 * Request payload for publishing a message to an exchange.
 *
 * <p>The accessors are required by the broker, which reads the fields when it
 * handles the request, and by the client channel, which fills them in.
 * The body array is stored and returned by reference (no copy).
 */
public class BasicPublishArguments extends BasicArguments implements Serializable {
    private String exchangeName;
    private String routingKey;
    private BasicProperties basicProperties;
    private byte[] body;

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public BasicProperties getBasicProperties() {
        return basicProperties;
    }

    public void setBasicProperties(BasicProperties basicProperties) {
        this.basicProperties = basicProperties;
    }

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }
}
8.消费消息
public class BasicConsumeArguments {
private String consumerTag;
private String queueName;
private boolean autoAck;
}
9.集中返回
/**
 * Response payload the broker pushes to a subscribed consumer for each
 * delivered message.
 *
 * <p>This is a response, so it extends {@link BasicReturns} (the broker calls
 * {@code setOk} on it), not the request base type. The body array is stored
 * and returned by reference (no copy).
 */
public class SubScribeReturns extends BasicReturns implements Serializable {
    private String consumerTag;
    private BasicProperties basicProperties;
    private byte[] body;

    public String getConsumerTag() {
        return consumerTag;
    }

    public void setConsumerTag(String consumerTag) {
        this.consumerTag = consumerTag;
    }

    public BasicProperties getBasicProperties() {
        return basicProperties;
    }

    public void setBasicProperties(BasicProperties basicProperties) {
        this.basicProperties = basicProperties;
    }

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }
}
五、实现Broker Server类
/**
 * TCP front end of the broker.
 *
 * <p>Accepts client connections, reads request frames ({@code type},
 * {@code length}, {@code payload}), forwards each request to the virtual host
 * and writes one response frame back. Messages for subscribed consumers are
 * pushed on the same connection as frames of type {@code 0xc}.
 */
public class BrokerServer {
    private ServerSocket serverSocket = null;
    // A single virtual host named "default" serves every request.
    private VirtualHost virtualHost = new VirtualHost("default");
    // channelId -> socket of the connection the channel was created on.
    private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String,Socket>();
    // socket -> the one writer used for that connection. Responses and pushed
    // messages are written by different threads, so they must share a single
    // stream object and lock on it to keep frames from interleaving.
    private ConcurrentHashMap<Socket, DataOutputStream> writers = new ConcurrentHashMap<Socket, DataOutputStream>();
    private ExecutorService executorService = null;
    private volatile boolean runnable = true;

    /**
     * Binds the listening socket.
     *
     * @param port TCP port to listen on
     * @throws IOException if the port cannot be bound
     */
    public BrokerServer(int port) throws IOException {
        serverSocket = new ServerSocket(port);
    }

    /**
     * Runs the accept loop on the calling thread until {@link #stop()} closes
     * the listening socket. Each connection is handled on a pool thread.
     *
     * @throws IOException if accepting fails for a reason other than the
     *                     socket being closed
     */
    public void start() throws IOException {
        System.out.println("[BrokerServer] 启动!");
        executorService = Executors.newCachedThreadPool();
        try {
            while (runnable) {
                Socket clientSocket = serverSocket.accept();
                executorService.submit(() -> {
                    processConnection(clientSocket);
                });
            }
        } catch (SocketException e) {
            // Closing the server socket in stop() ends accept() this way.
            System.out.println("[BrokerServer]服务器停止运行!");
        }
    }

    /**
     * Stops the accept loop and interrupts the connection handlers.
     *
     * @throws IOException if closing the listening socket fails
     */
    public void stop() throws IOException {
        runnable = false;
        // start() may never have been called, in which case there is no pool.
        if (executorService != null) {
            executorService.shutdownNow();
        }
        serverSocket.close();
    }

    /**
     * Serves one client connection: reads requests and answers them until the
     * client disconnects or an error occurs, then releases the connection's
     * sessions.
     */
    private void processConnection(Socket clientSocket) {
        try (InputStream inputStream = clientSocket.getInputStream(); OutputStream outputStream = clientSocket.getOutputStream()) {
            try (DataInputStream dataInputStream = new DataInputStream(inputStream)
                 ; DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
                // Publish the writer so consumer callbacks use the same stream.
                writers.put(clientSocket, dataOutputStream);
                while (true) {
                    Request request = readRequest(dataInputStream);
                    Response response = process(request, clientSocket);
                    writeResponse(dataOutputStream, response);
                }
            } catch (EOFException | SocketException e) {
                System.out.println("[BrokerServer] connection关闭!客户端地址:" + clientSocket.getInetAddress().toString()
                        + "端口号:" + clientSocket.getPort());
            }
        } catch (Exception e) {
            System.out.println("[BrokerServer] connection连接出现异常!");
            e.printStackTrace();
        } finally {
            writers.remove(clientSocket);
            try {
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // Runs even when close() fails, so stale sessions never linger.
            clearClosedSession(clientSocket);
        }
    }

    /**
     * Executes one request against the virtual host and builds its response.
     *
     * @throws MqException if the request type is unknown
     */
    private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
        // 1. Every payload is a BasicArguments subtype; decode it once.
        BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());
        System.out.println("[Request] rid=" + basicArguments.getRid() + ",channelId=" + basicArguments.getChannelId() + ",type="
                + request.getType() + ",length=" + request.getLength());
        // 2. Dispatch on the request type.
        boolean ok = true;
        if (request.getType() == 0x1) {
            // Create channel: remember which socket owns it.
            sessions.put(basicArguments.getChannelId(), clientSocket);
            System.out.println("[BrokerServer]创建channel完成!channelId=" + basicArguments.getChannelId());
        } else if (request.getType() == 0x2) {
            // Destroy channel.
            sessions.remove(basicArguments.getChannelId());
            System.out.println("[BrokerServer]销毁channel完成!channelId=" + basicArguments.getChannelId());
        } else if (request.getType() == 0x3) {
            // Declare exchange.
            ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;
            ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType()
                    , arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());
        } else if (request.getType() == 0x4) {
            // Delete exchange.
            ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;
            ok = virtualHost.exchangeDelete(arguments.getExchangeName());
        } else if (request.getType() == 0x5) {
            // Declare queue.
            QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;
            ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(), arguments.isExclusive()
                    , arguments.isAutoDelete(), arguments.getArguments());
        } else if (request.getType() == 0x6) {
            // Delete queue.
            QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;
            ok = virtualHost.queueDelete(arguments.getQueueName());
        } else if (request.getType() == 0x7) {
            // Bind queue.
            QueueBindArguments arguments = (QueueBindArguments) basicArguments;
            ok = virtualHost.queueBind(arguments.getExchangeName(), arguments.getQueueName(), arguments.getBindingKey());
        } else if (request.getType() == 0x8) {
            // Unbind queue.
            QueueUnBindArguments arguments = (QueueUnBindArguments) basicArguments;
            ok = virtualHost.queueUnBind(arguments.getExchangeName(), arguments.getQueueName());
        } else if (request.getType() == 0x9) {
            // Publish message.
            BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;
            ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey()
                    , arguments.getBasicProperties(), arguments.getBody());
        } else if (request.getType() == 0xa) {
            // Subscribe: the callback pushes each delivered message to the consumer.
            BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;
            ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {
                @Override
                public void handleDelivery(String consumeTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                    // The consumer tag is the channelId, so it keys the session table.
                    // 1. Locate the consumer's connection.
                    Socket consumerSocket = sessions.get(consumeTag);
                    if (consumerSocket == null || consumerSocket.isClosed()) {
                        throw new MqException("[BrokerServer]订阅消息的客户端已经关闭!");
                    }
                    DataOutputStream consumerOut = writers.get(consumerSocket);
                    if (consumerOut == null) {
                        // The connection handler has already finished.
                        throw new MqException("[BrokerServer]订阅消息的客户端已经关闭!");
                    }
                    // 2. Build the pushed payload.
                    SubScribeReturns subScribeReturns = new SubScribeReturns();
                    subScribeReturns.setChannelId(consumeTag);
                    subScribeReturns.setRid(""); // a push answers no request, so there is nothing to correlate
                    subScribeReturns.setOk(true);
                    subScribeReturns.setConsumerTag(consumeTag);
                    subScribeReturns.setBasicProperties(basicProperties);
                    subScribeReturns.setBody(body);
                    byte[] payload = BinaryTool.toBytes(subScribeReturns);
                    Response response = new Response();
                    // 0xc marks a message pushed from the broker to a consumer.
                    response.setType(0xc);
                    response.setLength(payload.length);
                    response.setPayload(payload);
                    // 3. Send it through the connection's shared writer.
                    writeResponse(consumerOut, response);
                }
            });
        } else if (request.getType() == 0xb) {
            // Acknowledge a message.
            BasicAckArguments arguments = (BasicAckArguments) basicArguments;
            ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());
        } else {
            // Unknown request type.
            throw new MqException("[BrokerServer] 未知的type!type=" + request.getType());
        }
        // 3. Build the response, echoing rid and channelId for correlation.
        BasicReturns basicReturns = new BasicReturns();
        basicReturns.setChannelId(basicArguments.getChannelId());
        basicReturns.setRid(basicArguments.getRid());
        basicReturns.setOk(ok);
        byte[] payload = BinaryTool.toBytes(basicReturns);
        Response response = new Response();
        response.setType(request.getType());
        response.setLength(payload.length);
        response.setPayload(payload);
        System.out.println("[Response] rid=" + basicReturns.getRid() + ",channelId=" + basicReturns.getChannelId()
                + ",type=" + response.getType() + ",length=" + response.getLength());
        return response;
    }

    /**
     * Writes one response frame. The whole frame is written while holding the
     * stream's monitor so frames from different threads cannot interleave.
     */
    private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {
        synchronized (dataOutputStream) {
            dataOutputStream.writeInt(response.getType());
            dataOutputStream.writeInt(response.getLength());
            dataOutputStream.write(response.getPayload());
            dataOutputStream.flush();
        }
    }

    /**
     * Reads one request frame.
     *
     * @throws EOFException if the stream ends before a full frame was read
     * @throws IOException  if the declared length is negative or reading fails
     */
    private Request readRequest(DataInputStream dataInputStream) throws IOException {
        Request request = new Request();
        request.setType(dataInputStream.readInt());
        request.setLength(dataInputStream.readInt());
        if (request.getLength() < 0) {
            throw new IOException("读取请求格式出错!");
        }
        byte[] payload = new byte[request.getLength()];
        // readFully blocks until the whole payload has arrived; a plain read()
        // may return after only part of it.
        dataInputStream.readFully(payload);
        request.setPayload(payload);
        return request;
    }

    /** Removes every session whose channel was created on the given socket. */
    private void clearClosedSession(Socket clientSocket) {
        List<String> toDeleteChannelId = new ArrayList<>();
        for (Map.Entry<String,Socket> entry : sessions.entrySet()) {
            if (entry.getValue() == clientSocket) {
                toDeleteChannelId.add(entry.getKey());
            }
        }
        for (String channelId : toDeleteChannelId) {
            sessions.remove(channelId);
        }
        System.out.println("[BrokerServer] 清理session完成!被清理的channelId=" + toDeleteChannelId);
    }
}
六、实现连接
1.ConnectionFactory类
/**
 * Holds the broker address and creates connections to it.
 */
public class ConnectionFactory {
    // Broker host name or IP address.
    private String host;
    // Broker TCP port.
    private int port;

    /**
     * Opens a new connection to the configured broker.
     *
     * @return the established connection
     * @throws IOException if the socket cannot be opened
     */
    public Connection newConnection() throws IOException {
        return new Connection(host, port);
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}
2.Connection类
/**
 * One TCP connection from a client to the broker, shared by any number of
 * channels.
 *
 * <p>A background thread reads every response frame and hands it to the
 * channel it belongs to; pushed messages (type {@code 0xc}) are delivered to
 * the channel's consumer on a callback pool.
 */
public class Connection {
    private Socket socket = null;
    // channelId -> channel created on this connection.
    private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();
    private InputStream inputStream;
    private OutputStream outputStream;
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;
    // Runs consumer callbacks so that they never block the reader thread.
    private ExecutorService callbackPool = null;

    /**
     * Connects to the broker and starts the response reader thread.
     *
     * @throws IOException if the socket cannot be opened
     */
    public Connection(String host,int port) throws IOException {
        socket = new Socket(host,port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);
        callbackPool = Executors.newFixedThreadPool(4);
        Thread t = new Thread(() -> {
            try {
                while (!socket.isClosed()) {
                    Response response = readResponse();
                    dispatchResponse(response);
                }
            } catch (SocketException e) {
                System.out.println("[Connection] 连接正常断开!");
            } catch (IOException | ClassNotFoundException | MqException e) {
                System.out.println("[Connection] 连接异常断开!");
                e.printStackTrace();
            }
        });
        // Without this no response is ever read and every request blocks forever.
        t.start();
    }

    /** Shuts down the callback pool, forgets all channels and closes the socket. */
    public void close() {
        try {
            callbackPool.shutdownNow();
            channelMap.clear();
            dataOutputStream.close();
            dataInputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Routes one response to its channel: pushed messages go to the channel's
     * consumer, everything else wakes the thread waiting for that response.
     *
     * @throws MqException if the response names a channel unknown to this connection
     */
    private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if (response.getType() == 0xc) {
            SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
            Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if (channel == null) {
                // Report the id from the payload; the channel itself is null here.
                throw new MqException("[Connection] 该消息对应的channel在客户端中不存在!channelId=" + subScribeReturns.getChannelId());
            }
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag()
                            , subScribeReturns.getBasicProperties(), subScribeReturns.getBody());
                } catch (MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 该消息对应的channel在客户端中不存在!channelId=" + basicReturns.getChannelId());
            }
            channel.putReturns(basicReturns);
        }
    }

    /**
     * Writes one request frame. Channels on different threads share this
     * stream, so the frame is written under a lock to keep frames intact.
     *
     * @throws IOException if writing fails
     */
    public void writeRequest(Request request) throws IOException {
        synchronized (dataOutputStream) {
            dataOutputStream.writeInt(request.getType());
            dataOutputStream.writeInt(request.getLength());
            dataOutputStream.write(request.getPayload());
            dataOutputStream.flush();
        }
        System.out.println("[Connection]发送请求!type=" + request.getType() + ",length=" + request.getLength());
    }

    /**
     * Reads one response frame.
     *
     * @throws java.io.EOFException if the stream ends before a full frame was read
     * @throws IOException          if the declared length is negative or reading fails
     */
    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        if (response.getLength() < 0) {
            throw new IOException("响应的数据不完整!");
        }
        byte[] payload = new byte[response.getLength()];
        // readFully waits for the whole payload; read() may stop after a part.
        dataInputStream.readFully(payload);
        response.setPayload(payload);
        System.out.println("[Connection] 收到响应!type=" + response.getType() + ",length=" + response.getLength());
        return response;
    }

    /**
     * Creates a channel on this connection and registers it with the broker.
     *
     * @return the new channel, or {@code null} if the broker rejected it
     * @throws IOException if the create request cannot be sent
     */
    public Channel createChannel() throws IOException {
        String channelId = "C-" + UUID.randomUUID().toString();
        Channel channel = new Channel(channelId,this);
        System.out.println(channelId);
        // Register first: the response is routed through channelMap.
        channelMap.put(channelId,channel);
        boolean ok = false;
        try {
            ok = channel.createChannel();
        } finally {
            // Covers both a rejected request and a failed send.
            if (!ok) {
                channelMap.remove(channelId);
            }
        }
        return ok ? channel : null;
    }
}
3.Channel类
/**
 * A logical channel multiplexed over one {@link Connection}.
 *
 * <p>Each public operation sends one request and blocks until the response
 * with the same request id arrives. Responses are supplied by the
 * connection's reader thread through {@link #putReturns(BasicReturns)}.
 */
public class Channel {
    private String channelId;
    private Connection connection;
    // rid -> response that has arrived but has not been collected yet.
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    // Callback for pushed messages; written by the subscribing thread and
    // read by the connection's callback threads, hence volatile.
    private volatile Consumer consumer = null;

    public Channel(String channelId,Connection connection){
        this.channelId = channelId;
        this.connection = connection;
    }

    public String getChannelId() {
        return channelId;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    /**
     * Announces this channel to the broker.
     *
     * @return whether the broker accepted the channel
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean createChannel() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());
        System.out.println(basicArguments.getChannelId());
        System.out.println(basicArguments.getRid());
        return sendAndWait(0x1, basicArguments).isOk();
    }

    private String generateRid(){
        return "R-" + UUID.randomUUID().toString();
    }

    /**
     * Fills in a missing rid and the channelId, sends the request and blocks
     * until its response arrives.
     */
    private BasicReturns sendAndWait(int type, BasicArguments arguments) throws IOException {
        if (arguments.getRid() == null) {
            arguments.setRid(generateRid());
        }
        arguments.setChannelId(channelId);
        byte[] payload = BinaryTool.toBytes(arguments);
        Request request = new Request();
        request.setType(type);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        return waitResult(arguments.getRid());
    }

    /**
     * Blocks until the response for {@code rid} has been delivered and returns it.
     *
     * @throws IOException if the waiting thread is interrupted
     */
    private BasicReturns waitResult(String rid) throws IOException {
        // Check and wait under one monitor: if the check ran outside it, a
        // response stored between the check and wait() would notify nobody
        // and this thread would sleep forever.
        synchronized (this) {
            BasicReturns basicReturns;
            while ((basicReturns = basicReturnsMap.remove(rid)) == null) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible and stop waiting instead of
                    // swallowing it and blocking again.
                    Thread.currentThread().interrupt();
                    throw new java.io.InterruptedIOException("[Channel] 等待响应时被中断!rid=" + rid);
                }
            }
            return basicReturns;
        }
    }

    /**
     * Stores a response and wakes all threads waiting on this channel.
     * Called by the connection's reader thread.
     */
    public void putReturns(BasicReturns basicReturns){
        synchronized (this){
            basicReturnsMap.put(basicReturns.getRid(),basicReturns);
            notifyAll();
        }
    }

    /**
     * Asks the broker to destroy this channel.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean close() throws IOException {
        return sendAndWait(0x2, new BasicArguments()).isOk();
    }

    /**
     * Declares an exchange on the broker.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType
            , boolean durable, boolean autoDelete, Map<String,Object> arguments) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);
        return sendAndWait(0x3, exchangeDeclareArguments).isOk();
    }

    /**
     * Deletes an exchange on the broker.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
        exchangeDeleteArguments.setExchangeName(exchangeName);
        return sendAndWait(0x4, exchangeDeleteArguments).isOk();
    }

    /**
     * Declares a queue on the broker.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete
            ,Map<String,Object> arguments) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setExclusive(exclusive);
        // autoDelete has its own setter; it must not overwrite exclusive.
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);
        return sendAndWait(0x5, queueDeclareArguments).isOk();
    }

    /**
     * Deletes a queue on the broker.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
        queueDeleteArguments.setQueueName(queueName);
        return sendAndWait(0x6, queueDeleteArguments).isOk();
    }

    /**
     * Binds a queue to an exchange with the given binding key.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean queueBind(String exchangeName,String queueName,String bindingKey) throws IOException {
        QueueBindArguments queueBindArguments = new QueueBindArguments();
        queueBindArguments.setExchangeName(exchangeName);
        queueBindArguments.setQueueName(queueName);
        queueBindArguments.setBindingKey(bindingKey);
        return sendAndWait(0x7, queueBindArguments).isOk();
    }

    /**
     * Removes the binding between a queue and an exchange.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean queueUnBind(String exchangeName,String queueName) throws IOException {
        QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();
        queueUnBindArguments.setExchangeName(exchangeName);
        queueUnBindArguments.setQueueName(queueName);
        return sendAndWait(0x8, queueUnBindArguments).isOk();
    }

    /**
     * Publishes a message to an exchange.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,byte[] body) throws IOException {
        BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
        // The broker routes by exchange name, so it must be part of the payload.
        basicPublishArguments.setExchangeName(exchangeName);
        basicPublishArguments.setRoutingKey(routingKey);
        basicPublishArguments.setBasicProperties(basicProperties);
        basicPublishArguments.setBody(body);
        return sendAndWait(0x9, basicPublishArguments).isOk();
    }

    /**
     * Subscribes this channel to a queue. A channel accepts one consumer only.
     *
     * @return whether the broker reported success
     * @throws MqException if a consumer has already been set on this channel
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws MqException, IOException {
        if (this.consumer != null) {
            throw new MqException("[Channel] 已经设置过回调函数了!不能重复设置!");
        }
        this.consumer = consumer;
        BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
        // The channelId doubles as the consumer tag.
        basicConsumeArguments.setConsumerTag(channelId);
        basicConsumeArguments.setQueueName(queueName);
        basicConsumeArguments.setAutoAck(autoAck);
        return sendAndWait(0xa, basicConsumeArguments).isOk();
    }

    /**
     * Acknowledges a message of the given queue.
     *
     * @return whether the broker reported success
     * @throws IOException if the request cannot be sent or the wait is interrupted
     */
    public boolean basicAck(String queueName,String messageId) throws IOException {
        BasicAckArguments basicAckArguments = new BasicAckArguments();
        basicAckArguments.setQueueName(queueName);
        basicAckArguments.setMessageId(messageId);
        return sendAndWait(0xb, basicAckArguments).isOk();
    }
}
七、总结
本篇文章就是本次Java项目“模拟消息队列”的最后一个大的部分了,下一篇文章就是对所写的这个项目进行一个案例编写,然后对此项目进行扩展。感谢观看!