当前位置: 首页 > article >正文

Go搭建TcpSocket服务器

1.net包的强大功能

不可否认,go在网络服务开发有强大的优势。net库是一个功能强大的网络编程库,它提供了构建TCP、UDP和HTTP服务器和客户端所需的所有基础工具。

例如,搭建tcp服务器,只需要几行代码。

func main() {
     listener, err := net.Listen("tcp", ":8080")
     if err != nil {
         log.Fatal(err)
     }
     for {
         conn, err := listener.Accept()
         if err != nil {
             log.Print(err)
             continue
         }
         go handleConnection(conn)
     }
}

func handleConnection(conn net.Conn) {
    defer conn.Close()
    // 处理连接...
}

搭建udp服务器

func main() {
    addr, err := net.ResolveUDPAddr("udp", ":8080")
    if err != nil {
        log.Fatal(err)
    }
    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    buffer := make([]byte, 1024)
    for {
        n, remoteAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            log.Print(err)
            continue
        }
        // 处理数据...
    }
}

搭建http服务器,使用net/http子模块

func main() {
    http.HandleFunc("/", handler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

func handler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Hello, World!")
}

2. java搭建tcp服务器

2.1.使用原生API的挑战

虽然利用java原生的NIOAPI也可以进行socket开发,但存在诸如以下的缺陷与挑战:

  1. 编程复杂性:NIO的编程模型相对复杂,需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等类库和API。开发者需要处理多路复用、事件循环、非阻塞读写等操作,这增加了编程的难度和代码的复杂性。
  2. 可靠性和稳定性问题:原生NIO需要开发者自己处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等情况,如果没有足够的NIO编程经验积累,一个NIO框架的稳定往往需要半年甚至更长的时间。
  3. 线程管理:NIO编程涉及到Reactor模式,必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。线程的管理和调度也是一个挑战,尤其是在高并发场景下。
  4. JDK NIO的BUG:例如臭名昭著的Epoll Bug,它会导致Selector空轮询,最终导致CPU 100%。虽然在JDK 1.7中有所改善,但并未完全解决。
  5. 资源消耗:使用内存占用较高,使用直接内存缓冲区可能导致较高的内存消耗,尤其是在处理大量数据时。

正因如此,mina,netty这类高效的NIO框架应运而生。然而,netty,mina的学习成本也不小。

2.2.netty编解码示例

下面看下,netty如何处理tcp网络连接,主要是黏包/拆包逻辑。 

解码器:

 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < DefaultMessageHeader.SIZE) {
            return;
        }
        in.markReaderIndex();
        // ----------------protocol pattern-------------------------
        //      header(12bytes)     | body
        // msgLength = 12+len(body) | body
        // msgLength | index | cmd  | body
        byte[] header = new byte[DefaultMessageHeader.SIZE];
        in.readBytes(header);
        DefaultMessageHeader headerMeta = new DefaultMessageHeader();
        headerMeta.read(header);

        int length = headerMeta.getMsgLength();
        if (length > maxProtocolBytes) {
            logger.error("message data frame [{}] too large, close session now", length);
            ctx.close();
            return;
        }
        int bodySize = length - DefaultMessageHeader.SIZE;
        if (in.readableBytes() < bodySize) {
            in.resetReaderIndex();
            return;
        }
        int cmd = headerMeta.getCmd();
        byte[] body = new byte[bodySize];
        in.readBytes(body);

        Class<?> msgClazz = messageFactory.getMessage(cmd);

        Object message = messageCodec.decode(msgClazz, body);
        out.add(new RequestDataFrame(headerMeta, message));
    }

编码器:

@Override
	protected void encode(ChannelHandlerContext ctx, Object message, ByteBuf out) throws Exception {
		assert message instanceof SocketDataFrame;
		SocketDataFrame dataFrame = (SocketDataFrame) message;
		// ----------------protocol pattern-------------------------
		//      header(12bytes)     | body
		// msgLength = 12+len(body) | body
		// msgLength | index | cmd  | body
		int  cmd = messageFactory.getMessageId(dataFrame.getMessage().getClass());
		try {
            byte[] body = messageCodec.encode(dataFrame.getMessage());
			// 写入包头
			//消息内容长度
			int msgLength = body.length + DefaultMessageHeader.SIZE;
			out.writeInt(msgLength);
			out.writeInt(dataFrame.getIndex());
			// 写入cmd类型
			out.writeInt(cmd);

			// 写入包体
			out.writeBytes(body);

		} catch (Exception e) {
			logger.error("wrote message {} failed", cmd, e);
		}
	}

