go+redis基于tcp实现聊天室
go+redis实现聊天室
基于tcp连接通过redis实现了消息的广播,命令改名,查询在线人数,查询用户活跃度
server+clinet
server
聊天室服务端的流程可以分为几个主要部分:初始化、监听连接、处理每个连接以及消息的处理和转发
1. 初始化
- 程序启动时,
init()
函数会执行。 init()
函数中删除了Redis中的旧数据,包括用户信息、在线状态和活跃度排名。
2. 监听TCP连接
- 在
main()
函数中,程序开始监听TCP端口9527。 - 如果监听成功,程序将进入一个无限循环,等待新的客户端连接。
- 当有新的客户端连接时,
Accept()
方法接受该连接,并为每个新连接启动一个新的goroutine来处理它(handleConnection(conn)
)。
3. 处理每个连接
handleConnection()
函数是每个客户端连接的主要处理函数。- 它首先调用
userAdd(conn)
将新用户的地址添加到Redis中,并设置其在线状态。 - 然后订阅两个Redis频道:“channel1”用于接收广播消息,另一个是基于客户端远程地址的个人频道。
- 启动三个goroutine:
readClient(conn, quit)
用于从客户端读取消息。broadCast(conn, sub)
用于从“channel1”频道接收并转发广播消息给客户端。personalMessage(conn, sub1)
用于从个人频道接收并转发个人消息给客户端。
- 最后,它等待
quit
通道接收到信号,表示客户端已断开连接,然后调用userRemove(conn)
移除用户信息。
4. 消息处理
readClient()
函数负责从客户端读取原始消息,并通过module.Decode(reader)
解码。- 解码后的消息被传递给
writeToRedis(msg, conn, quit)
函数,该函数将消息推送到Redis列表messageQueue
中。 messageProcessing(conn, quit)
函数从messageQueue
中弹出消息,并根据消息格式进行处理。- 根据消息的目标类型(个人或频道),分别调用
handleIndividualCommands()
或handleChannelCommands()
来处理具体的命令。
5. 命令处理
handleIndividualCommands()
和handleChannelCommands()
分别处理个人消息和频道消息。- 对于个人消息,可能的命令包括更改用户名、显示菜单等。
- 对于频道消息,可能的命令包括获取在线用户列表、退出聊天室等。
- 每个命令都有相应的逻辑来更新Redis数据库,并向相关的频道发布消息。
6. 用户管理
userAdd()
和userRemove()
函数用于在用户连接和断开时更新Redis中的用户信息。changeName()
函数允许用户更改他们的昵称,并确保昵称不重复。getOnlineUsers()
函数返回当前在线的用户数量。topActiveUsers()
函数返回活跃度最高的前五名用户。
7. 活跃度更新
- 每次用户发送消息时,都会调用
updateActivity(conn)
来增加该用户的活跃度分数。
package main
import (
"bufio"
"chatRoom/chatRoom4.0/module"
"context"
"fmt"
"github.com/redis/go-redis/v9"
"io"
"log"
"net"
"strconv"
"strings"
"time"
)
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6380",
Password: "",
DB: 2,
})
func init() {
err := rdb.Del(ctx, "usersName").Err()
if err != nil {
log.Fatalf("删除哈希键失败: %v", err)
}
err = rdb.Del(ctx, "usersOnline").Err()
if err != nil {
log.Fatalf("删除哈希键失败: %v", err)
}
err = rdb.Del(ctx, "activityRanking").Err()
if err != nil {
log.Fatalf("删除有序集合键失败: %v", err)
}
}
func main() {
fmt.Println("开始监听tcp连接端口")
listener, err := net.Listen("tcp", "127.0.0.1:9527")
if err != nil {
log.Fatalf("tcp连接端口监听失败: %v", err)
}
defer listener.Close()
fmt.Println("tcp连接端口连接成功")
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("接受连接失败: %v", err)
continue
}
go handleConnection(conn)
}
}
func handleConnection(conn net.Conn) {
defer conn.Close()
userAdd(conn)
sub := rdb.Subscribe(ctx, "channel1")
sub1 := rdb.Subscribe(ctx, conn.RemoteAddr().String())
quit := make(chan bool)
go readClient(conn, quit)
go broadCast(conn, sub)
go personalMessage(conn, sub1)
<-quit // 等待客户端断开连接
userRemove(conn)
}
func userAdd(conn net.Conn) {
err := rdb.HSet(ctx, "usersName", conn.RemoteAddr().String(), conn.RemoteAddr().String()).Err()
if err != nil {
log.Printf("用户信息添加失败: %v", err)
} else {
log.Printf("用户信息添加成功: %s", conn.RemoteAddr().String())
}
err = rdb.HSet(ctx, "usersOnline", conn.RemoteAddr().String(), "online").Err()
if err != nil {
log.Printf("用户在线信息添加失败: %v", err)
} else {
log.Printf("用户在线信息添加成功: %s", conn.RemoteAddr().String())
}
err = rdb.ZAdd(ctx, "activityRanking", redis.Z{0, conn.RemoteAddr().String()}).Err()
if err != nil {
log.Printf("用户活跃信息添加失败: %v", err)
} else {
log.Printf("用户活跃信息添加成功: %s", conn.RemoteAddr().String())
}
}
func changeName(conn net.Conn, newName string) int {
data, err := rdb.HGetAll(ctx, "usersName").Result()
if err != nil {
fmt.Println("", err)
}
for _, val := range data {
if val == newName {
return 2
}
}
oldName, err := rdb.HSet(ctx, "usersName", conn.RemoteAddr().String(), newName).Result()
if err != nil {
log.Printf("名字修改失败: %v", err)
return 0
} else {
log.Printf("名字修改成功 <旧名字> %s <新名字> %s", strconv.Itoa(int(oldName)), newName)
return 1
}
}
func personalMessage(conn net.Conn, sub1 *redis.PubSub) {
for {
log.Println("开始接收" + conn.RemoteAddr().String() + "个人订阅消息")
msg, err := sub1.ReceiveMessage(ctx)
if err != nil {
log.Printf("接收消息失败: %v", err)
return
}
log.Printf("接收处理过后消息 %s: %s", msg.Channel, msg.Payload)
king, err := module.Encode(msg.Payload + "\n")
if err != nil {
log.Printf("编码消息失败: %v", err)
continue
}
_, err = conn.Write(king)
if err != nil {
log.Printf("发送消息失败: %v", err)
return
}
log.Println("个人信息发送成功")
}
}
func broadCast(conn net.Conn, sub *redis.PubSub) {
for {
log.Println("开始接收channel1广播消息")
msg, err := sub.ReceiveMessage(ctx)
if err != nil {
log.Printf("接收消息失败: %v", err)
return
}
log.Printf("接收消息 %s: %s", msg.Channel, msg.Payload)
king, err := module.Encode(msg.Payload + "\n")
if err != nil {
log.Printf("编码消息失败: %v", err)
continue
}
_, err = conn.Write(king)
if err != nil {
log.Printf("发送消息失败: %v", err)
return
}
}
}
func readClient(conn net.Conn, quit chan bool) {
reader := bufio.NewReader(conn)
for {
msg, err := module.Decode(reader)
if err == io.EOF {
select {
case quit <- true:
default:
return
}
}
if err != nil {
log.Printf("解码消息失败: %v", err)
select {
case quit <- true:
default:
return
}
}
if len(msg) == 0 {
continue
}
log.Printf("收到client发来的数据: %s", msg)
writeToRedis(msg, conn, quit)
}
}
func writeToRedis(msg string, conn net.Conn, quit chan bool) {
log.Printf("接受到的信息转存redis: %s", msg)
err := rdb.RPush(ctx, "messageQueue", msg).Err()
if err != nil {
log.Printf("信息转存redis失败: %v", err)
} else {
log.Printf("接受到的信息转存redis成功: %s", msg)
}
messageProcessing(conn, quit)
}
func messageProcessing(conn net.Conn, quit chan bool) {
log.Printf("开始读取redis队列消息")
result, err := rdb.LPop(ctx, "messageQueue").Result()
if err != nil {
if err == redis.Nil {
log.Println("消息队列已空")
} else {
log.Fatalf("redis队列信息弹出失败: %v", err)
}
return
}
log.Printf("redis队列弹出信息为: %s", result)
time.Sleep(time.Second)
parts := strings.SplitN(result, ":", 4)
if len(parts) != 4 {
log.Println("无效的消息格式")
return
}
targetType, target, specialMark, message := parts[0], parts[1], parts[2], parts[3]
switch targetType {
case "individual":
log.Println("individual")
handleIndividualCommands(target, specialMark, message, conn)
case "channel":
log.Println("信息为channel类型")
handleChannelCommands(target, specialMark, message, conn, quit)
default:
log.Printf("未知的目标类型: %s", targetType)
}
updateActivity(conn)
}
func getUsersChannel(target string) (channelName string) {
data, err := rdb.HGetAll(ctx, "usersName").Result()
if err != nil {
fmt.Println("", err)
}
for key, val := range data {
if val == target {
channelName = key
}
}
return channelName
}
func getUsersName(conn net.Conn) (userName string) {
data, err := rdb.HGetAll(ctx, "usersName").Result()
if err != nil {
fmt.Println("", err)
}
for key, val := range data {
if key == conn.RemoteAddr().String() {
userName = val
}
}
return userName
}
func handleIndividualCommands(target, specialMark, message string, conn net.Conn) {
log.Println("开始处理Individual信息")
switch specialMark {
case "changeNameFirst":
isSuc := changeName(conn, message)
if isSuc == 1 {
rdb.Publish(ctx, conn.RemoteAddr().String(), "individual:"+getUsersName(conn)+":changeNameFirst1:"+message)
rdb.Publish(ctx, "channel1", "channel:channel1:系统消息:用户 < "+getUsersName(conn)+" > 上线了!")
} else if isSuc == 2 {
rdb.Publish(ctx, conn.RemoteAddr().String(), "individual:"+getUsersName(conn)+":changeNameFirst0:"+"名字重复,请更换名字再次尝试")
} else {
rdb.Publish(ctx, conn.RemoteAddr().String(), "individual:"+getUsersName(conn)+":changeNameFirst2:"+"未知原因导致改名失败")
}
case "menu":
menu := " \n * ./cd1 或 ./menu 功能菜单\n * ./cd2 或 ./changeName 更改昵称\n * ./cd3 或 ./online 在线用户数量查询\n*./cd4 或 ./ activity 查询活跃排行\n * ./cd5 或 ./quit 退出聊天室\n"
rdb.Publish(ctx, getUsersChannel(target), "individual:"+getUsersName(conn)+":系统消息:"+menu)
case "changeName":
isSuc := changeName(conn, message)
if isSuc == 1 {
rdb.Publish(ctx, conn.RemoteAddr().String(), "individual:"+getUsersName(conn)+":changeName1:"+message)
} else if isSuc == 2 {
rdb.Publish(ctx, conn.RemoteAddr().String(), "individual:"+getUsersName(conn)+":changeNameFirst0:"+"名字重复,请更换名字再次尝试")
} else {
rdb.Publish(ctx, conn.RemoteAddr().String(), "individual:"+getUsersName(conn)+":changeNameFirst2:"+"未知原因导致改名失败")
}
case "":
rdb.Publish(ctx, getUsersChannel(target), "individual:"+getUsersName(conn)+":系统消息:"+message)
default:
rdb.Publish(ctx, conn.RemoteAddr().String(), "信息格式错误")
log.Printf("未知的个人命令: %s", specialMark)
}
}
func handleChannelCommands(target, specialMark, message string, conn net.Conn, quit chan bool) {
log.Println("开始处理channel信息")
switch specialMark {
case "onlineList":
onlineList, err := getOnlineUsers()
if err != nil {
log.Printf("获取在线列表失败: %v", err)
} else {
rdb.Publish(ctx, "channel1", "channel:channel1:"+getUsersName(conn)+":"+onlineList)
}
case "quit":
log.Println("用户退出", getUsersName(conn))
quit <- true
case "activity":
topUsers := topActiveUsers()
activityMsg := "< 系统消息 > 活跃度排名\n"
data, err := rdb.HGetAll(ctx, "usersName").Result()
if err != nil {
log.Println(err)
}
for i, user := range topUsers {
name := ""
for field, val := range data {
if user == field {
name = val
log.Println("name:", name)
}
}
activityMsg += fmt.Sprintf(" 第%d名 : %s\n", i+1, name)
}
rdb.Publish(ctx, "channel1", "channel:channel1:系统消息:"+message+activityMsg)
default:
rdb.Publish(ctx, "channel1", "channel:channel1:"+getUsersName(conn)+":"+message)
}
}
func userRemove(conn net.Conn) {
name := getUsersName(conn)
rdb.HDel(ctx, "usersName", conn.RemoteAddr().String())
rdb.HDel(ctx, "usersOnline", conn.RemoteAddr().String())
rdb.ZRem(ctx, "activityRanking", conn.RemoteAddr().String())
msg := "用户已下线:" + name
rdb.Publish(ctx, "channel1", "channel:channel1:"+"系统消息:"+msg)
log.Printf("用户已下线: %s", name)
}
func getOnlineUsers() (string, error) {
data, err := rdb.HGetAll(ctx, "usersOnline").Result()
if err != nil {
return "", err
}
count := 0
for _, val := range data {
if val == "online" {
count++
}
}
return "当前在线人数为" + strconv.Itoa(count) + "人", nil
}
func topActiveUsers() []string {
members, err := rdb.ZRevRangeWithScores(ctx, "activityRanking", 0, -1).Result()
if err != nil {
log.Printf("无法获取活跃用户排名: %v", err)
return nil
}
var usernames []string
for _, member := range members {
if len(usernames) >= 5 {
return usernames
}
usernames = append(usernames, member.Member.(string))
}
return usernames
}
func updateActivity(conn net.Conn) {
log.Printf("用户发言增加活跃度: %s", conn.RemoteAddr().String())
rdb.ZIncrBy(ctx, "activityRanking", 1, conn.RemoteAddr().String())
log.Printf("活跃度增加成功: %s", conn.RemoteAddr().String())
}
clinet
1. 初始化连接
- 在
main()
函数中,客户端尝试通过TCP协议连接到运行在本地(127.0.0.1
)9527端口上的服务器。 - 如果连接失败,程序将打印错误信息并退出。
- 连接成功后,客户端会提示用户输入昵称,并将这个昵称通过特定格式的消息发送给服务器以设置用户的初始昵称。
2. 用户输入处理
- 客户端使用一个无限循环来读取用户的命令或消息。
- 对于每个用户输入,客户端会根据输入的内容构造不同的消息:
./cd1
或./menu
:显示功能菜单。./cd2
或./changeName
:允许用户更改昵称。./cd3
或./online
:查询在线用户数量。./cd4
或./activity
:查询活跃度排名。./cd5
或./quit
:退出聊天室。- 其他输入被视为普通消息,发送到频道
channel1
。
3. 消息编码和发送
- 构造好的消息会被编码并通过TCP连接发送到服务器。
- 如果编码或发送过程中出现错误,客户端会打印错误信息并退出。
4. 接收服务器消息
readMsg()
函数在一个单独的goroutine中运行,用于从服务器接收消息。- 当接收到消息时,
readMsg()
会解码消息,并根据消息类型(个人或频道)以及特殊标记进行处理。 - 例如,如果消息是关于昵称变更成功的确认,则更新本地存储的用户名
name
并打印相应的系统消息。 - 如果消息是退出指令,则向
exit
通道发送信号,通知主函数关闭连接并退出。
5. 退出逻辑
- 主函数中的无限循环监听
exit
通道。 - 当
exit
通道接收到信号时,主函数打印退出信息并结束程序。
6. 辅助函数
getUserInput(prompt string)
函数用于从标准输入读取用户输入,并返回去掉前后空白的字符串。- 根据不同的提示信息,该函数会在控制台上打印对应的提示。
流程图简述
- 启动:客户端启动并尝试连接服务器。
- 连接成功:如果连接成功,客户端提示用户输入昵称,并将昵称发送给服务器。
- 用户输入循环:客户端进入一个无限循环,等待用户输入命令或消息。
- 消息构造与发送:根据用户输入构造消息,编码后发送给服务器。
- 接收消息:另一个goroutine不断从服务器接收消息,并根据消息内容执行相应操作。
- 退出:当用户选择退出或者服务器断开连接时,客户端退出循环并关闭连接。
package main
import (
"bufio"
"chatRoom/chatRoom4.0/module"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"time"
)
var name string
func main() {
conn, err := net.Dial("tcp", "127.0.0.1:9527")
if err != nil {
fmt.Println("服务器连接失败 err =", err)
return
}
defer conn.Close()
fmt.Println("服务器连接成功")
getName := getUserInput("请输入你的昵称:")
data, err := module.Encode("individual::changeNameFirst:" + getName)
if err != nil {
fmt.Println("encode msg failed, err:", err)
return
}
_, err = conn.Write(data)
if err != nil {
fmt.Println("发送数据失败1 err =", err)
}
var exit = make(chan bool)
defer close(exit)
fmt.Println("--------------欢迎进入多人聊天室系统----------------")
fmt.Println(" * ./cd1 或 ./menu 功能菜单")
fmt.Println(" * ./cd2 或 ./changeName 更改昵称")
fmt.Println(" * ./cd3 或 ./online 在线用户数量查询")
fmt.Println(" * ./cd4 或 ./activity 在线用户数量查询")
fmt.Println(" * ./cd5 或 ./quit 退出聊天室")
fmt.Println("---------------指令字母不区分大小写-----------------")
go readMsg(conn, exit)
go func() {
for {
msg := getUserInput("")
switch {
case strings.EqualFold(msg, "./cd1") || strings.EqualFold(msg, "./menu"):
msg = "individual:" + name + ":menu:"
case strings.EqualFold(msg, "./cd2") || strings.EqualFold(msg, "./changeName"):
newMsg := getUserInput("请输入新的昵称:")
msg = "individual:" + name + ":changeName:" + newMsg
case strings.EqualFold(msg, "./cd3") || strings.EqualFold(msg, "./online"):
msg = "channel:channel1:onlineList:"
case strings.EqualFold(msg, "./cd4") || strings.EqualFold(msg, "./activity"):
msg = "channel:channel1:activity:"
case strings.EqualFold(msg, "./cd5") || strings.EqualFold(msg, "./quit"):
msg = "channel:channel1:quit:"
default:
msg = "channel:channel1::" + msg
}
data, err = module.Encode(msg)
if err != nil {
fmt.Println("encode msg failed, err:", err)
return
}
_, err = conn.Write(data)
if err != nil {
fmt.Println("发送数据失败2 err =", err)
return
}
}
}()
for {
select {
case <-exit:
fmt.Println("退出成功")
return
}
}
}
func getUserInput(prompt string) string {
time.Sleep(time.Millisecond * 100)
switch prompt {
case "请输入你的昵称:":
fmt.Print("请输入你的昵称:")
case "请输入新的昵称:":
fmt.Println("请输入新的昵称:")
}
reader := bufio.NewReader(os.Stdin)
input, err := reader.ReadString('\n')
if err != nil {
fmt.Println("用户输入获取失败:err =", err)
return "客户端信息读取错误"
}
return strings.TrimSpace(input)
}
func readMsg(conn net.Conn, exit chan bool) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
msg, err := module.Decode(reader)
if err == io.EOF {
fmt.Println("服务器连接已断开 ")
exit <- true
}
if err != nil {
fmt.Println("服务器断开连接 2 err =", err)
return
}
if msg == "" {
continue
}
parts := strings.SplitN(msg, ":", 4)
if len(parts) != 4 {
log.Println("无效的消息格式")
return
}
targetType, target, specialMark, message := parts[0], parts[1], parts[2], parts[3]
message = strings.TrimRight(message, "\r\n")
switch targetType {
case "individual":
switch specialMark {
case "changeNameFirst1":
fmt.Println("【 ", time.Now().Format("15:04"), " 】你当前昵称为:", target)
name = target
case "changeName1":
fmt.Println("【 ", time.Now().Format("15:04"), " 】昵称为修改成功")
fmt.Println("【 ", time.Now().Format("15:04"), " 】你当前昵称为:", target)
name = target
case "quit":
fmt.Println("【 ", time.Now().Format("15:04"), " 】开始退出客户端...")
exit <- true
case "系统消息":
fmt.Println("【 ", time.Now().Format("15:04"), " 】< 系统消息 >"+message)
default:
fmt.Println("【 ", time.Now().Format("15:04"), " 】< 系统消息 > 未知错误")
}
case "channel":
switch specialMark {
case "系统消息":
fmt.Println("【 ", time.Now().Format("15:04"), " 】< 系统消息 > "+message)
default:
fmt.Println("【 ", time.Now().Format("15:04"), " 】< "+specialMark+" >"+message)
}
}
}
}
tcp防粘包
由于客户端和服务端基于tcp连接会出现粘包现象,因此需要对信息进行封包后再发送,同时接收端也要进行解包
package module
import (
"bufio"
"bytes"
"encoding/binary"
)
func Encode(message string) ([]byte, error) {
// 读取消息的长度,转换成int32类型(占4个字节)
var length = int32(len(message))
var pkg = new(bytes.Buffer)
// 写入消息头
err := binary.Write(pkg, binary.LittleEndian, length)
if err != nil {
return nil, err
}
// 写入消息实体
err = binary.Write(pkg, binary.LittleEndian, []byte(message))
if err != nil {
return nil, err
}
return pkg.Bytes(), nil
}
// Decode 解码消息
func Decode(reader *bufio.Reader) (string, error) {
// 读取消息的长度
lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据
lengthBuff := bytes.NewBuffer(lengthByte)
var length int32
err := binary.Read(lengthBuff, binary.LittleEndian, &length)
if err != nil {
return "", err
}
// Buffered返回缓冲中现有的可读取的字节数。
if int32(reader.Buffered()) < length+4 {
return "", err
}
// 读取真正的消息数据
pack := make([]byte, int(4+length))
_, err = reader.Read(pack)
if err != nil {
return "", err
}
return string(pack[4:]), nil
}