当前位置: 首页 > article >正文

netty之实现一个redis的客户端

写在前面

本文看下如何使用redis来实现一个类似于redis官方提供的redis-cli.exe的客户端工具。

1:用到的模块

主要需要用到netty针对redis的编解码模块,可以解析redis的协议,从而可以实现和redis交互的功能。
在这里插入图片描述

2:正文

首先来定义客户端类:

package com.dahuyou.netty.redis.cli;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;

import javax.swing.plaf.synth.SynthRadioButtonMenuItemUI;
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class RedisClient {

    String host;    //   目标主机
    int port;       //   目标主机端口

    public RedisClient(String host,int port){
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new RedisClientInitializer());

            Channel channel = bootstrap.connect(host, port).sync().channel();
            System.out.println(" connected to host : " + host + ", port : " + port);
            System.out.println(" type redis's command to communicate with redis-server or type 'quit' to shutdown ");
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            ChannelFuture lastWriteFuture = null;
            for (;;) {
                Thread.sleep(1000);
                RedisClientHandler.serialNum = 0;
                System.out.println(host + ":" + port + ">");
                String s = in.readLine();
                if(s.equalsIgnoreCase("quit")) {
                    break;
                }
//                System.out.print(">");
                lastWriteFuture = channel.writeAndFlush(s);
                lastWriteFuture.addListener(new GenericFutureListener<ChannelFuture>() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            System.err.print("write failed: ");
                            future.cause().printStackTrace(System.err);
                        }
                    }
                });
            }
            if (lastWriteFuture != null) {
                lastWriteFuture.sync();
            }
            System.out.println(" bye ");
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception{
//        RedisClient client = new RedisClient("192.168.56.10",6379);
        RedisClient client = new RedisClient("127.0.0.1",6379);
        client.start();
    }

}

这里redis server其实就是一个tcp server的角色了。
在启动类中同一个for (;;) {的死循环来等待用户录入信息,类似于redis-cli.exe的如下功能:
在这里插入图片描述
另外,通过RedisClientInitializer设置协议解析的编解码器,如下:

package com.dahuyou.netty.redis.cli;
 
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.redis.RedisArrayAggregator;
import io.netty.handler.codec.redis.RedisBulkStringAggregator;
import io.netty.handler.codec.redis.RedisDecoder;
import io.netty.handler.codec.redis.RedisEncoder;
 
public class RedisClientInitializer extends ChannelInitializer<Channel> {
 
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new RedisDecoder());
        pipeline.addLast(new RedisBulkStringAggregator());
        pipeline.addLast(new RedisArrayAggregator());
        pipeline.addLast(new RedisEncoder());
        pipeline.addLast(new RedisClientHandler());
    }
}

RedisDecoder,RedisBulkStringAggregator,RedisArrayAggregator,RedisEncoder这几个类都是redis codec模块提供的编解码类,如下:
在这里插入图片描述
RedisClientHandler是我们自定义的业务处理类,源码如下:

package com.dahuyou.netty.redis.cli;
 
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.redis.*;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
 
import java.util.ArrayList;
import java.util.List;
 
public class RedisClientHandler extends ChannelDuplexHandler {
 
 
    // 发送 redis 命令
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        String[] commands = ((String) msg).split("\\s+");
        List<RedisMessage> children = new ArrayList<>(commands.length);
        for (String cmdString : commands) {
            children.add(new FullBulkStringRedisMessage(ByteBufUtil.writeUtf8(ctx.alloc(), cmdString)));
        }
        RedisMessage request = new ArrayRedisMessage(children);
        ctx.write(request, promise);
    }
 
 
    // 接收 redis 响应数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        RedisMessage redisMessage = (RedisMessage) msg;
        // 打印响应消息
        printAggregatedRedisResponse(redisMessage);
        // 是否资源
        ReferenceCountUtil.release(redisMessage);
    }
 
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }
 
 
    private static void printAggregatedRedisResponse(RedisMessage msg) {
        if (msg instanceof SimpleStringRedisMessage) {
            System.out.println(((SimpleStringRedisMessage) msg).content());
        } else if (msg instanceof ErrorRedisMessage) {
            System.out.println(((ErrorRedisMessage) msg).content());
        } else if (msg instanceof IntegerRedisMessage) {
            System.out.println(((IntegerRedisMessage) msg).value());
        } else if (msg instanceof FullBulkStringRedisMessage) {
            System.out.println(getString((FullBulkStringRedisMessage) msg));
        } else if (msg instanceof ArrayRedisMessage) {
            for (RedisMessage child : ((ArrayRedisMessage) msg).children()) {
                printAggregatedRedisResponse(child);
            }
        } else {
            throw new CodecException("unknown message type: " + msg);
        }
    }

    public static int serialNum = 0;
    private static String getString(FullBulkStringRedisMessage msg) {
        if (msg.isNull()) {
            return "(null)";
        }
        return ++serialNum + ") " +msg.content().toString(CharsetUtil.UTF_8);
    }
 
}

运行测试:
在这里插入图片描述

写在后面

参考文章列表

redis的基本命令,并用netty操作redis(不使用springboot或者spring框架)就单纯的用netty搞。 。

netty之导入源码到idea 。


http://www.kler.cn/a/376354.html

相关文章:

  • 基于ESP8266 wifimanager实现WiFi配置及天气显示
  • 《使用人工智能心脏磁共振成像筛查和诊断心血管疾病》论文精读
  • C++实现设计模式---原型模式 (Prototype)
  • MySQL Binlog 同步工具go-mysql-transfer Lua模块使用说明
  • <论文>时序大模型如何应用于金融领域?
  • springboot vue uniapp 仿小红书 1:1 还原 (含源码演示)
  • Java设计模式-单例模式和工厂模式的思路解析
  • CFA全球投资分析大赛专访:与投资人胡建平共话投资智慧
  • Windows 下基于 CLion 配置 Linux 项目开发环境
  • 【再谈设计模式】原型模式~复制的魔法师
  • 基于CNN-LSTM的时间序列数据预测,15个输入1个输出,可以更改数据集,MATLAB代码
  • 建立maven项目常见问题解决办法
  • 循环神经网络(RNN):处理序列数据的 AI 利器
  • 【论文速读】| APILOT:通过避开过时API陷阱,引导大语言模型生成安全代码
  • pycharm小游戏飞机射击
  • 显示器接口
  • 2024年11月1日Day2第一部分(最详细简单有趣味的介绍2
  • 合并排序算法(C语言版)
  • 【网络面试篇】TCP与UDP类
  • Linux之selinux和防火墙
  • 优化外贸管理 解锁全球业务流畅双效
  • python爬虫实现自动获取论文GB 7714引用
  • 【开源免费】基于SpringBoot+Vue.J服装商城系统(JAVA毕业设计)
  • i2c与从设备通讯编程示例之开发板测试
  • 使用pytorch实现LSTM预测交通流
  • 【排序】常见的八大排序算法