2.3.创建socket服务器

   private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            ChannelPipeline pipeline = arg0.pipeline();
       
            if (protocolDecoder != null) {
                pipeline.addLast("protocolDecoder",protocolDecoder);
            } else {
                pipeline.addLast("protocolDecoder", new DefaultProtocolDecoder(messageFactory, messageCodec, maxProtocolBytes));
            }
            if (protocolEncoder != null) {
                pipeline.addLast("protocolEncoder", protocolEncoder);
            } else {
                pipeline.addLast("protocolEncoder", new DefaultProtocolEncoder(messageFactory, messageCodec));
            }

            if (idleTime > 0) {
                // 客户端XXX没收发包,便会触发UserEventTriggered事件到IdleEventHandler
                pipeline.addLast(new IdleStateHandler(0, 0, idleTime,
                        TimeUnit.MILLISECONDS));
                pipeline.addLast("serverIdleHandler", serverIdleHandler);
            }
            pipeline.addLast("socketIoHandler", channelIoHandler);
        }
    }

总体来说,还是挺复杂的,需要对netty的api有一定的了解。

3. go搭建tcp服务器

3.1私有协议栈编解码

Tcp传输层在网络传递的数据是一堆字节码,类似于水管的流水,没有边界。由于传输层不是应用层,没有公有协议,也就没有统一的通信格式,都是应用程序内部定义的,只要服务器与客户端遵守同样的规则即可,因此这种协议也叫“私有协议”。

import (
	"bytes"
	"errors"
)

// Protocol constants.
const (
	HeadLength    = 8
	MaxPacketSize = 64 * 1024
)

type MessageHeader struct {
	Cmd  int //消息类型
	Size int //消息长度
}

// ErrPacketSizeExceed is the error used for encode/decode.
var ErrPacketSizeExceed = errors.New("Protocol: packet size exceed")

type Protocol struct {
	buf *bytes.Buffer
}

// NewDecoder returns a new decoder that used for decode network bytes slice.
func NewDecoder() *Protocol {
	return &Protocol{
		buf: bytes.NewBuffer(nil),
	}
}

func (c *Protocol) readHeader() (*MessageHeader, error) {
	buff := c.buf.Next(HeadLength)
	id := bytesToInt(buff[0:4])
	size := bytesToInt(buff[4:HeadLength])

	// packet length limitation
	if size > MaxPacketSize {
		return nil, ErrPacketSizeExceed
	}
	return &MessageHeader{Cmd: id, Size: size}, nil
}

func (c *Protocol) Decode(data []byte) ([]*Packet, error) {
	c.buf.Write(data)
	// check length
	if c.buf.Len() < HeadLength {
		return nil, errors.New("length too small")
	}
	var packets []*Packet

	for c.buf.Len() > 0 {
		header, err := c.readHeader()
		if err != nil {
			return packets, err
		}

		if header.Size <= c.buf.Len() {
			body := c.buf.Next(header.Size)
			p := &Packet{Header: *header, Data: body}
			packets = append(packets, p)
		} else {
			break
		}
	}

	return packets, nil
}

func (c *Protocol) Encode(cmd int, data []byte) ([]byte, error) {
	// p := &Packet{Cmd: cmd, Length: len(data)}
	bodyLen := len(data)
	buf := make([]byte, HeadLength+bodyLen)

	copy(buf[0:4], intToBytes(cmd))
	copy(buf[4:HeadLength], intToBytes(bodyLen))
	copy(buf[HeadLength:], data)

	return buf, nil
}

// Decode packet data length byte to int(Big end)
func bytesToInt(b []byte) int {
	result := 0
	for _, v := range b {
		result = result<<8 + int(v)
	}
	return result
}

func intToBytes(n int) []byte {
	buf := make([]byte, 4)
	buf[0] = byte((n >> 24) & 0xFF)
	buf[1] = byte((n >> 16) & 0xFF)
	buf[2] = byte((n >> 8) & 0xFF)
	buf[3] = byte(n & 0xFF)
	return buf
}

大致逻辑:

每次链接建立时,将缓存区的二进制流转移到session自己的缓存区(Protocol#buf字段),在循环内部,每次读取包头8个字节,如果剩余的字节树超过包体的长度,则接取对应的长度,反序列化为一个完整的消息包,再转交给下一步的消息解码器。这个流程只依赖go的内部api,完全无需引入其他依赖,非常漂亮。 

3.2.tcp服务器

package network

import (
	"errors"
	"fmt"
	"github.com/gorilla/websocket"
	"log"
	"net"
	"net/http"
	"reflect"
)

type Node struct {
	Name   string // 服务器名称
	option Options
}

func (n *Node) Startup(opts ...Option) error {
	// 设置参数
	opt := Options{}
	for _, option := range opts {
		option(&opt)
	}
	n.option = opt

	if n.option.ServiceAddr == "" {
		return errors.New("service address cannot be empty in master node")
	}

	go func() {
	    n.listenTcpConn()
		
	}()

	return nil
}

// Enable current server accept connection
func (n *Node) listenTcpConn() {
	listener, err := net.Listen("tcp", n.option.ServiceAddr)
	if err != nil {
		log.Fatal(err.Error())
	}

	defer listener.Close()
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Println(err.Error())
			continue
		}

		go handleClient(n, conn)
	}
}

// 处理客户端连接,包括socket,websocket
func handleClient(node *Node, conn net.Conn) {
	defer conn.Close() // 确保在函数结束时关闭连接

	ioSession := NewSession(&conn, node.option.MessageCodec)
	// 异步向客户端写数据
	go ioSession.Write()

	// read loop
	buf := make([]byte, 2048)
	for {
		n, err := conn.Read(buf)
		if err != nil {
			log.Printf(fmt.Sprintf("Read message error: %s, session will be closed immediately", err.Error()))
			return
		}
		if n <= 0 {
			continue
		}
		packets, err := ioSession.ProtocolCodec.Decode(buf[:n])
		if err != nil {
			log.Println(err.Error())
			return
		}
		// process packets decoded
		for _, p := range packets {
			typ, _ := GetMessageType(p.Header.Cmd)
			msg := reflect.New(typ.Elem()).Interface()
			err := node.option.MessageCodec.Decode(p.Data, msg)
			if err != nil {
				log.Println(err.Error())
				continue
			}
			ioFrame := &RequestDataFrame{Header: p.Header, Msg: msg}
			node.option.IoDispatch.OnMessageReceived(ioSession, *ioFrame)
		}
	}
}

 完整代码请移步:

--> go游戏服务器


http://www.kler.cn/a/315320.html

相关文章:

  • 中国石油大学(华东)自动评教工具(涵盖爬虫的基础知识,适合练手)
  • ESP8266 AP模式 网页配网 arduino ide
  • Java中对list数据进行手动分页(可直接复用版)
  • 通过proto文件构建 完整的 gRPC 服务端和客户端案例
  • 第G1周:生成对抗网络(GAN)入门
  • dockerfile实现lnmp
  • 华润电力最新校招社招润择认知能力测评:逻辑推理数字计算语言理解高分攻略
  • K8s容器运行时,移除Dockershim后存在哪些疑惑?
  • 神经网络面试题目
  • 【AI视频】复刻抖音爆款AI数字人作品初体验
  • 什么是机器学习力场
  • 多维时序 | Matlab基于BO-LSSVM贝叶斯优化最小二乘支持向量机数据多变量时间序列预测
  • cesium.js 入门到精通(5-2)
  • CentOS7.9环境上NFS搭建及使用
  • linux-系统备份与恢复-系统恢复
  • 云硬盘EVS详细解析和配置使用方法
  • 【在Linux世界中追寻伟大的One Piece】IP分片和组装的具体过程
  • Qt5详细安装教程(包含导入pycharm)
  • LangChain4j支持的API类型
  • Linux中使用cp命令的 -f 选项,但还是提醒覆盖的问题
  • 828华为云征文|云服务器Flexus X实例|MacOS系统-宝塔部署Nuxt项目
  • 【C#生态园】从基础到深度学习:探索C#机器学习库
  • EclipseRCP开发(三)-如何去除顽固原生菜单项
  • 递归手撕,JSON 字符串化和解析,加权树结构的字符串解析对象,解析并返回DOM 树结构(DOMParser),解析带有层级的文本
  • 51单片机-LCD1602(液晶显示屏)- 写驱动
  • 数据结构-树(基础,分类,遍历)