并发服务器框架——zinx
zinx框架
Zinx 是一个用 Go 语言编写的高性能、轻量级的 TCP 服务器框架,它被设计为简单、快速且易于使用。Zinx 提供了一系列的功能,包括但不限于连接管理、数据编解码、业务处理、负载均衡等,适用于构建各种 TCP 网络服务,如游戏服务器、即时通讯服务器等。
下面实现zinx的多个功能包括:路由、全局配置、消息封装、读写分离、消息队列、链接管理等。
utils包下GlobalObj.go
package utils
import (
"datarace/zinx/ziface"
"encoding/json"
"io/ioutil"
)
/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {
TcpServer ziface.IServer //当前Zinx的全局Server对象
Host string //当前服务器主机IP
TcpPort int //当前服务器主机监听端口号
Name string //当前服务器名称
Version string //当前Zinx版本号
MaxPacketSize uint32 //都需数据包的最大值
MaxConn int //当前服务器主机允许的最大链接个数
WorkerPoolSize uint32 //业务工作Worker池的数量
MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量
ConfFilePath string
MaxMsgChanLen int
}
/*
定义一个全局的对象
*/
var GlobalObject *GlobalObj
// 读取用户的配置文件
func (g *GlobalObj) Reload() {
data, err := ioutil.ReadFile("zinx.json")
if err != nil {
panic(err)
}
//将json数据解析到struct中
//fmt.Printf("json :%s\n", data)
err = json.Unmarshal(data, &GlobalObject)
if err != nil {
panic(err)
}
}
func init() {
//初始化GlobalObject变量,设置一些默认值
GlobalObject = &GlobalObj{
Name: "ZinxServerApp",
Version: "V0.4",
TcpPort: 7777,
Host: "0.0.0.0",
MaxConn: 12000,
MaxPacketSize: 4096,
ConfFilePath: "conf/zinx.json",
WorkerPoolSize: 10,
MaxWorkerTaskLen: 1024,
}
//从配置文件中加载一些用户配置的参数
GlobalObject.Reload()
}
ziface包
package ziface
type IConnManager interface {
Add(conn IConnection) //添加链接
Remove(conn IConnection) //删除连接
Get(connID uint32) (IConnection, error) //利用ConnID获取链接
Len() int //获取当前连接
ClearConn() //删除并停止所有链接
}
package ziface
import "net"
type IConnection interface {
Start()
Stop()
GetConnID() uint32
GetTCPConnection() *net.TCPConn
RemoteAddr() net.Addr
SendMsg(msgId uint32, data []byte) error
//直接将Message数据发送给远程的TCP客户端(有缓冲)
SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口
//设置链接属性
SetProperty(key string, value interface{})
//获取链接属性
GetProperty(key string) (interface{}, error)
//移除链接属性
RemoveProperty(key string)
}
type HandFunc func(*net.TCPConn, []byte, int) error
package ziface
type IDataPack interface {
GetHeadLen() int32
Pack(msg IMessage) ([]byte, error)
Unpack([]byte) (IMessage, error)
}
package ziface
type IMessage interface {
GetDataLen() uint32
GetMsgId() uint32
GetData() []byte
SetMsgId(uint32)
SetData([]byte)
SetDataLen(uint32)
}
package ziface
type IMsgHandle interface {
DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
StartWorkerPool() //启动worker工作池
SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
}
package ziface
type IRequest interface {
GetConnection() IConnection
GetData() []byte
GetMsgID() uint32
}
package ziface
type IRouter interface {
PreHandle(req IRequest)
Handle(req IRequest)
PostHandle(req IRequest)
}
package ziface
type IServer interface {
Start()
Stop()
Serve()
AddRouter(msgId uint32, router IRouter)
//得到链接管理
GetConnMgr() IConnManager
//设置该Server的连接创建时Hook函数
SetOnConnStart(func(IConnection))
//设置该Server的连接断开时的Hook函数
SetOnConnStop(func(IConnection))
//调用连接OnConnStart Hook函数
CallOnConnStart(conn IConnection)
//调用连接OnConnStop Hook函数
CallOnConnStop(conn IConnection)
}
znet包
connection
package znet
import (
"datarace/zinx/utils"
"datarace/zinx/ziface"
"errors"
"fmt"
"io"
"net"
"sync"
)
type Connection struct {
//当前Conn属于哪个Server
TcpServer ziface.IServer //当前conn属于哪个server,在conn初始化的时候添加即可
//当前连接的socket TCP套接字
Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一
ConnID uint32
//当前连接的关闭状态
isClosed bool
//消息管理MsgId和对应处理方法的消息管理模块
MsgHandler ziface.IMsgHandle
//告知该链接已经退出/停止的channel
ExitBuffChan chan bool
//无缓冲管道,用于读、写两个goroutine之间的消息通信
msgChan chan []byte
//有关冲管道,用于读、写两个goroutine之间的消息通信
msgBuffChan chan []byte
//链接属性
property map[string]interface{}
//保护链接属性修改的锁
propertyLock sync.RWMutex
}
// 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
//初始化Conn属性
c := &Connection{
TcpServer: server, //将隶属的server传递进来
Conn: conn,
ConnID: connID,
isClosed: false,
MsgHandler: msgHandler,
ExitBuffChan: make(chan bool, 1),
msgChan: make(chan []byte),
msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化
property: make(map[string]interface{}), //对链接属性map初始化
}
//将新创建的Conn添加到链接管理中
c.TcpServer.GetConnMgr().Add(c) //将当前新创建的连接添加到ConnManager中
return c
}
// 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
c.property[key] = value
}
// 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {
c.propertyLock.RLock()
defer c.propertyLock.RUnlock()
if value, ok := c.property[key]; ok {
return value, nil
} else {
return nil, errors.New("no property found")
}
}
// 移除链接属性
func (c *Connection) RemoveProperty(key string) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
delete(c.property, key)
}
func (c *Connection) startReader() {
fmt.Println("[Reader Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
defer c.Stop()
for {
// 创建拆包解包的对象
dp := NewDataPack()
//读取客户端的Msg head
headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error ", err)
break
}
//拆包,得到msgid 和 datalen 放在msg中
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error ", err)
break
}
//根据 dataLen 读取 data,放在msg.Data中
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error ", err)
continue
}
}
msg.SetData(data)
//得到当前客户端请求的Request数据
req := Request{
conn: c,
msg: msg,
}
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
if utils.GlobalObject.WorkerPoolSize > 0 {
//已经启动工作池机制,将消息交给Worker处理
c.MsgHandler.SendMsgToTaskQueue(&req)
} else {
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
go c.MsgHandler.DoMsgHandler(&req)
}
}
}
// 启动连接,让当前连接开始工作
func (c *Connection) Start() {
//1 开启用户从客户端读取数据流程的Goroutine
go c.startReader()
//2 开启用于写回客户端数据流程的Goroutine
go c.StartWriter()
//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
c.TcpServer.CallOnConnStart(c)
}
//func (c *Connection) Start() {
// go c.startReader()
// for {
// select {
// case <-c.ExitBuffChan:
// return
// }
// }
//}
// 停止连接,结束当前连接状态M
func (c *Connection) Stop() {
fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
//如果当前链接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
//==================
//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
c.TcpServer.CallOnConnStop(c)
//==================
// 关闭socket链接
c.Conn.Close()
//关闭Writer
c.ExitBuffChan <- true
//将链接从连接管理器中删除
c.TcpServer.GetConnMgr().Remove(c)
//关闭该链接全部管道
close(c.ExitBuffChan)
close(c.msgBuffChan)
}
/*
写消息Goroutine, 用户将数据发送给客户端
*/
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
for {
select {
case data := <-c.msgChan:
//有数据要写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Data error:, ", err, " Conn Writer exit")
return
}
//针对有缓冲channel需要些的数据处理
case data, ok := <-c.msgBuffChan:
if ok {
//有数据要写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
return
}
} else {
break
fmt.Println("msgBuffChan is Closed")
}
case <-c.ExitBuffChan:
return
}
}
}
// 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send msg")
}
//将data封包,并且发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg ")
}
//写回客户端
c.msgChan <- msg //将之前直接回写给conn.Write的方法 改为 发送给Channel 供Writer读取
return nil
}
// 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
// 获取当前连接ID
func (c *Connection) GetConnID() uint32 {
return c.ConnID
}
// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
// // 直接将Message数据发送数据给远程的TCP客户端
//
// func (c *Connection) SendMsg(msgId uint32, data []byte) error {
// if c.isClosed == true {
// return errors.New("Connection closed when send msg")
// }
// //将data封包,并且发送
// dp := NewDataPack()
// msg, err := dp.Pack(NewMsgPackage(msgId, data))
// if err != nil {
// fmt.Println("Pack error msg id = ", msgId)
// return errors.New("Pack error msg ")
// }
// //写回客户端
// if _, err := c.Conn.Write(msg); err != nil {
// fmt.Println("Write msg id ", msgId, " error ")
// c.ExitBuffChan <- true
// return errors.New("conn Write error")
// }
// return nil
// }
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send buff msg")
}
//将data封包,并且发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg ")
}
//写回客户端
c.msgBuffChan <- msg
return nil
}
ConnManager
package znet
import (
"datarace/zinx/ziface"
"errors"
"fmt"
"sync"
)
type ConnManager struct {
connections map[uint32]ziface.IConnection
connLock sync.RWMutex
}
/*
创建一个链接管理
*/
func NewConnManager() *ConnManager {
return &ConnManager{
connections: make(map[uint32]ziface.IConnection),
}
}
// 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
//将conn连接添加到ConnMananger中
connMgr.connections[conn.GetConnID()] = conn
fmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}
// 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
//删除连接信息
delete(connMgr.connections, conn.GetConnID())
fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}
// 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
//保护共享资源Map 加读锁
connMgr.connLock.RLock()
defer connMgr.connLock.RUnlock()
if conn, ok := connMgr.connections[connID]; ok {
return conn, nil
} else {
return nil, errors.New("connection not found")
}
}
// 获取当前连接
func (connMgr *ConnManager) Len() int {
return len(connMgr.connections)
}
// 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
//停止并删除全部的连接信息
for connID, conn := range connMgr.connections {
//停止
conn.Stop()
//删除
delete(connMgr.connections, connID)
}
fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}
datapack
package znet
import (
"bytes"
"datarace/zinx/utils"
"datarace/zinx/ziface"
"encoding/binary"
"errors"
)
type DataPack struct{}
func NewDataPack() *DataPack {
return &DataPack{}
}
func (dp *DataPack) GetHeadLen() uint32 {
//Id uint32(4字节) + DataLen uint32(4字节)
return 8
}
// 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
//创建一个存放bytes字节的缓冲
dataBuff := bytes.NewBuffer([]byte{})
//写dataLen
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {
return nil, err
}
//写msgID
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
return nil, err
}
//写data数据
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
return nil, err
}
return dataBuff.Bytes(), nil
}
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
//创建一个从输入二进制数据的ioReader
dataBuff := bytes.NewReader(binaryData)
//只解压head的信息,得到dataLen和msgID
msg := &Message{}
//读dataLen
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil {
return nil, err
}
//读msgID
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
return nil, err
}
//判断dataLen的长度是否超出我们允许的最大包长度
if utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize {
return nil, errors.New("Too large msg data recieved")
}
//这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据
return msg, nil
}
message
package znet
type Message struct {
Id uint32
DataLen uint32
Data []byte
}
func NewMsgPackage(id uint32, data []byte) *Message {
return &Message{
Id: id,
DataLen: uint32(len(data)),
Data: data,
}
}
// 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {
return msg.DataLen
}
// 获取消息ID
func (msg *Message) GetMsgId() uint32 {
return msg.Id
}
// 获取消息内容
func (msg *Message) GetData() []byte {
return msg.Data
}
// 设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {
msg.DataLen = len
}
// 设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {
msg.Id = msgId
}
// 设计消息内容
func (msg *Message) SetData(data []byte) {
msg.Data = data
}
msgHandler
package znet
import (
"datarace/zinx/utils"
"datarace/zinx/ziface"
"fmt"
"strconv"
)
type MsgHandle struct {
Apis map[uint32]ziface.IRouter
WorkerPoolSize uint32
TaskQueue []chan ziface.IRequest
}
func NewMsgHandle() *MsgHandle {
return &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,
TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
}
}
// 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
handler, ok := mh.Apis[request.GetMsgID()]
if !ok {
fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")
return
}
//执行对应处理方法
handler.PreHandle(request)
handler.Handle(request)
handler.PostHandle(request)
}
// 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
//1 判断当前msg绑定的API处理方法是否已经存在
if _, ok := mh.Apis[msgId]; ok {
panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))
}
//2 添加msg与api的绑定关系
mh.Apis[msgId] = router
fmt.Println("Add api msgId = ", msgId)
}
// 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID = ", workerID, " is started.")
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
mh.DoMsgHandler(request)
}
}
}
// 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i := 0; i < int(mh.WorkerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go mh.StartOneWorker(i, mh.TaskQueue[i])
}
}
// 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
//得到需要处理此条连接的workerID
workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request msgID=", request.GetMsgID(), "to workerID=", workerID)
//将请求消息发送给任务队列
mh.TaskQueue[workerID] <- request
}
request
package znet
import (
"datarace/zinx/ziface"
)
type Request struct {
conn ziface.IConnection
msg ziface.IMessage
}
func (r *Request) GetConnection() ziface.IConnection {
return r.conn
}
// 获取请求消息的数据
func (r *Request) GetData() []byte {
return r.msg.GetData()
}
// 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsgId()
}
router
package znet
import "datarace/zinx/ziface"
type BaseRouter struct{}
func (br *BaseRouter) PreHandle(request ziface.IRequest) {}
func (br *BaseRouter) Handle(request ziface.IRequest) {}
func (br *BaseRouter) PostHandle(request ziface.IRequest) {}
server
package znet
import (
"datarace/zinx/utils"
"datarace/zinx/ziface"
"fmt"
"net"
"time"
)
// iServer 接口实现,定义一个Server服务类
type Server struct {
//服务器的名称
Name string
//tcp4 or other
IPVersion string
//服务绑定的IP地址
IP string
//服务绑定的端口
Port int
//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法
msgHandler ziface.IMsgHandle
//当前Server的链接管理器
ConnMgr ziface.IConnManager
//新增两个hook函数原型
//该Server的连接创建时Hook函数
OnConnStart func(conn ziface.IConnection)
//该Server的连接断开时的Hook函数
OnConnStop func(conn ziface.IConnection)
}
// 得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {
return s.ConnMgr
}
// ============== 实现 ziface.IServer 里的全部接口方法 ========
// 开启网络服务
func (s *Server) Start() {
fmt.Printf("[START] Server listenner at IP: %s, Port %d, is starting\n", s.IP, s.Port)
fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)
fmt.Printf("[Zinx] Version: %s, MaxConn: %d, MaxPacketSize: %d\n",
utils.GlobalObject.Version,
utils.GlobalObject.MaxConn,
utils.GlobalObject.MaxPacketSize)
//开启一个go去做服务端Linster业务
go func() {
//1 获取一个TCP的Addr
s.msgHandler.StartWorkerPool()
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
return
}
//2 监听服务器地址
listenner, err := net.ListenTCP(s.IPVersion, addr)
if err != nil {
fmt.Println("listen", s.IPVersion, "err", err)
return
}
//已经监听成功
fmt.Println("start Zinx server ", s.Name, " succ, now listenning...")
var cid uint32
cid = 0
//3 启动server网络连接业务
for {
//3.1 阻塞等待客户端建立连接请求
conn, err := listenner.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
//=============
//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
conn.Close()
continue
}
//=============
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConntion(s, conn, cid, s.msgHandler)
cid++
//3.4 启动当前链接的处理业务
go dealConn.Start()
//go func() {
// //不断的循环从客户端获取数据
// for {
// buf := make([]byte, 512)
// cnt, err := conn.Read(buf)
// if err != nil {
// fmt.Println("recv buf err ", err)
// continue
// }
// //回显
// if _, err := conn.Write(buf[:cnt]); err != nil {
// fmt.Println("write back buf err ", err)
// continue
// }
// }
//}()
}
}()
}
func (s *Server) Stop() {
fmt.Println("[STOP] Zinx server , name ", s.Name)
//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
s.ConnMgr.ClearConn()
}
func (s *Server) Serve() {
s.Start()
//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加
//阻塞,否则主Go退出, listenner的go将会退出
for {
time.Sleep(10 * time.Second)
}
}
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {
s.msgHandler.AddRouter(msgId, router)
fmt.Println("Add Router SUCC!! msgID = ", msgId)
}
/*
创建一个服务器句柄
*/
func NewServer() *Server {
utils.GlobalObject.Reload()
s := &Server{
Name: utils.GlobalObject.Name,
IPVersion: "tcp4",
IP: utils.GlobalObject.Host,
Port: utils.GlobalObject.TcpPort,
msgHandler: NewMsgHandle(), //msgHandler 初始化
ConnMgr: NewConnManager(), //创建ConnManage
}
return s
}
// 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {
s.OnConnStart = hookFunc
}
// 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {
s.OnConnStop = hookFunc
}
// 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
if s.OnConnStart != nil {
fmt.Println("---> CallOnConnStart....")
s.OnConnStart(conn)
}
}
// 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
if s.OnConnStop != nil {
fmt.Println("---> CallOnConnStop....")
s.OnConnStop(conn)
}
}
客户端
package main
import (
"datarace/zinx/znet"
"fmt"
"io"
"net"
"time"
)
/*
模拟客户端
*/
func main() {
fmt.Println("Client Test ... start")
//3秒之后发起测试请求,给服务端开启服务的机会
time.Sleep(3 * time.Second)
conn, err := net.Dial("tcp", "127.0.0.1:7777")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
//发封包message消息
dp := znet.NewDataPack()
msg, _ := dp.Pack(znet.NewMsgPackage(0, []byte("Zinx V0.8 Client0 Test Message")))
_, err := conn.Write(msg)
if err != nil {
fmt.Println("write error err ", err)
return
}
//先读出流中的head部分
headData := make([]byte, dp.GetHeadLen())
_, err = io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止
if err != nil {
fmt.Println("read head error")
break
}
//将headData字节流 拆包到msg中
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("server unpack err:", err)
return
}
if msgHead.GetDataLen() > 0 {
//msg 是有data数据的,需要再次读取data数据
msg := msgHead.(*znet.Message)
msg.Data = make([]byte, msg.GetDataLen())
//根据dataLen从io中读取字节流
_, err := io.ReadFull(conn, msg.Data)
if err != nil {
fmt.Println("server unpack data err:", err)
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
time.Sleep(1 * time.Second)
}
}
服务端
package main
import (
"datarace/zinx/ziface"
"datarace/zinx/znet"
"fmt"
)
// ping test 自定义路由
type PingRouter struct {
znet.BaseRouter
}
// Ping Handle
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
//先读取客户端的数据,再回写ping...ping...ping
fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendBuffMsg(0, []byte("ping...ping...ping"))
if err != nil {
fmt.Println(err)
}
}
type HelloZinxRouter struct {
znet.BaseRouter
}
// HelloZinxRouter Handle
func (this *HelloZinxRouter) Handle(request ziface.IRequest) {
fmt.Println("Call HelloZinxRouter Handle")
//先读取客户端的数据,再回写ping...ping...ping
fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendBuffMsg(1, []byte("Hello Zinx Router V0.10"))
if err != nil {
fmt.Println(err)
}
}
// 创建连接的时候执行
func DoConnectionBegin(conn ziface.IConnection) {
fmt.Println("DoConnecionBegin is Called ... ")
//=============设置两个链接属性,在连接创建之后===========
fmt.Println("Set conn Name, Home done!")
conn.SetProperty("Name", "Aceld")
conn.SetProperty("Home", "https://www.jianshu.com/u/35261429b7f1")
//===================================================
err := conn.SendMsg(2, []byte("DoConnection BEGIN..."))
if err != nil {
fmt.Println(err)
}
}
// 连接断开的时候执行
func DoConnectionLost(conn ziface.IConnection) {
//============在连接销毁之前,查询conn的Name,Home属性=====
if name, err := conn.GetProperty("Name"); err == nil {
fmt.Println("Conn Property Name = ", name)
}
if home, err := conn.GetProperty("Home"); err == nil {
fmt.Println("Conn Property Home = ", home)
}
//===================================================
fmt.Println("DoConneciotnLost is Called ... ")
}
func main() {
//创建一个server句柄
s := znet.NewServer()
//注册链接hook回调函数
s.SetOnConnStart(DoConnectionBegin)
s.SetOnConnStop(DoConnectionLost)
//配置路由
s.AddRouter(0, &PingRouter{})
s.AddRouter(1, &HelloZinxRouter{})
//开启服务
s.Serve()
}