当前位置: 首页 > article >正文

【golang】23、gorilla websocket 源码:examples、数据结构、流程

文章目录

  • 一、examples
    • 1.1 echo
      • 1.1.1 server.go
      • 1.1.2 client.go
    • 1.2 command
      • 1.2.1 功能和启动方式
      • 1.2.2 home.html
      • 1.2.3 main.go
    • 1.3 filewatch
      • 1.3.1 html
      • 1.3.2 serveHome 渲染模板
      • 1.3.3 serveWs
      • 1.3.4 writer()
    • 1.4 buffer pool
      • 1.4.1 server
      • 1.4.2 client
    • 1.5 chat
      • 1.5.1 server
      • 1.5.2 hub
      • 1.5.3 client
  • 二、协议详情
    • 2.1 协议升级
    • 2.2 连接确认
    • 2.3 数据帧格式
      • 2.3.1 常见状态码
  • 三、code
    • 3.1 数据结构
      • 3.1.1 Upgrader
      • 3.1.2 Conn
    • 3.2 demo
    • 3.3 Server
      • 3.3.1 Upgrade() 协议升级
      • 3.3.2 computeAcceptKey() 计算接受密钥
    • 3.4 Client
      • 3.4.1 Dialer
      • 3.4.2 Dial() 和 DialContext()
    • 3.5 Conn

共 5400 行 go

没有引用第三方库,共 5k 行代码,主要是 server.go,client.go,conn.go,重点是实现 Web Socket 协议的部分。

一、examples

1.1 echo

客户端每秒发消息,并打印收到的消息

服务端收到消息后,再重复回复

先 go run server.go, 再 go run client.go, 在浏览器 http://127.0.0.1:8080 查看

1.1.1 server.go

在Go语言中,//go:build ignore// +build ignore 都是编译指令,用于告诉编译器忽略该文件或代码块的编译。

//go:build ignore 是在Go 1.17版本中引入的新的编译指令格式,用于指定在构建时忽略该文件或代码块。这意味着编译器将跳过该文件或代码块的编译,不会将其包含在最终的可执行文件中。

// +build ignore 是在较早的Go版本中使用的编译指令格式,具有相同的功能。它告诉编译器忽略该文件或代码块的编译,不包含在最终的可执行文件中。

这两个编译指令的作用是相同的,只是格式略有不同。在Go 1.17及更高版本中,推荐使用//go:build ignore来指定忽略编译的文件或代码块。

html 定义如下:

首先是一行 tr,其有两列 td,第一列是一个 input,第二列是一个 button,点击 button 时,会调用 send 函数,将 input 的值发送到服务器。

1.1.2 client.go

通过 DefaultDialer 连接

开协程循环 ReadMessage(),收到消息就打印控制台。

主线程,循环,

  • 每 1s 发一条消息(消息内容为当前时间)
  • 如果收到 sigint 信号,就 write CloseMessage,然后优雅退出(等 done 信号,或 1 秒。其中若 server 响应了 closeMessage,则 client 的 ReadMessage 会返回 err,则会向 done 发信号)

1.2 command

通过 websocket,可以很方便的实现 Web console

为什么 http 轮训做不到呢?因为不实时,比如设置每 1s 轮训一次,那就会有最多 1s 的延迟。但为了性能,总不能 100ms 轮训一次吧,那太浪费性能。而且 websocket 最大的优点就是双向通信,双向就能实现实时。

1.2.1 功能和启动方式

This example connects a websocket connection to stdin and stdout of a command.
Received messages are written to stdin followed by a `\n`. Each line read from
standard out is sent as a message to the client.

    $ go get github.com/gorilla/websocket
    $ cd `go list -f '{{.Dir}}' github.com/gorilla/websocket/examples/command`
    $ go run main.go <command and arguments to run>
    # Open http://localhost:8080/ .

Try the following commands.

    # Echo sent messages to the output area.
    $ go run main.go cat

    # Run a shell.Try sending "ls" and "cat main.go".
    $ go run main.go sh

功能就是使 server 进程变为某 command 的代理,例如 sh,cat 等

从 client 收到消息后,会传给该 command,并将执行结果返回

1.2.2 home.html

通过纯html 实现的前端,整体结构为

<!DOCTYPE html>
<html lang="en">
<head>
    <title>Command Example</title> // head的title显示到标签页
</head>

