当前位置: 首页 > article >正文

lettuce 默认情况下连接池不生效,源码分析

先说结论:

LettuceConnectionFactory 属性 shareNativeConnection 默认为true,要想连接池生效,该参数设置为false;

spring2.0开始默认使用lettuce,lettuce和jedis一样支持使用连接池;这里不对比两款客户端的性能差异,只针对使用lettuce客户端执行命令获取连接的源码分析其逻辑;

版本说明:

spring-date-redis:2.3.9.RELEASE;

lettuce-core:5.3.7.RELEASE;

以单机版本reids说明;即:RedisStandaloneConfiguration,集群也是同样方法

以 stringRedisTemplate.opsForValue().get(key) 示例追踪说明;

点击进入get方法:

public V get(Object key) {
    return this.execute(new AbstractOperations<K, V>.ValueDeserializingRedisCallback(key) {
        protected byte[] inRedis(byte[] rawKey, RedisConnection connection) {
            return connection.get(rawKey);
        }
    }, true);
}

接着进入execute方法:

@Nullable
<T> T execute(RedisCallback<T> callback, boolean exposeConnection) {
    return this.template.execute(callback, exposeConnection);
}

接着进入:org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean, boolean);

@Nullable
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
    Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
    Assert.notNull(action, "Callback object must not be null");
    RedisConnectionFactory factory = this.getRequiredConnectionFactory();
    RedisConnection conn = null;

    Object var11;
    try {
        if (this.enableTransactionSupport) {
            conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
        } else {
            conn = RedisConnectionUtils.getConnection(factory);
        }

        boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
        RedisConnection connToUse = this.preProcessConnection(conn, existingConnection);
        boolean pipelineStatus = connToUse.isPipelined();
        if (pipeline && !pipelineStatus) {
            connToUse.openPipeline();
        }

        RedisConnection connToExpose = exposeConnection ? connToUse : this.createRedisConnectionProxy(connToUse);
        T result = action.doInRedis(connToExpose);
        if (pipeline && !pipelineStatus) {
            connToUse.closePipeline();
        }

        var11 = this.postProcessResult(result, connToUse, existingConnection);
    } finally {
        RedisConnectionUtils.releaseConnection(conn, factory, this.enableTransactionSupport);
    }

    return var11;
}

到这里后进入获取连接的入口:conn = RedisConnectionUtils.getConnection(factory);

点进去到:org.springframework.data.redis.core.RedisConnectionUtils#doGetConnection

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean transactionSupport) {
    Assert.notNull(factory, "No RedisConnectionFactory specified");
    RedisConnectionUtils.RedisConnectionHolder connHolder = (RedisConnectionUtils.RedisConnectionHolder)TransactionSynchronizationManager.getResource(factory);
    if (connHolder != null) {
        if (transactionSupport) {
            potentiallyRegisterTransactionSynchronisation(connHolder, factory);
        }

        return connHolder.getConnection();
    } else if (!allowCreate) {
        throw new IllegalArgumentException("No connection found and allowCreate = false");
    } else {
        if (log.isDebugEnabled()) {
            log.debug("Opening RedisConnection");
        }

        RedisConnection conn = factory.getConnection();
        if (bind) {
            RedisConnection connectionToBind = conn;
            if (transactionSupport && isActualNonReadonlyTransactionActive()) {
                connectionToBind = createConnectionProxy(conn, factory);
            }

            connHolder = new RedisConnectionUtils.RedisConnectionHolder(connectionToBind);
            TransactionSynchronizationManager.bindResource(factory, connHolder);
            if (transactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }

            return connHolder.getConnection();
        } else {
            return conn;
        }
    }
}

忽略事务相关逻辑,进入:factory.getConnection();

public RedisConnection getConnection() {
    if (this.isClusterAware()) {
        return this.getClusterConnection();
    } else {
        LettuceConnection connection = this.doCreateLettuceConnection(this.getSharedConnection(), this.connectionProvider, this.getTimeout(), this.getDatabase());
        connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults);
        return connection;
    }
}

先看:this.getSharedConnection();核心逻辑this.shareNativeConnection 标志位判断,默认为true;

org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#getSharedConnection

@Nullable
    protected StatefulRedisConnection<byte[], byte[]> getSharedConnection() {
        return this.shareNativeConnection ? (StatefulRedisConnection)this.getOrCreateSharedConnection().getConnection() : null;
    }

在进入核心方法:this.getOrCreateSharedConnection().getConnection();this.connection同步只会创建一次

private LettuceConnectionFactory.SharedConnection<byte[]> getOrCreateSharedConnection() {
        synchronized(this.connectionMonitor) {
            if (this.connection == null) {
                this.connection = new LettuceConnectionFactory.SharedConnection(this.connectionProvider);
            }

            return this.connection;
        }
    }
