当前位置: 首页 > article >正文

【项目实战】redis实现websocket分布式消息推送服务

由于redis并非专业的MQ中间件,消息的防丢失策略并不完整,存在丢失消息的可能。该方案为在再pc web管理平台的右下角弹出,显示新接收到的消息数,哪怕没有收到这个通知,也可以自己在消息中心看看。所以对可靠性要求不高。如果业务场景要求可靠性高,还是请使用专业的MQ中间件。该方案已在多个实际项目中运行。

流程架构

websocket实现同一账户多点登录、websocket服务多节点部署推送方案。

简单架构图

假设用户A在两个地方登录,连接到两个websocketServer服务节点1和2,用户B连接到2节点。

websocketServer将websocket session保存在各自的Map<String,Session>中,key为userid,value为websocket Session。节点1保存了用户A的websocket session,节点2保存了用户A、B的websocket session。

消息生产者发布消息的时候为json格式,如:[{"receive"="userid_a","msg"="您有1个未读消息"},{"receive"="userid_b","msg"="您有3个未读消息"}],将消息发到redis的一个Channel,如showNewestMsg。

websocketServer中订阅redis的channel=showNewestMsg,收到消息后根据消息中receive冲map中找到对应的websocket session,发消息给客户端。

核心代码

1.该项目为springboot项目,先引入jar包,由于是从实际项目中抽出来写的记录,可能还缺jar请自行导入。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-web</artifactId>

</dependency>

<!--websocket-->

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-websocket</artifactId>

</dependency>

<!-- redis -->

<dependency>

    <groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-data-redis</artifactId>

</dependency>

<!-- 工具类 -->

<dependency>

    <groupId>cn.hutool</groupId>

    <artifactId>hutool-all</artifactId>

    <version>5.3.6</version>

</dependency>

<dependency>

    <groupId>net.sf.json-lib</groupId>

    <artifactId>json-lib</artifactId>

    <version>2.4</version>

    <classifier>jdk15</classifier>

</dependency>

2.websocket配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**

 * spring websocket组件初始化

 * @author csf

 *

 */

//war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务

@Configuration

public class WebSocketConfig

{

    @Bean

    public ServerEndpointExporter serverEndpointExporter()

    {

        return new ServerEndpointExporter();

    }

}

注意:war包启动tomcat7及以下版本要关闭@Configuration注解,否则将无法启动websocket服务。

3.websocket服务端实现

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;

import javax.annotation.Resource;

import javax.websocket.OnClose;

import javax.websocket.OnError;

import javax.websocket.OnMessage;

import javax.websocket.OnOpen;

import javax.websocket.Session;

import javax.websocket.server.PathParam;

import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import com.kingengine.plug.service.MessageService;

import cn.hutool.core.util.StrUtil;

import net.sf.json.JSONArray;

import net.sf.json.JSONObject;

/**

 * WebSocket服务类

 * @author csf

 * @date 2020年8月10日

 */

@ServerEndpoint("/websocket/{custId}")

@Component

public class WebSocketServer

{

    @Resource

    private MessageService messageService;

     

    Logger log = LoggerFactory.getLogger(this.getClass());

     

    // 当前在线连接数

    private static int onlineCount = 0;

     

    // 存放每个用户对应的WebSocket连接对象,key为custId_HHmmss,确保一个登录用户只建立一个连接

    private static Map<String, Session> webSocketSessionMap = new ConcurrentHashMap<String, Session>();

     

    // 与某个客户端的连接会话,需要通过它来给客户端发送数据

    private Session session;

     

    // 接收用户id

    private String custId = "";

     

    private static WebSocketServer webSocketServer;

     

    // 通过@PostConstruct实现初始化bean之前进行的操作

    @PostConstruct

    public void init()

    {

        // 初使化时将已静态化的webSocketServer实例化

        webSocketServer = this;

        webSocketServer.messageService = this.messageService;

    }

     