<body>
<div id="log"></div> // 显示 ws 的输出结果, 会通过创建 div,设置 div 内容,拖动滚动条实现
<form id="form"> // 底部,设置绝对定位
    <input type="submit" value="Send"/> // Send按钮,通过 type=submit设置的样式
    <input type="text" id="msg" size="64"/> // 文本框, 输入会传给 ws 对象
</form>
</body>
</html>

js 部分,主要控制了 div 对象,和 ws 对象:

window.onload = function () { // 首先通过 window.onload 在加载 html 对象时,初始化变量
    var conn;
    var msg = document.getElementById("msg");
    var log = document.getElementById("log");

    function appendLog(item) { // 通用逻辑,将 div=log 对象的滚动条拖动到最底下
        var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1;
        log.appendChild(item);
        if (doScroll) {
            log.scrollTop = log.scrollHeight - log.clientHeight;
        }
    }

    document.getElementById("form").onsubmit = function () { // 用户点了 Send 按钮,则 ws 发送,并清空输入框 text
        if (!conn) {
            return false;
        }
        if (!msg.value) {
            return false;
        }
        conn.send(msg.value);
        msg.value = "";
        return false;
    };

    if (window["WebSocket"]) { // 浏览器是否支持 WebSocket,一般都是支持的
        conn = new WebSocket("ws://" + document.location.host + "/ws");
        conn.onclose = function (evt) { // 断开连接后,打印到 log 上
            var item = document.createElement("div");
            item.innerHTML = "<b>Connection closed.</b>";
            appendLog(item);
        };
        conn.onmessage = function (evt) { // 收到消息,消息可能有多行(以\n分割),则每行创建一个 log 对象
            var messages = evt.data.split('\n');
            for (var i = 0; i < messages.length; i++) {
                var item = document.createElement("div");
                item.innerText = messages[i];
                appendLog(item);
            }
        };
    } else {
        var item = document.createElement("div");
        item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
        appendLog(item);
    }
};

1.2.3 main.go

main.go 由一个 server 构成,首先定义各种超时时间

const (
    // Time allowed to write a message to the peer.
    writeWait = 10 * time.Second // 通过 net 包的 SetWriteDeadline() 实现,输入一个时刻,时刻到达前必须有 Write() 操作,否则 ws 断连,且未来的 write 请求会返回 error。
    // net 包的行为是:如果超过截止时间,对Read或Write或其他I/O方法的调用将返回一个包装了os.ErrDeadlineExceeded的错误

    // Maximum message size allowed from peer.
    maxMessageSize = 8192

    // Time allowed to read the next pong message from the peer.
    pongWait = 60 * time.Second // 通过 net 包的 SetReadDeadline() 实现,输入一个时刻,时刻到达前必须有 Read() 操作,否则 ws 断连,且未来的 write 请求会返回 error

    // Send pings to peer with this period. Must be less than pongWait.
    pingPeriod = (pongWait * 9) / 10 // mock 一个 client,其定时 ping server,来保持链接

    // Time to wait before force close on connection.
    closeGracePeriod = 10 * time.Second // 优雅退出:等 10s 后再关闭 ws 链接
)

主体逻辑:

main.go

func main() {
    flag.Parse()
    if len(flag.Args()) < 1 {
       log.Fatal("must specify at least one argument")
    }
    var err error
    cmdPath, err = exec.LookPath(flag.Args()[0]) // 通过 flags.Args()[0] 找到程序路径 cmdPath
    if err != nil {
       log.Fatal(err)
    }
    http.HandleFunc("/", serveHome)
    http.HandleFunc("/ws", serveWs)
    server := &http.Server{ // 启动 http server
       Addr:              *addr,
       ReadHeaderTimeout: 3 * time.Second,
    }
    log.Fatal(server.ListenAndServe())
}

server.ws

开启 go ping(ws, stdoutDone) 使连接保活

整体流程如下:

pumpStdin(ws, inw) 从 ws 读到 inw => inw => inr =os.StartProcess=> outw =》outr =》go pumpStdout(outr)

  • 首先主线程的 pumpStdin(ws, inw) 循环 ws.ReadMessage(),并写入 inw,备注:inw 是 os.File
  • 然后 inw 通过 os.Pipe() 传输给 inr
  • 然后调用 os.StartProcess(cmdPath, inr, outw, outw) 指定标准输入/输出/错误,同步执行子进程路径 cmdPath,从 inr 读取标准输入,写出结果到 outw
  • 然后 outw 通过 os.Pipe() 传输给 outr
  • 最终,通过 go pumpStdout(ws, outr) 读入 outr,并通过 ws.WriteMessage() 返回给 ws 的 client

