- 导入 Maven 依赖:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
- 服务端:
package com.qcby.springboot.MQ;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @ClassName BoyatopMQServer2021
* @Author
* @Version V1.0
**/
public class BoyatopNettyMQServer {
public void bind(int port) throws Exception {
/**
* Netty 抽象出两组线程池BossGroup和WorkerGroup
* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(bossGroup, workerGroup)
// 设定NioServerSocketChannel 为服务器端
.channel(NioServerSocketChannel.class)
//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
.option(ChannelOption.SO_BACKLOG, 100)
// 服务器端监听数据回调Handler
.childHandler(new BoyatopNettyMQServer.ChildChannelHandler());
//绑定端口, 同步等待成功;
ChannelFuture future = bootstrap.bind(port).sync();
System.out.println("当前服务器端启动成功...");
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅关闭 线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 设置异步回调监听
ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 9008;
new BoyatopNettyMQServer().bind(port);
}
private static final String type_consumer = "consumer";
private static final String type_producer = "producer";
private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();
// 生产者投递消息的:topicName
public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {
/**
* 服务器接收客户端请求
*
* @param ctx
* @param data
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object data)
throws Exception {
//ByteBuf buf=(ByteBuf)data;
//byte[] req = new byte[buf.readableBytes()];
//buf.readBytes(req);
//String body = new String(req, "UTF-8");
//System.out.println("body:"+body);
JSONObject clientMsg = getData(data);
String type = clientMsg.getString("type");
switch (type) {
case type_producer:
producer(clientMsg);
break;
case type_consumer:
consumer(ctx);
break;
}
}
private void consumer(ChannelHandlerContext ctx) {
// 保存消费者连接
ctxs.add(ctx);
// 主动拉取mq服务器端缓存中没有被消费的消息
String data = msgs.poll();
if (StringUtils.isEmpty(data)) {
return;
}
// 将该消息发送给消费者
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
private void producer(JSONObject clientMsg) {
// 缓存生产者投递 消息
String msg = clientMsg.getString("msg");
msgs.offer(msg); //保证消息不丢失还可以缓存硬盘
//需要将该消息推送消费者
ctxs.forEach((ctx) -> {
// 将该消息发送给消费者
String data = msgs.poll();
if (data == null) {
return;
}
byte[] req = data.getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
});
}
private JSONObject getData(Object data) throws UnsupportedEncodingException {
ByteBuf buf = (ByteBuf) data;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
return JSONObject.parseObject(body);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
}
- 生产端:
package com.qcby.springboot.MQ;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @ClassName BoyatopNettyMQProducer
* @Author
* @Version V1.0
**/
public class BoyatopNettyMQProducer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());
1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "producer");
JSONObject msg = new JSONObject();
msg.put("userId", "123456");
msg.put("age", "23");
data.put("msg", msg);
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客户端读取到服务器端数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客户端接收到服务器端请求:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
- 客户端:
package com.qcby.springboot.MQ;
import com.alibaba.fastjson.JSONObject;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @ClassName BoyatopNettyMQProducer
* @Author
* @Version V1.0
**/
public class NettyMQConsumer {
public void connect(int port, String host) throws Exception {
//配置客户端NIO 线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(group)
// 设置为Netty客户端
.channel(NioSocketChannel.class)
/**
* ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
*/
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());
1. 演示LineBasedFrameDecoder编码器
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
}
});
//绑定端口, 异步连接操作
ChannelFuture future = client.connect(host, port).sync();
//等待客户端连接端口关闭
future.channel().closeFuture().sync();
} finally {
//优雅关闭 线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 9008;
NettyMQConsumer client = new NettyMQConsumer();
try {
client.connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
JSONObject data = new JSONObject();
data.put("type", "consumer");
// 生产发送数据
byte[] req = data.toJSONString().getBytes();
ByteBuf firstMSG = Unpooled.buffer(req.length);
firstMSG.writeBytes(req);
ctx.writeAndFlush(firstMSG);
}
/**
* 客户端读取到服务器端数据
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("客户端接收到服务器端请求:" + body);
}
// tcp属于双向传输
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}