    /**

     * 连接建立成功调用的方法

     * @param session 连接会话,由框架创建

     * @param custId 用户id, 为处理用户多点登录都能收到消息,需传该格式custId_HHmmss

     * @author csf

     * @date 2020年8月10日

     */

    @OnOpen

    public void onOpen(Session session, @PathParam("custId") String custId)

    {

        if (!webSocketSessionMap.containsKey(custId))

        {

            this.session = session;

            webSocketSessionMap.put(custId, session);

            addOnlineCount(); // 在线数加1

            log.info("有新连接[{}]接入,当前websocket连接数为:{}", custId, getOnlineCount());

        }

         

        this.custId = custId;

        try

        {

            // 第一次建立连接,推送消息给客户端,只会执行一次。后续的新消息由com.kingengine.plug.redis.RedisReceiver接收到redis订阅消息推送

            // 获取未读消息数

            // 由于前端传进来的custId是有时间后缀的,查询时需要去掉后缀。

            String qryCustId = custId.split("_")[0];

            JSONObject unreadMsg = webSocketServer.messageService.getUnreadCount(qryCustId);

             

            // 获取最新消息

            /*  JSONObject newMsg = webSocketServer.messageService.getNewestMsg(qryCustId);

            // 发送消息

            JSONArray msgArr = new JSONArray();

            if (newMsg!=null)

            {

                msgArr.add(newMsg);

            }*/

            JSONArray msgArr = new JSONArray();

            msgArr.add(unreadMsg);

            sendMessage(custId, msgArr.toString());

        }

        catch (Exception e)

        {

            log.error("客户端连接websocket服务异常");

            e.printStackTrace();

        }

    }

     

    /**

     * 连接关闭调用的方法

     */

    @OnClose

    public void onClose(@PathParam("custId") String sessionKey)

    {

        if (webSocketSessionMap.containsKey(sessionKey))

        {

            try

            {

                webSocketSessionMap.get(sessionKey).close();

                webSocketSessionMap.remove(sessionKey);

            }

            catch (IOException e)

            {

                log.error("连接[{}]关闭失败。", sessionKey);

                e.printStackTrace();

            }

            subOnlineCount();

            log.info("连接[{}]关闭,当前websocket连接数:{}", sessionKey, onlineCount);

        }

    }

     

    /**

     * 接收客户端发送的消息

     * @param message 客户端发送过来的消息

     * @param session websocket会话

     */

    @OnMessage

    public void onMessage(String message, Session session)

    {

        log.info("收到来自客户端" + custId + "的信息:" + message);

    }

     

    /**

     * 连接错误时触发

     * @param session

     * @param error

     */

    @OnError

    public void onError(Session session, Throwable error)

    {

        try

        {

            session.close();

        }

        catch (IOException e)

        {

            log.error("发生错误,连接[{}]关闭失败。");

            e.printStackTrace();

        }

        // log.error("websocket发生错误");

        // error.printStackTrace();

    }

     

    /**

     * 给指定的客户端推送消息,可单发和群发

     * @param sessionKeys 发送消息给目标客户端sessionKey,多个逗号“,”隔开1234,2345...

     * @param message

     * @throws IOException

     * @author csf

     * @date 2020年8月11日

     */

    public void sendMessage(String sessionKeys, String message)

    {

        if (StrUtil.isNotBlank(sessionKeys))

        {

            String[] sessionKeyArr = sessionKeys.split(",");

            for (String key : sessionKeyArr)

            {

                try

                {

                    // 可能存在一个账号多点登录

                    List<Session> sessionList = getLikeByMap(webSocketSessionMap, key);

                    for (Session session : sessionList)

                    {

                        session.getBasicRemote().sendText(message);

                    }

                }

                catch (IOException e)

                {

                    e.printStackTrace();

                    continue;// 某个客户端发送异常,不影响其他客户端发送

                }

            }

        }

        else

        {

            log.info("sessionKeys为空,没有目标客户端");

        }

    }

     