整体数据链路很长,核心是 os.StartProcess() 需要 os.File 参数,所以用 inr, inw 参数可以很方便传给 os.StartProcess(cmdPath, inr, outw, outw)。

然后 inr 对应有 inw,其调用 pumpStdin(ws, inw) 读消息

然后 outw 对应用 outr,其调用 go pumpStdout(ws, outr) 写消息

func serveWs(w http.ResponseWriter, r *http.Request) {
    ws, err := upgrader.Upgrade(w, r, nil) // 将 http 链接 变为 ws 链接
    if err != nil {
       log.Println("upgrade:", err)
       return
    }

    defer ws.Close()

    outr, outw, err := os.Pipe()
    if err != nil {
       internalError(ws, "stdout:", err)
       return
    }
    defer outr.Close()
    defer outw.Close()

    inr, inw, err := os.Pipe()
    if err != nil {
       internalError(ws, "stdin:", err)
       return
    }
    defer inr.Close()
    defer inw.Close()

    proc, err := os.StartProcess(cmdPath, flag.Args(), &os.ProcAttr{
       Files: []*os.File{inr, outw, outw},
    })
    if err != nil {
       internalError(ws, "start:", err)
       return
    }

    inr.Close()
    outw.Close()

    stdoutDone := make(chan struct{})
    go pumpStdout(ws, outr, stdoutDone)
    go ping(ws, stdoutDone) // 协程,使连接保活

    pumpStdin(ws, inw)

    // Some commands will exit when stdin is closed.
    inw.Close()

    // Other commands need a bonk on the head.
    if err := proc.Signal(os.Interrupt); err != nil {
       log.Println("inter:", err)
    }

    select {
    case <-stdoutDone:
    case <-time.After(time.Second):
       // A bigger bonk on the head.
       if err := proc.Signal(os.Kill); err != nil {
          log.Println("term:", err)
       }
       <-stdoutDone
    }

    if _, err := proc.Wait(); err != nil {
       log.Println("wait:", err)
    }
}

1.3 filewatch

实现实时文件预览

1.3.1 html

首先,有一个 html 模板,只有 一个

加载页面后,即会和 server 建立 ws 链接,当收到 ws 消息后,会赋值 data 变量,即展示到该标签

<html lang="en">
    <head>
        <title>WebSocket Example</title>
    </head>
    <body>
        <pre id="fileData">{{.Data}}</pre>
        <script type="text/javascript">
            (function() {
                var data = document.getElementById("fileData");
                var conn = new WebSocket("ws://{{.Host}}/ws?lastMod={{.LastMod}}");
                conn.onclose = function(evt) {
                    data.textContent = 'Connection closed';
                }
                conn.onmessage = function(evt) {
                    console.log('file updated');
                    data.textContent = evt.data;
                }
            })();
        </script>
    </body>
</html>

1.3.2 serveHome 渲染模板

serveHome() 会用 go template 渲染模板

1.3.3 serveWs

从 url 的 form 解析 lastMod 参数,得到 time.Time 类型

开启协程,执行 go writer()

主线程,执行 reader()

1.3.4 writer()

pingTicker = 10s

fileTicker = 10s

循环,

  • 当 pingTicker 到达时,WriteMessage(ping),且在 Write() 之前设置 SetWriteDeadline() 放置网络问题无法发送出去造成主线程阻塞
  • 当 fileTicker 到达时,读文件最近的更改时间,判断和 ws client 传来 url 的 lastMod 的先后关系,如果文件有变化(文件的时间,比,用户传来的时间晚),则 WriteMessage(Text, 文件内容),且在 Write() 之前设置 SetWriteDeadline() 放置网络问题无法发送出去造成主线程阻塞
    • 当文件变化时,发出 ws 消息如下:

然后,server 会维护 lastMod 变量:因为在 go writer 协程里有 for 循环,所以 writer(lastMod) 只会被调用一次,其入参 lastMod 只是初始值,后续都是由 go writer 里的 for 循环自己维护 lastMod 变量的。

1.4 buffer pool

执行效果如下:

1.4.1 server

首先定义 upgrader,设置了 I/O BufferSize 是 256 bytes,默认是 4096 bytes,如果手动设置为 0,则会使用 http server 设置的 size。

I/O BufferSize 只是一个缓存区,并不限制能接收、发送的大小。

然后是 process(),循环 ReadMessage() 并打印,最终 Close() 断开链接。

http 的 handler() 会启用协程 go process() 处理该链接

所以 server 实现的功能就是:收到 ws 数据后,打印控制台

