Go搭建TcpSocket服务器
1.net包的强大功能
不可否认,go在网络服务开发有强大的优势。net
库是一个功能强大的网络编程库,它提供了构建TCP、UDP和HTTP服务器和客户端所需的所有基础工具。
例如,搭建tcp服务器,只需要几行代码。
func main() {
listener, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
defer conn.Close()
// 处理连接...
}
搭建udp服务器
func main() {
addr, err := net.ResolveUDPAddr("udp", ":8080")
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
buffer := make([]byte, 1024)
for {
n, remoteAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Print(err)
continue
}
// 处理数据...
}
}
搭建http服务器,使用net/http子模块
func main() {
http.HandleFunc("/", handler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, World!")
}
2. java搭建tcp服务器
2.1.使用原生API的挑战
虽然利用java原生的NIOAPI也可以进行socket开发,但存在诸如以下的缺陷与挑战:
- 编程复杂性:NIO的编程模型相对复杂,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等类库和API。开发者需要处理多路复用、事件循环、非阻塞读写等操作,这增加了编程的难度和代码的复杂性。
- 可靠性和稳定性问题:原生NIO需要开发者自己处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等情况,如果没有足够的NIO编程经验积累,一个NIO框架的稳定往往需要半年甚至更长的时间。
- 线程管理:NIO编程涉及到Reactor模式,必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。线程的管理和调度也是一个挑战,尤其是在高并发场景下。
- JDK NIO的BUG:例如臭名昭著的Epoll Bug,它会导致Selector空轮询,最终导致CPU 100%。虽然在JDK 1.7中有所改善,但并未完全解决。
- 资源消耗:使用内存占用较高,使用直接内存缓冲区可能导致较高的内存消耗,尤其是在处理大量数据时。
正因如此,mina,netty这类高效的NIO框架应运而生。然而,netty,mina的学习成本也不小。
2.2.netty编解码示例
下面看下,netty如何处理tcp网络连接,主要是黏包/拆包逻辑。
解码器:
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < DefaultMessageHeader.SIZE) {
return;
}
in.markReaderIndex();
// ----------------protocol pattern-------------------------
// header(12bytes) | body
// msgLength = 12+len(body) | body
// msgLength | index | cmd | body
byte[] header = new byte[DefaultMessageHeader.SIZE];
in.readBytes(header);
DefaultMessageHeader headerMeta = new DefaultMessageHeader();
headerMeta.read(header);
int length = headerMeta.getMsgLength();
if (length > maxProtocolBytes) {
logger.error("message data frame [{}] too large, close session now", length);
ctx.close();
return;
}
int bodySize = length - DefaultMessageHeader.SIZE;
if (in.readableBytes() < bodySize) {
in.resetReaderIndex();
return;
}
int cmd = headerMeta.getCmd();
byte[] body = new byte[bodySize];
in.readBytes(body);
Class<?> msgClazz = messageFactory.getMessage(cmd);
Object message = messageCodec.decode(msgClazz, body);
out.add(new RequestDataFrame(headerMeta, message));
}
编码器:
@Override
protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out) throws Exception {
assert message instanceof SocketDataFrame;
SocketDataFrame dataFrame = (SocketDataFrame) message;
// ----------------protocol pattern-------------------------
// header(12bytes) | body
// msgLength = 12+len(body) | body
// msgLength | index | cmd | body
int cmd = messageFactory.getMessageId(dataFrame.getMessage().getClass());
try {
byte[] body = messageCodec.encode(dataFrame.getMessage());
// 写入包头
//消息内容长度
int msgLength = body.length + DefaultMessageHeader.SIZE;
out.writeInt(msgLength);
out.writeInt(dataFrame.getIndex());
// 写入cmd类型
out.writeInt(cmd);
// 写入包体
out.writeBytes(body);
} catch (Exception e) {
logger.error("wrote message {} failed", cmd, e);
}
}
2.3.创建socket服务器
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
ChannelPipeline pipeline = arg0.pipeline();
if (protocolDecoder != null) {
pipeline.addLast("protocolDecoder",protocolDecoder);
} else {
pipeline.addLast("protocolDecoder", new DefaultProtocolDecoder(messageFactory, messageCodec, maxProtocolBytes));
}
if (protocolEncoder != null) {
pipeline.addLast("protocolEncoder", protocolEncoder);
} else {
pipeline.addLast("protocolEncoder", new DefaultProtocolEncoder(messageFactory, messageCodec));
}
if (idleTime > 0) {
// 客户端XXX没收发包,便会触发UserEventTriggered事件到IdleEventHandler
pipeline.addLast(new IdleStateHandler(0, 0, idleTime,
TimeUnit.MILLISECONDS));
pipeline.addLast("serverIdleHandler", serverIdleHandler);
}
pipeline.addLast("socketIoHandler", channelIoHandler);
}
}
总体来说,还是挺复杂的,需要对netty的api有一定的了解。
3. go搭建tcp服务器
3.1私有协议栈编解码
Tcp传输层在网络传递的数据是一堆字节码,类似于水管的流水,没有边界。由于传输层不是应用层,没有公有协议,也就没有统一的通信格式,都是应用程序内部定义的,只要服务器与客户端遵守同样的规则即可,因此这种协议也叫“私有协议”。
import (
"bytes"
"errors"
)
// Protocol constants.
const (
HeadLength = 8
MaxPacketSize = 64 * 1024
)
type MessageHeader struct {
Cmd int //消息类型
Size int //消息长度
}
// ErrPacketSizeExceed is the error used for encode/decode.
var ErrPacketSizeExceed = errors.New("Protocol: packet size exceed")
type Protocol struct {
buf *bytes.Buffer
}
// NewDecoder returns a new decoder that used for decode network bytes slice.
func NewDecoder() *Protocol {
return &Protocol{
buf: bytes.NewBuffer(nil),
}
}
func (c *Protocol) readHeader() (*MessageHeader, error) {
buff := c.buf.Next(HeadLength)
id := bytesToInt(buff[0:4])
size := bytesToInt(buff[4:HeadLength])
// packet length limitation
if size > MaxPacketSize {
return nil, ErrPacketSizeExceed
}
return &MessageHeader{Cmd: id, Size: size}, nil
}
func (c *Protocol) Decode(data []byte) ([]*Packet, error) {
c.buf.Write(data)
// check length
if c.buf.Len() < HeadLength {
return nil, errors.New("length too small")
}
var packets []*Packet
for c.buf.Len() > 0 {
header, err := c.readHeader()
if err != nil {
return packets, err
}
if header.Size <= c.buf.Len() {
body := c.buf.Next(header.Size)
p := &Packet{Header: *header, Data: body}
packets = append(packets, p)
} else {
break
}
}
return packets, nil
}
func (c *Protocol) Encode(cmd int, data []byte) ([]byte, error) {
// p := &Packet{Cmd: cmd, Length: len(data)}
bodyLen := len(data)
buf := make([]byte, HeadLength+bodyLen)
copy(buf[0:4], intToBytes(cmd))
copy(buf[4:HeadLength], intToBytes(bodyLen))
copy(buf[HeadLength:], data)
return buf, nil
}
// Decode packet data length byte to int(Big end)
func bytesToInt(b []byte) int {
result := 0
for _, v := range b {
result = result<<8 + int(v)
}
return result
}
func intToBytes(n int) []byte {
buf := make([]byte, 4)
buf[0] = byte((n >> 24) & 0xFF)
buf[1] = byte((n >> 16) & 0xFF)
buf[2] = byte((n >> 8) & 0xFF)
buf[3] = byte(n & 0xFF)
return buf
}
大致逻辑:
每次链接建立时,将缓存区的二进制流转移到session自己的缓存区(Protocol#buf字段),在循环内部,每次读取包头8个字节,如果剩余的字节树超过包体的长度,则接取对应的长度,反序列化为一个完整的消息包,再转交给下一步的消息解码器。这个流程只依赖go的内部api,完全无需引入其他依赖,非常漂亮。
3.2.tcp服务器
package network
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"log"
"net"
"net/http"
"reflect"
)
type Node struct {
Name string // 服务器名称
option Options
}
func (n *Node) Startup(opts ...Option) error {
// 设置参数
opt := Options{}
for _, option := range opts {
option(&opt)
}
n.option = opt
if n.option.ServiceAddr == "" {
return errors.New("service address cannot be empty in master node")
}
go func() {
n.listenTcpConn()
}()
return nil
}
// Enable current server accept connection
func (n *Node) listenTcpConn() {
listener, err := net.Listen("tcp", n.option.ServiceAddr)
if err != nil {
log.Fatal(err.Error())
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err != nil {
log.Println(err.Error())
continue
}
go handleClient(n, conn)
}
}
// 处理客户端连接,包括socket,websocket
func handleClient(node *Node, conn net.Conn) {
defer conn.Close() // 确保在函数结束时关闭连接
ioSession := NewSession(&conn, node.option.MessageCodec)
// 异步向客户端写数据
go ioSession.Write()
// read loop
buf := make([]byte, 2048)
for {
n, err := conn.Read(buf)
if err != nil {
log.Printf(fmt.Sprintf("Read message error: %s, session will be closed immediately", err.Error()))
return
}
if n <= 0 {
continue
}
packets, err := ioSession.ProtocolCodec.Decode(buf[:n])
if err != nil {
log.Println(err.Error())
return
}
// process packets decoded
for _, p := range packets {
typ, _ := GetMessageType(p.Header.Cmd)
msg := reflect.New(typ.Elem()).Interface()
err := node.option.MessageCodec.Decode(p.Data, msg)
if err != nil {
log.Println(err.Error())
continue
}
ioFrame := &RequestDataFrame{Header: p.Header, Msg: msg}
node.option.IoDispatch.OnMessageReceived(ioSession, *ioFrame)
}
}
}
完整代码请移步:
--> go游戏服务器