    /**

     * 给当前客户端推送消息,首次建立连接时调用

     */

    public void sendMessage(String message)

        throws IOException

    {

        this.session.getBasicRemote().sendText(message);

    }

     

    /**

     * 检查webSocket连接是否在线

     * @param sesstionKey webSocketMap中维护的key

     * @return 是否在线

     */

    public static boolean checkOnline(String sesstionKey)

    {

        if (webSocketSessionMap.containsKey(sesstionKey))

        {

            return true;

        }

        else

        {

            return false;

        }

    }

     

    /**

     * 获取包含key的所有map值

     * @param map

     * @param keyLike

     * @return

     * @author csf

     * @date 2020年8月13日

     */

    private List<Session> getLikeByMap(Map<String, Session> map, String keyLike)

    {

        List<Session> list = new ArrayList<>();

        for (String key : map.keySet())

        {

            if (key.contains(keyLike))

            {

                list.add(map.get(key));

            }

        }

        return list;

    }

     

    public static synchronized int getOnlineCount()

    {

        return onlineCount;

    }

     

    public static synchronized void addOnlineCount()

    {

        WebSocketServer.onlineCount++;

    }

     

    public static synchronized void subOnlineCount()

    {

        WebSocketServer.onlineCount--;

    }

}

4.redis消息订阅配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

import org.springframework.cache.annotation.EnableCaching;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.data.redis.listener.PatternTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration

@EnableCaching

public class RedisCacheConfig

{

    @Bean

    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter)

    {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory(connectionFactory);

        // 可以添加多个 messageListener,配置不同的交换机

        container.addMessageListener(listenerAdapter, new PatternTopic("showNewestMsg"));// 订阅最新消息频道

        return container;

    }

     

    @Bean

    MessageListenerAdapter listenerAdapter(RedisReceiver receiver)

    {

        // 消息监听适配器

        return new MessageListenerAdapter(receiver, "onMessage");

    }

     

    @Bean

    StringRedisTemplate template(RedisConnectionFactory connectionFactory)

    {

        return new StringRedisTemplate(connectionFactory);

    }

}

5.redis配置,直接放在springboot项目application.properties或application.yml中

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

# 数据库索引(默认为0)

spring.redis.database=0 

spring.redis.host=192.168.1.100

spring.redis.port=6379

spring.redis.password=123456

# 连接池最大连接数(使用负值表示没有限制)

spring.redis.pool.max-active=8 

# 连接池最大阻塞等待时间(使用负值表示没有限制)

spring.redis.pool.max-wait=-1 

# 连接池中的最大空闲连接

spring.redis.pool.max-idle=8 

# 连接池中的最小空闲连接

spring.redis.pool.min-idle=0 

# 连接超时时间(毫秒)

spring.redis.timeout=5000

6.接收消息生产者发布的消息,推送给对应的客户端

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

import java.io.UnsupportedEncodingException;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.connection.Message;

import org.springframework.data.redis.connection.MessageListener;

import org.springframework.stereotype.Component;

import com.kingengine.plug.websocket.WebSocketServer;

import cn.hutool.core.codec.Base64;

import cn.hutool.core.util.StrUtil;

import net.sf.json.JSONArray;

import net.sf.json.JSONObject;

/**

 * 消息监听对象,接收订阅消息

 * @author csf

 * @date 2020年8月13日

 */

@Component

public class RedisReceiver implements MessageListener

{

    Logger log = LoggerFactory.getLogger(this.getClass());

     

    @Autowired

    WebSocketServer webSocketServer;

     

    /**

     * 处理接收到的订阅消息

     */

    @Override

    public void onMessage(Message message, byte[] pattern)