1.4.2 client

启动 1000 个协程,通过 wg 等待所有协程执行完毕

每个协程的逻辑如下:

主线程,通过 websocket.DefaultDialer.Dial() 和 server 建立连接

子线程,循环 ReadMessage() 收数据,并打印

控制主线程

  • 每 5min 的 ticker:每次通过 WriteMessage() 发消息,消息内容为当前时间戳
  • 如果收到 sigInt 信号,则通过 WriteMessage(CloseMessage) 发送断开连接消息

1.5 chat

一个比较实用的 demo,聊天室

启动方式,是 go run *.go,而不是依次 go run main.go && go run client.go

每个浏览器标签页会加载 home.html,并通过 /ws 建立 ws 连接,当 server 收到 /ws 的 handler 时,则会创建 client + 注册 client + 启动读协程和写协程,

  • 其中 client 的读 协程,收 ws 消息并送到 hub.broadcast 里。
  • 其中 client 的写 协程,会从 c.send 收消息并通过 ws.WriteMessage() 发 ws 消息。
  • 其中 hub 的 go run() 会将 h.broadcast chan 的数据,转发到 每个 client.send chan 中

其实这样做表意并不清晰,完全可以把 hub 变为单例,这样 client 并不需要持有 hub 的成员变量。(PS:目前多此一举的将 hub 传入了 serveWs() 函数,其内部又将 hub 赋值给了 client 的 hub 成员变量。)

1.5.1 server

main.go 是 server,其会返回 html 页面

server 会为每个 ws 链接创建一个 Client

Client 是 ws conn 和 hub 实例的中介,因为他有如下两个成员变量

// Client is a middleman between the websocket connection and the hub.
type Client struct {
    hub *Hub

    // The websocket connection.
    conn *websocket.Conn

    // Buffered channel of outbound messages.
    send chan []byte
}

Hub 由若干 Clients 构成

// Hub maintains the set of active clients and broadcasts messages to the
// clients.
type Hub struct {
    // Registered clients.
    clients map[*Client]bool

    // Inbound messages from the clients.
    broadcast chan []byte // 需广播给所有 clients 的消息内容

    // Register requests from the clients.
    register chan *Client

    // Unregister requests from clients.
    unregister chan *Client
}

一个 hub 有一个协程,go hub.run()

每个 Client 有两个协程

// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()

goroutine 之间通过 chan 传递消息

  • writePump() 从 c.send 收消息
  • readPump() 将消息发给 c.hub.broadcast

hub 有三个 chan,用于发送 broadcast,注册 client,注销 client

client 有 send chan []byte

  • 其 readPump() goroutine 从 ws 读消息,并发给 hub
  • 其 writePump() goroutine 从 send chan []byte 收消息,并给 ws 写消息

1.5.2 hub

应用主线程通过 go hub.run() 启动协程,client 向 hub 发 register、unregister、broadcast 请求

hub 把 register 的 client 加入 clients map 中,map 的 key 即为 client 的指针。

unregiser 代码比较复杂,除了从 clients map 中移除 client,hub 还会 close client 的 send chan 来通知不会再有消息发给 client 了。

hub 的工作机制是,loop registered clients 并 send message to client’s 的 send channel,如果 send buffer 已满,则 hub 会假设 client 已挂或卡住,这种情况下 hub 会注销 client 并关闭 ws 连接。

func (h *Hub) run() {
    for {
       select {
       case client := <-h.register:
          h.clients[client] = true
       case client := <-h.unregister:
          if _, ok := h.clients[client]; ok {
             delete(h.clients, client)
             close(client.send)
          }
       case message := <-h.broadcast:
          for client := range h.clients {
             select {
             case client.send <- message:
             default:
                close(client.send)
                delete(h.clients, client)
             }
          }
       }
    }
}

1.5.3 client

main 通过 http handler 注册了 serveWs(), 该 handler 将 http 链接升级为 ws 协议,创建 client,向 hub 注册 client,并控制 client 的生命周期(defer unregister)

然后 go client.writePump(), 其内部从 client.send chan 接收消息,并通过 c.conn.NextWriter() + w.Write() + w.Close() 发消息

然后 go client.readPump(), 循环从 conn.ReadMessage() 并发送到 c.hub.broadcast chan 里

二、协议详情

websocket 是基于 tcp 的,是应用层协议。

websocket 只是利用 http协议,然后加上一些特殊的header头进行握手Upgrade升级操作,升级成功后就跟http没有任何关系了,之后就用websocket的数据格式进行收发数据。。