@Nullable
        StatefulConnection<E, E> getConnection() {
            synchronized(this.connectionMonitor) {
                if (this.connection == null) {
                    this.connection = this.getNativeConnection();
                }

                if (LettuceConnectionFactory.this.getValidateConnection()) {
                    this.validateConnection();
                }

                return this.connection;
            }
        }
private StatefulConnection<E, E> getNativeConnection() {
            return this.connectionProvider.getConnection(StatefulConnection.class);
        }
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> {
                return this.connectionProvider.getConnection(connectionType);
            }, this.poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
            this.poolRef.put(connection, pool);
            return (StatefulConnection)connectionType.cast(connection);
        } catch (Exception var4) {
            throw new PoolException("Could not get a resource from the pool", var4);
        }
    }

这里只会从pool里获取一个链接,后面不会再获取;

进入:this.doCreateLettuceConnection(this.getSharedConnection(), this.connectionProvider, this.getTimeout(), this.getDatabase())

protected LettuceConnection doCreateLettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider, long timeout, int database) {
        LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
        connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
        return connection;
    }

其中 LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database); 每次会把同一个连接赋值给asyncSharedConn

LettuceConnection(@Nullable StatefulConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider, long timeout, int defaultDbIndex) {
        this.isClosed = false;
        this.isMulti = false;
        this.isPipelined = false;
        this.txResults = new LinkedList();
        this.convertPipelineAndTxResults = true;
        this.pipeliningFlushPolicy = LettuceConnection.PipeliningFlushPolicy.flushEachCommand();
        Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null.");
        this.asyncSharedConn = sharedConnection;
        this.connectionProvider = connectionProvider;
        this.timeout = timeout;
        this.defaultDbIndex = defaultDbIndex;
        this.dbIndex = this.defaultDbIndex;
    }

这里的pipeline 刷新策略是:this.pipeliningFlushPolicy = LettuceConnection.PipeliningFlushPolicy.flushEachCommand();

即每个命令执行执行刷新,所以说使用lettuce客户端的前提下用pipeline的意义不大;

回到命令执行方法:org.springframework.data.redis.core.RedisTemplate#execute(org.springframework.data.redis.core.RedisCallback<T>, boolean, boolean) 

来到:T result = action.doInRedis(connToExpose);

会进到:org.springframework.data.redis.connection.DefaultStringRedisConnection#get(byte[])

this.delegate.get(key)会进入到:org.springframework.data.redis.connection.lettuce.LettuceStringCommands#get

进入this.getConnection;

org.springframework.data.redis.connection.lettuce.LettuceConnection#getConnection()

到这里就用到了之前在 LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database); 方法里赋值给asyncSharedConn 的对象;

如果为null即上面shareNativeConnection为false,会走 this.getDedicatedConnection() :org.springframework.data.redis.connection.lettuce.LettucePoolingConnectionProvider#getConnection 从连接池里获取链接

获取到链接之后会进入:io.lettuce.core.AbstractRedisAsyncCommands#dispatch(io.lettuce.core.protocol.RedisCommand<K,V,T>)

至此已经能够看出shareNativeConnection参数true和false的区别


http://www.kler.cn/a/442080.html

相关文章:

  • JavaScript--流程控制
  • C语言编程笔记:文件处理的艺术
  • iOS - Objective-C 底层实现中的哈希表
  • Unity2017 控制.abc格式的三维动画播放
  • 深度学习 Pytorch 张量的线性代数运算
  • 深度学习 Pytorch 张量(Tensor)的创建和常用方法
  • 阿尔茨海默症数据集,使用yolo,voc,coco格式对2013张原始图片进行标注,可识别轻微,中等和正常的症状
  • 小程序快速实现大模型聊天机器人
  • linux oracle proc 编译报错
  • 跟沐神学读论文-论文阅读管理
  • 决策引擎技术
  • Android 13 相较于 Android 12 的新特性
  • ios 混合开发应用白屏问题
  • 健康养生:拥抱生活的艺术
  • 【C++】explicit关键字详解(explicit关键字是什么? 为什么需要explicit关键字? 如何使用explicit 关键字)
  • Linux Shell 脚本编程基础知识篇
  • VUE的缓存问题
  • 搭建分布式Hive集群
  • 【C语言程序设计——入门】基本数据类型与表达式(头歌实践教学平台习题)【合集】
  • WPF ControlTemplate 控件模板
  • vue3+fastAPI最简单例子
  • 小主机大用途:香橙派使用Docker轻松搭建Lsky Pro图床实操教程
  • vue 自定义组件image 和 input
  • AWTK-WEB 快速入门(1) - C 语言应用程序
  • 【动态规划】多歧路 , 今安在? - 路径问题
  • 监控视频汇聚融合云平台一站式解决视频资源管理痛点