    {

        String channel = new String(message.getChannel());// 订阅的频道名称

        String msg = "";

        try

        {

            msg = new String(message.getBody(), "GBK");//注意与发布消息编码一致,否则会乱码

            if (StrUtil.isNotBlank(msg)){

                if ("showNewestMsg".endsWith(channel))// 最新消息

                {

                    JSONObject json = JSONObject.fromObject(msg);

                    webSocketServer.sendMessage(json.get("receive"),json.get("msg"));

                }else{

                    //TODO 其他订阅的消息处理

                }

                

            }else{

                log.info("消息内容为空,不处理。");

            }

        }

        catch (Exception e)

        {

            log.error("处理消息异常:"+e.toString())

            e.printStackTrace();

        }

    }

}

7.消息发布测试

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

import java.io.UnsupportedEncodingException;

import java.util.HashMap;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.StringRedisTemplate;

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import net.sf.json.JSONObject;

@RequestMapping("redis")

@RestController

public class RedisTestController

{

    @Autowired

    StringRedisTemplate template;

     

    /**

     * 发布消息测试

     *@param userid

     * @param msg

     * @return

     */

    @PostMapping("sendMessage")

    public String sendMessage(String userid,String msg)

    {

        try

        {

            String newMessge=new String(msg.getBytes("GBK"),"GBK");

            Map<String,String> map = new HashMap<String, String>();

            map.put("receive", userid);

            map.put("msg", newMessge);

            template.convertAndSend("showNewestMsg",        

          JSONObject.fromObject(map).toString());

        }

        catch (UnsupportedEncodingException e)

        {

            e.printStackTrace();

        }

        return "消息发布成功!";

    }

}

8.客户端代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

<!DOCTYPE html>

<html>

<head>

    <title>WebSocket测试</title>

    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>

</head>

<body>

<div>

    来自服务端消息:

    <p id="message"></p>

</div>

</body>

<script src="http://apps.bdimg.com/libs/jquery/1.6.4/jquery.min.js"></script>

<script>

    let webSocketClient;

    if (window.WebSocket)

    {

       let custid="132456_" + Math.random();//该参数会作为websocketServer中存储session的key,要保证唯一。

        webSocketClient = new WebSocket("ws://127.0.0.1:8082/bootapp/websocket/" + custid);

        //连通之后的回调事件

        webSocketClient.onopen = function () {

            webSocketClient.send("这里是地球,收到请回答。。。");

            //  webSocket.send('{"type":"1","data":"121"}');

        };

        //接收后台服务端的消息

        webSocketClient.onmessage = function (evt) {

            console.log("数据已接收:" + evt.data);

            showMessage("未读消息:" + evt.data);

        };

        //连接关闭的回调事件

        webSocketClient.onclose = function () {

            alert("连接已关闭...");

        };

    }else{

        alert("浏览器不支持websocket");

    }

    function showMessage(message) {

        $("#message").html(message);

    }

</script>

</html>


http://www.kler.cn/a/448708.html

相关文章:

  • 在 C# 中实现的目录基础操作
  • 【SpringBoot】日志文件
  • SpringBoot 整合 SQLite 数据库
  • 深入解析:Python中的决策树与随机森林
  • vue CSS 自定义宽高 翻页 剥离 效果
  • 解决 Ubuntu 24 连接正点 I.MX6ULL 的 uboot 使用 nfs 出现 Loading: T T T T T T T T
  • RCE总结
  • YOLOv9-0.1部分代码阅读笔记-general.py
  • Pinia---新一代的Vuex
  • OpenEuler 22.03 不依赖zookeeper安装 kafka 3.3.2集群
  • 基于微信小程序的短视频系统(SpringBoot)+文档
  • [c++进阶(三)]单例模式及特殊类的设计
  • [python] 基于matplotlib实现雷达图的绘制
  • springboot连接mongo性能优化参数配置
  • 智能汽车自动驾驶发展趋
  • AI 视频:初识 Pika 2.0,基本使用攻略
  • 探索Python的pytest库:简化单元测试的艺术
  • 使用 Vite 和 Redux Toolkit 创建 React 项目
  • YOLOv9-0.1部分代码阅读笔记-autobatch.py
  • ubuntu安装Goland