什么是web端即时通讯技术?

可以理解为实现这样一种功能:服务器端可以即时地将数据的更新或变化反应到客户端,例如消息推送等功能都是通过这种技术实现的。

但是在Web中,由于浏览器的限制,实现即时通讯需要借助一些方法。这种限制出现的主要原因是,一般的Web通信都是浏览器先发送请求到服务器,服务器再进行响应完成数据的现实更新。

Web端实现即时通讯主要有四种方式:轮询、长轮询(comet)、长连接(SSE)、WebSocket。

它们大体可以分为两类,一种是在HTTP基础上实现的,包括短轮询、长轮询(comet)、长连接(SSE);另一种不是在HTTP基础上实现是,即WebSocket。下面分别介绍一下这四种轮询方式。

2.1 协议升级

出于兼容性的考虑,websocket 的握手使用 HTTP 来实现,客户端的握手消息就是一个「普通的,带有 Upgrade 头的,HTTP Request 消息」。

📢 想建立websoket连接,就需要在http请求上带一些特殊的header头才行!

我们看下WebSocket协议客户端请求和服务端响应示例,关于http这里就不多介绍了(这里自行回想下Http请求的request和reposone部分)

header头的意思是,浏览器想升级http协议,并且想升级成websocket协议

客户端请求:

以下是WebSocket请求头中的一些字段:

Upgrade: websocket   // 1
Connection: Upgrade  // 2
Sec-WebSocket-Key: xx==  // 3
Origin: http:                        // 4
Sec-WebSocket-Protocol: chat, superchat  // 5
Sec-WebSocket-Version: 13  // 6

上述字段说明如下:

  1. Upgrade:字段必须设置 websocket,表示希望升级到 WebSocket 协议
  2. Connection:须设置 Upgrade,表示客户端希望连接升级
  3. Sec-WebSocket-Key:是随机的字符串,服务器端会用这些数据来构造出一个 SHA-1 的信息摘要
  4. Origin:字段是可选的,只包含了协议和主机名称
  5. Sec-WebSocket-Extensions:用于协商本次连接要使用的 WebSocket 扩展
  6. Sec-WebSocket-Version:表示支持的 WebSocket 版本,RFC6455 要求使用的版本是 13

服务端响应

HTTP/1.1 101 Web Socket Protocol Handshake  // 1
Connection: Upgrade  // 2
Upgrade: websocket  // 3
Sec-WebSocket-Accept: 2mQFj9iUA/Nz8E6OA4c2/MboVUk=  //4

上述字段说明如下:

  1. 101 响应码确认升级到 WebSocket 协议
  2. Connection:值为 “Upgrade” 来指示这是一个升级请求
  3. Upgrade:表示升级为 WebSocket 协议
  4. Sec-WebSocket-Accept:签名的键值验证协议支持

🚩 1:ws 协议默认使用 80 端口,wss 协议默认使用 443 端口,和 http 一样 🚩 2:WebSocket 没有使用 TCP 的“IP 地址 + 端口号”,开头的协议名不是“http”,引入的是两个新的名字:“ws”和“wss”,分别表示明文和加密的 WebSocket 协议

2.2 连接确认

发建立连接是前提,但是只有当请求头参数Sec-WebSocket-Key字段的值经过固定算法加密后的数据和响应头里的Sec-WebSocket-Accept的值保持一致,该连接才会被认可建立。

如下图从浏览器截图的两个关键参数:

服务端返回的响应头字段 Sec-WebSocket-Accept 是根据客户端请求 Header 中的Sec-WebSocket-Key计算出来。

那么时如何进行参数加密验证和比对确认的呢,如下图!

具体流程如下:

  • 客户端握手中的 Sec-WebSocket-Key 头字段的值是16字节随机数,并经过base64编码
  • 服务端需将该值和固定的 GUID 字符串( 258EAFA5-E914-47DA-95CA-C5AB0DC85B11)拼接后使用 SHA-1 进行哈希,并采用 base64 编码后
  • 服务端将编码后的值作为响应作为的Sec-WebSocket-Accept 值返回。
  • 客户端也必须按照服务端生成 Sec-WebSocket-Accept 的方式一样生成字符串,与服务端回传的进行对比
  • 相同就是协议升级成功,不同就是失败

在协议升级完成后websokcet就建立完成了,接下来就是客户端和服务端使用websocket进行数据传输通信了!

2.3 数据帧格式

一旦升级成功 WebSocket 连接建立后,后续数据都以帧序列的形式传输

📄 协议规定了数据帧的格式,服务端要想给客户端推送数据,必须将要推送的数据组装成一个数据帧,这样客户端才能接收到正确的数据;同样,服务端接收到客户端发送的数据时,必须按照帧的格式来解包,才能真确获取客户端发来的数据

我们来看下对帧的格式定义吧!

看看数据帧字段代表的含义吧:

  1. FIN 1个bit位,用来标记当前数据帧是不是最后一个数据帧
  2. RSV1, RSV2, RSV3 这三个,各占用一个bit位用做扩展用途,没有这个需求的话设置位0
  3. Opcode 的值定义的是数据帧的数据类型

值为1 表示当前数据帧内容是文本

值为2 表示当前数据帧内容是二进制

值为8表示请求关闭连接

  1. MASK 表示数据有没有使用掩码

服务端发送给客户端的数据帧不能使用掩码,客户端发送给服务端的数据帧必须使用掩码

  1. Payload len 数据的长度,Payload data的长度,占7bits,7+16bits,7+64bits
  2. Masking-key 数据掩码 (设置位0,则该部分可以省略,如果设置位1,则用来解码客户端发送给服务端的数据帧)
  3. Payload data 帧真正要发送的数据,可以是任意长度

上面我们说到Payload len三种长度(最开始的7bit的值)来标记数据长度,这里具体看下是哪三种:

🚩 情况1:值设置在0-125

那么这个有效载荷长度(Payload len)就是对应的数据的值

🚩 情况2:值设置为126

如果设置为 126,可表示payload的长度范围在 126~65535 之间,那么接下来的 2 个字节(扩展用16bit Payload长度)会包含Payload真实数据长度

🚩 情况3:值设置为127

可表示payload的长度范围在 >=65535 ,那么接下来的 8 个字节(扩展用16bit + 32bit + 16bit Payload长度)会包含Payload真实数据长度,这种情况能表示的数据就很大了,完全够用

2.3.1 常见状态码

1000 CLOSE_NORMAL 连接正常关闭
1001 CLOSE_GOING_AWAY 终端离开 例如:服务器错误,或者浏览器已经离开此页面
1002 CLOSE_PROTOCOL_ERROR 因为协议错误而中断连接
1003 CLOSE_UNSUPPORTED 端点因为受到不能接受的数据类型而中断连接
1004 保留
1005 CLOSE_NO_STATUS 保留, 用于提示应用未收到连接关闭的状态码
1006 CLOSE_ABNORMAL 期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧)
1007 Unsupported Data 收到的数据帧类型不一致而导致连接关闭
1008 Policy Violation 收到不符合约定的数据而断开连接
1009 CLOSE_TOO_LARGE 收到的消息数据太大而关闭连接
1010 Missing Extension 客户端因为服务器未协商扩展而关闭
1011 Internal Error 服务器因为遭遇异常而关闭连接
1012 Service Restart 服务器由于重启而断开连接
1013 Try Again Later 服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接
1015 TLS握手失败关闭连接

三、code

3.1 数据结构

3.1.1 Upgrader

Upgrader指定用于将 HTTP 连接升级到 WebSocket 连接

type Upgrader struct {
    HandshakeTimeout time.Duration // 握手
    
    ReadBufferSize, WriteBufferSize int
    
    WriteBufferPool BufferPool
    
    Subprotocols []string
    
    Error func(w http.ResponseWriter, r *http.Request, status int, reason error)
    CheckOrigin func(r *http.Request) bool // 跨域
    EnableCompression bool
}

3.1.2 Conn

Conn 表示 WebSocket连接,这个结构体的组成包括两部分,写入字段(Write fields)和 读取字段(Read fields)

type Conn struct {
    conn        net.Conn
    isServer    bool
    
    // Write fields
    writeBuf      []byte        // frame is constructed in this buffer.
    writePool     BufferPool
    writeBufSize  int
    writeDeadline time.Time
    writer        io.WriteCloser // the current writer returned to the application
    isWriting     bool           // for best-effort concurrent write detection
    
    // Read fields
    readRemaining int64
    readFinal     bool  // true the current message has more frames.
    readLength    int64 // Message size.
    readLimit     int64 // Maximum message size.
    messageReader *messageReader // the current low-level reader
}

首先利用 http 协议建立连接:

3.2 demo

server:

package main

import (
    "github.com/gorilla/websocket"
    log "github.com/sirupsen/logrus"
    "net/http"
    "time"
)

var (
    upgrader = websocket.Upgrader{}
)

func main() {
    // set up a http server
    http.HandleFunc("/abc", wsHandler)
    log.Info("http server started at :9123")
    err := http.ListenAndServe(":9123", nil)
    if err != nil {
       log.Errorf("%v server failed", time.Now().Format(time.TimeOnly))
       panic(err)
    }
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
    // upgrade the http connection to a websocket connection
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
       return
    }
    defer conn.Close()

    for {
       // read the message from the websocket connection
       msgType, p, err := conn.ReadMessage()
       if err != nil {
          log.Error(err)
          return
       }
       log.Infof("%v server recv: %v", time.Now().Format(time.TimeOnly), string(p))

       // write the message back to the websocket connection
       msg := string(p) + "123"
       log.Infof("%v server send: %v", time.Now().Format(time.TimeOnly), msg)
       if err := conn.WriteMessage(msgType, []byte(msg)); err != nil {
          log.Error(err)
          return
       }
    }
}

client:

package main

import (
    "github.com/gorilla/websocket"
    log "github.com/sirupsen/logrus"
    "time"
)

func main() {
    // set up a ws client
    url := "ws://localhost:9123/abc"
    conn, _, err := websocket.DefaultDialer.Dial(url, nil)
    if err != nil {
       log.Error(err)
       return
    }
    defer conn.Close()

    // 子协程定时发送
    ticker := time.NewTicker(30 * time.Second)
    go func() {
       for range ticker.C {
          // write a message to the ws server
          msg := "hi"
          log.Infof("%v client send: %v", time.Now().Format(time.TimeOnly), msg)
          if err := conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
             log.Error(err)
             return
          }
       }
    }()

    // 主线程接收
    for {
       // read the message from the ws server
       _, msg, err := conn.ReadMessage()
       if err != nil {
          log.Error(err)
          return
       }
       log.Infof("%v client recv: %v", time.Now().Format(time.TimeOnly), string(msg))
    }
}

运行效果如下:

// client 端效果
➜  awesomeProject2 go run ./client.go
INFO[0030] 11:35:09 client send: hi                     
INFO[0030] 11:35:09 client recv: hi123                  
INFO[0060] 11:35:39 client send: hi                     
INFO[0060] 11:35:39 client recv: hi123 

// server 端效果
➜  awesomeProject2 go run ./server.go
INFO[0000] http server started at :9123                 
INFO[0032] 11:35:09 server recv: hi                     
INFO[0032] 11:35:09 server send: hi123                  
INFO[0062] 11:35:39 server recv: hi                     
INFO[0062] 11:35:39 server send: hi123 

3.3 Server

3.3.1 Upgrade() 协议升级

func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
    // 首先检查协议
    if !tokenListContainsValue(r.Header, "Connection", "upgrade") {return err}
    if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {return err}
    if r.Method != http.MethodGet {return err}
    if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {return err}
    if _, ok := responseHeader["Sec-Websocket-Extensions"]; ok {return err}
    
    // 检测跨域:期望 requestHeader["Origin"] 和 request.Host 相同
    if !checkOrigin(r) {return err}
    
    // 校验 Sec-Websocket-Key: base64(如WfVq8trYPQpMCekdJsjENw==)解码长度为 16
    challengeKey := r.Header.Get("Sec-Websocket-Key")
    if !isValidChallengeKey(challengeKey) {return err}
    
    subprotocol := u.selectSubprotocol(r, responseHeader) // 子协议
    
    if u.EnableCompression {compress = true} // 压缩
    
    
    netConn, brw, err := http.NewResponseController(w).Hijack() // 调用者 hijacker 截取 net 连接,从此以后 http server 库不会做任何操作,详见 golang 的 type Hijacker interface{}
    // 在Go语言中,`http.Hijacker`是一个接口,用于支持HTTP服务器与客户端之间的低级别连接操作。它允许HTTP协议的各个部分(如请求和响应)获取底层网络连接,以便执行更高级别的操作。
    // 其中,`Hijack()`方法用于从HTTP连接中分离底层网络连接,返回一个`net.Conn`类型的连接对象,以及一个`*bufio.ReadWriter`对象,用于读取和写入数据。通过这个底层连接,你可以执行一些高级别的操作,比如升级协议、使用自定义协议等。
    // 需要注意的是,`Hijack()`方法只能在支持升级协议的请求上调用,例如WebSocket协议。在其他普通的HTTP请求上调用`Hijack()`方法可能会导致错误。
    
    // 校验:握手前,client 不能提前发数据
    if brw.Reader.Buffered() > 0 {return err}
    
    // 从`bufio.Writer`中提取缓冲区的内容,并将缓冲区内容以字节切片的形式返回。
    // 首先,通过创建一个`writeHook`类型的变量`wh`,调用`bw.Reset(&wh)`将`writeHook`作为参数传递给`bufio.Writer`的`Reset`方法。这样可以将写入缓冲区的数据传递给`writeHook`。
    // 接下来,通过调用`bw.WriteByte(0)`在缓冲区中写入一个字节0。然后,通过调用`bw.Flush()`将缓冲区的内容刷新到底层的写入器。
    // 接着,通过调用`bw.Reset(originalWriter)`重新设置`bufio.Writer`,将原始的写入器`originalWriter`作为底层写入器。
    // 最后,返回`writeHook`中缓冲区的内容,即`wh.p[:cap(wh.p)]`。
    buf := bufioWriterBuffer(netConn, brw.Writer)
    
    // 待写的 buf
    var writeBuf []byte
    
    // 创建 websocket 库自己的连接
    c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize, u.WriteBufferPool, br, writeBuf)
    
    // 构造 response header 到 p
    // HTTP/1.1 101 Switching Protocols
    // Upgrade: websocket
    // Connection: Upgrade
    // Sec-WebSocket-Accept: GK8uSfOGf+3eQlAfGkSWKC7L7fQ=
    p := buf
    p = p[:0] // 先清空
    p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
    p = append(p, computeAcceptKey(challengeKey)...)
    p = append(p, "\r\n"...)
    
    // 清空 hijack 前的 http conn 的 deadline, 使无 deadline
    if err := netConn.SetDeadline(time.Time{}); err != nil {return err}
    
    // 写入上文构造的 response header 即 p
    if _, err = netConn.Write(p); err != nil {return err}
    
    // 返回最终构造的 websocket.Conn
    return c, nil
}


给 Conn 设置完 response header 后,此 websocket 的连接,就都会附带此信息。

整体流程如下:

3.3.2 computeAcceptKey() 计算接受密钥

websocket 只有当 request header 里的 Sec-WebSocket-Key 字段的值经过固定算法加密后的数据,和 response header 里的 Sec-WebSocket-Accept 的值保持一致,该连接才会被认可建立。

var keyGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")

func computeAcceptKey(challengeKey string) string {
	h := sha1.New() 
	h.Write([]byte(challengeKey))
	h.Write(keyGUID)
	return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

服务端需将Sec-WebSocket-Key,和固定的 GUID 字符串( 258EAFA5-E914-47DA-95CA-C5AB0DC85B11)拼接后,使用 SHA-1 进行哈希,并采用 base64 编码后返回。

3.4 Client

Dialer 就是客户端的意思(并没有定义 Client 数据结构)而是通过 Dialer struct、DefaulrDialer 变量和 Dial() 函数实现的。

3.4.1 Dialer

type Dialer struct {

}

3.4.2 Dial() 和 DialContext()

3.5 Conn

绝大多数逻辑,都在 Conn 里,Server 和 Dialer 都复用此,实现了服务端和客户端的逻辑。


http://www.kler.cn/a/234106.html

相关文章:

  • 玩转ChatGPT:文献阅读 v2.0
  • js中typeOf无法区分数组对象
  • 解决VsCode无法跳转问题
  • Docker 的安装与使用
  • 网络基础Linux
  • 在JPA和EJB中用乐观锁解决并发问题
  • 【数据结构与算法】【小白也能学的数据结构与算法】递归 分治 迭代 动态规划 无从下手?一文通!!!
  • Linux满载CPU和运行内存的方法
  • 中文点选识别
  • c#安全-nativeAOT
  • 【java】Hibernate访问数据库
  • 快团团大团长加了微信都不回复怎么回事?如果自己做快团团预算需要多少,团队需要几个人
  • Java 将TXT文本文件转换为PDF文件
  • 前端-Vue項目初始化
  • AJAX——认识URL
  • 灰度发布浅见
  • Flink-CDC实时读Postgresql数据
  • HiveSQL——连续增长问题
  • ECMAScript Modules规范的示例详解
  • 微信小程序(四十)API的封装与调用
  • 【J2EE笔记】2万字!我当时学习时做的笔记
  • vue3 之 商城项目—项目搭建起步
  • JAVA学习笔记9
  • 力扣[面试题 01.02. 判定是否互为字符重排(哈希表,位图)
  • 使用client-only 解决组件不兼容SSR问题
  • 【十四】【C++】list 的常见用法