当前位置: 首页 > article >正文

【源码】Sharding-JDBC源码分析之SQL重写实现原理

 Sharding-JDBC系列

1、Sharding-JDBC分库分表的基本使用

2、Sharding-JDBC分库分表之SpringBoot分片策略

3、Sharding-JDBC分库分表之SpringBoot主从配置

4、SpringBoot集成Sharding-JDBC-5.3.0分库分表

5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表

6、【源码】Sharding-JDBC源码分析之JDBC

7、【源码】Sharding-JDBC源码分析之SPI机制

8、【源码】Sharding-JDBC源码分析之Yaml分片配置文件解析原理

9、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(一)

10、【源码】Sharding-JDBC源码分析之Yaml分片配置原理(二)

11、【源码】Sharding-JDBC源码分析之Yaml分片配置转换原理

12、【源码】Sharding-JDBC源码分析之ShardingSphereDataSource的创建原理

13、【源码】Sharding-JDBC源码分析之ContextManager创建中mode分片配置信息的持久化存储的原理

14、【源码】Sharding-JDBC源码分析之ContextManager创建中ShardingSphereDatabase的创建原理

15、【源码】Sharding-JDBC源码分析之分片规则生成器DatabaseRuleBuilder实现规则配置到规则对象的生成原理

16、【源码】Sharding-JDBC源码分析之配置数据库定义的表的元数据解析原理

17、【源码】Sharding-JDBC源码分析之ShardingSphereConnection的创建原理

18、【源码】Sharding-JDBC源码分析之ShardingSpherePreparedStatement的创建原理

19、【源码】Sharding-JDBC源码分析之Sql解析的原理

20、【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由

21、【源码】Sharding-JDBC源码分析之SQL中分片键路由ShardingSQLRouter的原理

22、【源码】Sharding-JDBC源码分析之SQL中读写分离路由ReadwriteSplittingSQLRouter的原理

23、 【源码】Sharding-JDBC源码分析之SQL中读写分离动态策略、数据库发现规则及DatabaseDiscoverySQLRouter路由的原理

24、【源码】Sharding-JDBC源码分析之SQL中影子库ShadowSQLRouter路由的原理

25、【源码】Sharding-JDBC源码分析之SQL重写实现原理

前言

在Sharding Sphere框架中,在数据源中真正执行SQL语句之前,先解析SQL,结合配置的规则,进行重新路由,前面用了5篇介绍了SQL路由的实现原理。路由之后,根据路由映射,对SQL进行重写,如替换SQL真正需要执行的表等。本篇从源码的角度,分析SQL重写的实现原理。

ShardingSpherePreparedStatement回顾

在【源码】Sharding-JDBC源码分析之SQL路由及SingleSQLRouter单表路由-CSDN博客中分析在执行SQL语句前,会进行SQL路由。通过配置的路由规则,创建RouteContext对象,在RouteContext路由上下文对象中,包含了SQL真正执行的数据源、逻辑表及真实表的映射。

创建完RouteContext路由上下文对象之后,执行rewrite()进行路由重写。rewrite()代码如下:

package org.apache.shardingsphere.infra.context.kernel;

/**
 * 内核处理器
 */
public final class KernelProcessor {
    
    /**
     * sql重写
     * @param queryContext 查询上下文
     * @param database 数据库信息
     * @param globalRuleMetaData 全局规则源数据
     * @param props 配置是props
     * @param routeContext 路由上下文
     * @param connectionContext 连接上下文
     * @return
     */
    private SQLRewriteResult rewrite(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData,
                                     final ConfigurationProperties props, final RouteContext routeContext, final ConnectionContext connectionContext) {
        // 创建SQL重写条目,包含重写装饰器
        SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, globalRuleMetaData, props);
        // 重写
        return sqlRewriteEntry.rewrite(queryContext.getSql(), queryContext.getParameters(), queryContext.getSqlStatementContext(), routeContext, connectionContext);
    }
	
}

在rewrite()方法中,创建一个SQLRewriteEntry重写对象,执行SQLRewriteEntry的rewrite()方法。

SQLRewriteEntry

SQLRewriteEntry的源码如下:

package org.apache.shardingsphere.infra.rewrite;

/**
 * SQL 重写
 */
public final class SQLRewriteEntry {

    // 数据库
    private final ShardingSphereDatabase database;

    // 配置的全局规则
    private final ShardingSphereRuleMetaData globalRuleMetaData;

    // 配置的属性
    private final ConfigurationProperties props;

    // 规则中的sql重写装饰器上下文
    @SuppressWarnings("rawtypes")
    private final Map<ShardingSphereRule, SQLRewriteContextDecorator> decorators;
    
    public SQLRewriteEntry(final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData, final ConfigurationProperties props) {
        this.database = database;
        this.globalRuleMetaData = globalRuleMetaData;
        this.props = props;
		// 通过SPI,结合配置的规则,获取重写装饰器
        decorators = OrderedSPIRegistry.getRegisteredServices(SQLRewriteContextDecorator.class, database.getRuleMetaData().getRules());
    }
    
    /**
     * 重写
     * @param sql 当前的sql语句
     * @param params sql对应的参数值
     * @param sqlStatementContext sql语句的上下文
     * @param routeContext 解析的路由上下文
     * @param connectionContext 连接上下文
     * @return
     */
    public SQLRewriteResult rewrite(final String sql, final List<Object> params, final SQLStatementContext<?> sqlStatementContext,
                                    final RouteContext routeContext, final ConnectionContext connectionContext) {
        // 创建 SQLRewriteContext
        SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, params, sqlStatementContext, routeContext, connectionContext);
        // 获取SQL转换器规则
        SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class);
        DatabaseType protocolType = database.getProtocolType();
        Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes();
        return routeContext.getRouteUnits().isEmpty()
                // 如果没有路由单元
                ? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext)
                // 有路由单元
                : new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext);
    }

    /**
     * 创建Sql重写上下文
     * @param sql
     * @param params
     * @param sqlStatementContext
     * @param routeContext
     * @param connectionContext
     * @return
     */
    private SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> params, final SQLStatementContext<?> sqlStatementContext,
                                                      final RouteContext routeContext, final ConnectionContext connectionContext) {
        // 创建重写上下文
        SQLRewriteContext result = new SQLRewriteContext(database.getName(), database.getSchemas(), sqlStatementContext, sql, params, connectionContext);
        // 遍历重写装饰器,执行装饰器的decorate()方法
        decorate(decorators, result, routeContext);
        // 遍历sql令牌生成器,创建sql令牌。如分页令牌、自动主键令牌、distinct()令牌等
        result.generateSQLTokens();
        return result;
    }

    /**
     * 遍历重写装饰器,执行装饰器的decorate()方法
     * @param decorators
     * @param sqlRewriteContext
     * @param routeContext
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void decorate(final Map<ShardingSphereRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        for (Entry<ShardingSphereRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) {
            entry.getValue().decorate(entry.getKey(), props, sqlRewriteContext, routeContext);
        }
    }
}

3.1 构造方法

构造方法主要执行如下:

1)记录基本信息;

2)通过SPI,结合配置的规则,获取重写装饰器;

系统实现的装饰器包括:

a)ShardingSQLRewriteContextDecorator:分片重写装饰器。配置分片规则时,通过SPI获取;

b)EncryptSQLRewriteContextDecorator:加密重写装饰器,配置加密规则时,通过SPI获取;

3.2 rewrite()重写方法

在KernelProcessor中通过该rewrite()方法,执行SQL重写,创建SQLRewriteResult对象。主要执行如下:

1)创建SQLRewriteContext对象;

1.1)创建SQLRewriteContext对象;

1.2)执行decorate()方法,遍历重写装饰器,执行装饰器的decorate()方法。如设置了分片规则,则执行ShardingSQLRewriteContextDecorator的decorate()进行SQLRewriteContext对象的装饰增强。如添加参数重写器创建器、SQL令牌生成器;

1.3)执行SQLRewriteContext对象的generateSQLTokens(),遍历sql令牌生成器,创建sql令牌。如分页令牌、自动主键令牌、distinct()令牌等;

1.4)返回SQLRewriteContext对象;

2)从全局规则元数据中获取SQL转换器规则对象;

3)创建SQL重写引擎,执行重写引擎的rewrite()方法;

如果路由上下文中的路由单元为空,说明没有路由映射,创建GenericSQLRewriteEngine;否则创建RouteSQLRewriteEngine。然后执行重写引擎的rewrite()方法,返回一个SQLRewriteResult对象;

SQLRewriteContext

SQLRewriteContext的源码如下:

package org.apache.shardingsphere.infra.rewrite.context;

/**
 * SQL 重写上下文。维护sql重写令牌、参数生成器
 */
@Getter
public final class SQLRewriteContext {

    // 数据库名称
    private final String databaseName;

    // schema信息
    private final Map<String, ShardingSphereSchema> schemas;

    // sql语句上下文
    private final SQLStatementContext<?> sqlStatementContext;

    // sql语句
    private final String sql;

    // sql语句的参数值
    private final List<Object> parameters;

    // 参数创建者
    private final ParameterBuilder parameterBuilder;

    // SQL令牌,同SQLTokenGenerator生成,如OrderBySQLToken等
    private final List<SQLToken> sqlTokens = new LinkedList<>();

    // sql 令牌生成器。对于大部分的sql操作,都会添加RemoveTokenGenerator
    @Getter(AccessLevel.NONE)
    private final SQLTokenGenerators sqlTokenGenerators = new SQLTokenGenerators();
    
    private final ConnectionContext connectionContext;
    
    public SQLRewriteContext(final String databaseName, final Map<String, ShardingSphereSchema> schemas,
                             final SQLStatementContext<?> sqlStatementContext, final String sql, final List<Object> params, final ConnectionContext connectionContext) {
        this.databaseName = databaseName;
        this.schemas = schemas;
        this.sqlStatementContext = sqlStatementContext;
        this.sql = sql;
        parameters = params;
        this.connectionContext = connectionContext;
        // 添加RemoveTokenGenerator生成器
        addSQLTokenGenerators(new DefaultTokenGeneratorBuilder(sqlStatementContext).getSQLTokenGenerators());
        // 创建参数创建器
        parameterBuilder = ((sqlStatementContext instanceof InsertStatementContext) && (null == ((InsertStatementContext) sqlStatementContext).getInsertSelectContext()))
                // 如果是插入语句,且没有子查询,创建GroupedParameterBuilder参数生成器
                ? new GroupedParameterBuilder(
                        ((InsertStatementContext) sqlStatementContext).getGroupedParameters(), ((InsertStatementContext) sqlStatementContext).getOnDuplicateKeyUpdateParameters())
                // 否则创建标准的参数生成器
                : new StandardParameterBuilder(params);
    }
    
    /**
     * 添加token生成器
     */
    public void addSQLTokenGenerators(final Collection<SQLTokenGenerator> sqlTokenGenerators) {
        this.sqlTokenGenerators.addAll(sqlTokenGenerators);
    }
    
    /**
     * 生成SQL令牌
     */
    public void generateSQLTokens() {
        sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(databaseName, schemas, sqlStatementContext, parameters, connectionContext));
    }
}

在SQLRewriteContext对象中,保存了当前执行的SQL的语句上下文对象、参数、重写的令牌等。

ShardingSQLRewriteContextDecorator

ShardingSQLRewriteContextDecorator的源码如下:

package org.apache.shardingsphere.sharding.rewrite.context;

/**
 * 用于分片的SQL重写上下文装饰器
 */
@Setter
public final class ShardingSQLRewriteContextDecorator implements SQLRewriteContextDecorator<ShardingRule> {

    /**
     * 装饰
     * @param shardingRule 分片规则
     * @param props 配置的属性
     * @param sqlRewriteContext sql重写上下文
     * @param routeContext 路由上下文
     */
    @SuppressWarnings("rawtypes")
    @Override
    public void decorate(final ShardingRule shardingRule, final ConfigurationProperties props, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        // 如果有参数值
        if (!sqlRewriteContext.getParameters().isEmpty()) {
            // 获取参数重写器
            Collection<ParameterRewriter> parameterRewriters = new ShardingParameterRewriterBuilder(shardingRule,
                    routeContext, sqlRewriteContext.getSchemas(), sqlRewriteContext.getSqlStatementContext()).getParameterRewriters();
            // 参数重写,执行重写器的rewrite()方法
            rewriteParameters(sqlRewriteContext, parameterRewriters);
        }
        // 添加分片sql令牌生成器
        sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext, sqlRewriteContext.getSqlStatementContext()).getSQLTokenGenerators());
    }
    
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void rewriteParameters(final SQLRewriteContext sqlRewriteContext, final Collection<ParameterRewriter> parameterRewriters) {
        for (ParameterRewriter each : parameterRewriters) {
            each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
        }
    }
    
    @Override
    public int getOrder() {
        return ShardingOrder.ORDER;
    }
    
    @Override
    public Class<ShardingRule> getTypeClass() {
        return ShardingRule.class;
    }
}

如果配置了分片规则,则在 SQLRewriteEntry 的构造方法中会创建ShardingSQLRewriteContextDecorator装饰器对象。在SQLRewriteEntry的rewrite()方法中,执行ShardingSQLRewriteContextDecorator的decorate()方法。

decorate()方法执行如下:

1)如果SQL操作语句有参数值,则执行如下:

1.1)创建分片参数重写器创建器ShardingParameterRewriterBuilder,获取参数重写器;

在 ShardingParameterRewriterBuilder 类的getParameterRewriters()方法中,会返回两个参数重写器:

a)ShardingGeneratedKeyInsertValueParameterRewriter:自动生成插入语句中的主键;

b)ShardingPaginationParameterRewriter:自动替换分页查询参数的值;

1.2)执行rewriteParameters()方法,进行参数重写。执行ShardingParameterRewriterBuilder的rewrite()进行参数重写;

a)ShardingGeneratedKeyInsertValueParameterRewriter:根据主键生成器,添加主键值;

b)ShardingPaginationParameterRewriter:自动替换分页查询参数的值。如分页查询第二页10~20的数据,且分片到两张表,那么每张表应该查询的记录是1~20条,因为并无法知道第一页的10条是在哪张表获取的,此时的1和20就是通过该参数重写器进行自动替换的;

2)执行sqlRewriteContext的addSQLTokenGenerators()方法,添加分片SQL令牌生成器ShardingTokenGenerateBuilder的getSQLTokenGenerators()方法返回的令牌生成器;

a)在ShardingTokenGenerateBuilder令牌生成器的getSQLTokenGenerators()方法中,添加17个令牌生成器,如order by、distinct、offset、rowcount等;

b)对应的令牌生成器,用于生成对应令牌。如order by的生成器,生成OrderByToken;

c)在进行SQL重写是,会调用令牌的toString()方法。toString()方法返回对应令牌的SQL语句。如OrderByToken的toString()方法,返回 order by columnLabel orderDirection,即order by 字符串加上对应排序的列及排序方向;

RouteSQLRewriteEngine

如果路由上下文不为空,即有路由数据源映射信息,则创建RouteSQLRewriteEngine对象,并执行RouteSQLRewriteEngine的rewrite()方法,进行SQL重写。

RouteSQLRewriteEngine的源码如下:

package org.apache.shardingsphere.infra.rewrite.engine;

/**
 * 路由的SQL重写引擎
 */
@RequiredArgsConstructor
public final class RouteSQLRewriteEngine {

    // 配置的sql转换规则
    private final SQLTranslatorRule translatorRule;

    // 可通过proxy-frontend-database-protocol-type属性配置,
    // 如果没有配置,为当前配置的数据源中可用的第一个数据源的数据库类型
    private final DatabaseType protocolType;

    // 当前配置的数据源对应的数据库类型
    private final Map<String, DatabaseType> storageTypes;
    
    /**
     * 重写sql和参数
     * @param sqlRewriteContext sql重写上下文
     * @param routeContext 路由上下文
     * @return
     */
    public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
        // key为路由单元;value为重写后的sql单元
        Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1);
        // 聚合路由单元组。按数据源名称分组。遍历
        for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {
            Collection<RouteUnit> routeUnits = entry.getValue();
            // 判断是否需要聚合重写
            if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {
                // 对于需要聚合的sql进行重写,使用union all,一次连接执行多个查询
                sqlRewriteUnits.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
            } else {
                // 添加重写单元
                addSQLRewriteUnits(sqlRewriteUnits, sqlRewriteContext, routeContext, routeUnits);
            }
        }
        return new RouteSQLRewriteResult(translate(sqlRewriteContext.getSqlStatementContext().getSqlStatement(), sqlRewriteUnits));
    }

    /**
     * 创建重写单元。对于需要聚合的sql进行重写,使用union all,一次连接执行多个查询
     * @param sqlRewriteContext sql重写上下文
     * @param routeContext 路由上下文
     * @param routeUnits 路由单元
     * @return
     */
    private SQLRewriteUnit createSQLRewriteUnit(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final Collection<RouteUnit> routeUnits) {
        Collection<String> sql = new LinkedList<>();
        List<Object> params = new LinkedList<>();
        // 判断是select语句是否包含$符号
        boolean containsDollarMarker = sqlRewriteContext.getSqlStatementContext() instanceof SelectStatementContext
                && ((SelectStatementContext) (sqlRewriteContext.getSqlStatementContext())).isContainsDollarParameterMarker();
        for (RouteUnit each : routeUnits) {
            // 创建RouteSQLBuilder,重新拼接sql
            sql.add(SQLUtil.trimSemicolon(new RouteSQLBuilder(sqlRewriteContext, each).toSQL()));
            // 如果包含$符号 && 有参数值
            if (containsDollarMarker && !params.isEmpty()) {
                continue;
            }
            // 添加参数
            params.addAll(getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each));
        }
        return new SQLRewriteUnit(String.join(" UNION ALL ", sql), params);
    }

    /**
     * 添加SQL重写单元。每个路由单元创建一个SQLRewriteUnit,每个SQLRewriteUnit对sql进行重写
     * @param sqlRewriteUnits sql重写单元
     * @param sqlRewriteContext 重写上下文
     * @param routeContext 路由上下文
     * @param routeUnits 路由单元
     */
    private void addSQLRewriteUnits(final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits, final SQLRewriteContext sqlRewriteContext,
                                    final RouteContext routeContext, final Collection<RouteUnit> routeUnits) {
        // 遍历路由单元
        for (RouteUnit each : routeUnits) {
            // 每个路由单元创建一个SQLRewriteUnit,每个SQLRewriteUnit对sql进行重写
            sqlRewriteUnits.put(each, new SQLRewriteUnit(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each)));
        }
    }

    /**
     * 判断是否需要聚合重写。没有子查询或join查询 && 没有排序和分页 && 没有锁部分,返回true;否则为false
     * @param sqlStatementContext sql语句上下文
     * @param routeUnits 路由单元
     * @return
     */
    private boolean isNeedAggregateRewrite(final SQLStatementContext<?> sqlStatementContext, final Collection<RouteUnit> routeUnits) {
        // 只有查询语句 && 大于一个路由单元,才需要聚合
        if (!(sqlStatementContext instanceof SelectStatementContext) || routeUnits.size() == 1) {
            return false;
        }
        SelectStatementContext statementContext = (SelectStatementContext) sqlStatementContext;
        boolean containsSubqueryJoinQuery = statementContext.isContainsSubquery() || statementContext.isContainsJoinQuery();
        boolean containsOrderByLimitClause = !statementContext.getOrderByContext().getItems().isEmpty() || statementContext.getPaginationContext().isHasPagination();
        boolean containsLockClause = SelectStatementHandler.getLockSegment(statementContext.getSqlStatement()).isPresent();
        // 没有子查询或join查询 && 没有排序和分页 && 没有锁部分
        boolean needAggregateRewrite = !containsSubqueryJoinQuery && !containsOrderByLimitClause && !containsLockClause;
        statementContext.setNeedAggregateRewrite(needAggregateRewrite);
        return needAggregateRewrite;
    }

    /**
     * 聚合路由单元组。按数据源名称分组
     * @param routeUnits 路由单元
     * @return
     */
    private Map<String, Collection<RouteUnit>> aggregateRouteUnitGroups(final Collection<RouteUnit> routeUnits) {
        Map<String, Collection<RouteUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
        for (RouteUnit each : routeUnits) {
            String dataSourceName = each.getDataSourceMapper().getActualName();
            result.computeIfAbsent(dataSourceName, unused -> new LinkedList<>()).add(each);
        }
        return result;
    }

    /**
     * 获取参数
     * @param paramBuilder 参数创建器
     * @param routeContext 路由上下文
     * @param routeUnit 路由单元
     * @return
     */
    private List<Object> getParameters(final ParameterBuilder paramBuilder, final RouteContext routeContext, final RouteUnit routeUnit) {
        // 如果是标准参数生成器
        if (paramBuilder instanceof StandardParameterBuilder) {
            // 获取参数值,返回的类型为List<List<Object>>,即每个参数都为List<Object>类型
            return paramBuilder.getParameters();
        }
        return routeContext.getOriginalDataNodes().isEmpty()
                // 如果没有路由信息
                ? ((GroupedParameterBuilder) paramBuilder).getParameters()
                // 如果有路由信息,获取路由参数
                : buildRouteParameters((GroupedParameterBuilder) paramBuilder, routeContext, routeUnit);
    }

    /**
     * 构建路由参数
     * @param paramBuilder 参数创建器
     * @param routeContext 路由上下文
     * @param routeUnit 路由单元
     * @return
     */
    private List<Object> buildRouteParameters(final GroupedParameterBuilder paramBuilder, final RouteContext routeContext, final RouteUnit routeUnit) {
        List<Object> result = new LinkedList<>();
        int count = 0;
        // 遍历原始数据节点
        for (Collection<DataNode> each : routeContext.getOriginalDataNodes()) {
            // 找到当前的路由单元的数据节点
            if (isInSameDataNode(each, routeUnit)) {
                // 获取对应下标的分组参数信息
                result.addAll(paramBuilder.getParameters(count));
            }
            count++;
        }
        // 添加通用参数
        result.addAll(paramBuilder.getGenericParameterBuilder().getParameters());
        return result;
    }
    
    private boolean isInSameDataNode(final Collection<DataNode> dataNodes, final RouteUnit routeUnit) {
        if (dataNodes.isEmpty()) {
            return true;
        }
        for (DataNode each : dataNodes) {
            if (routeUnit.findTableMapper(each.getDataSourceName(), each.getTableName()).isPresent()) {
                return true;
            }
        }
        return false;
    }

    /**
     * 翻译转换
     * @param sqlStatement 查询语句
     * @param sqlRewriteUnits 按路由单元重写后的sql单元
     * @return
     */
    private Map<RouteUnit, SQLRewriteUnit> translate(final SQLStatement sqlStatement, final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits) {
        Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(sqlRewriteUnits.size(), 1);
        // 遍历SQL重写单元
        for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteUnits.entrySet()) {
            // 获取对应单元数据源的数据库类型
            DatabaseType storageType = storageTypes.get(entry.getKey().getDataSourceMapper().getActualName());
            // 通过配置的翻译规则,执行sql翻译
            String sql = translatorRule.translate(entry.getValue().getSql(), sqlStatement, protocolType, storageType);
            // 翻译后,重新创建SQLRewriteUnit
            SQLRewriteUnit sqlRewriteUnit = new SQLRewriteUnit(sql, entry.getValue().getParameters());
            result.put(entry.getKey(), sqlRewriteUnit);
        }
        return result;
    }
}

6.1 rewrite()方法

在rewrite()中,执行如下:

1)调用aggregateRouteUnitGroups(),遍历路由单元,获取路由单元中的实际数据源名称,按真实数据源名称对路由单元进行分组,同一个数据源放在同一个集合中;

2)按数据源名称遍历进行遍历;

2.1)获取对应数据源的路由单元集合;

2.2)判断是否需要聚合重写,需要则进行重写;

2.2.1)如果不是查询语句 || 路由单元只有一个,说明不需要聚合,返回false;

2.2.2)(没有子查询 || join查询) && 没有排序和分页 && 没有锁部分,返回true;否则为false;

2.2.3)如果以上返回true,则表明通过一个数据源,有多个路由单元,而此处的多个路由单元数据源映射是一样的,不同的是表映射。则执行createSQLRewriteUnit(),创建一个SQLRewriteUnit对象,以路由单元为key,SQLRewriteUnit对象为value,添加到Map中;

2.3)如果不需要聚合重写,则执行 addSQLRewriteUnits(),添加重写单元;

遍历路由单元,每个路由单元创建一个SQLRewriteUnit,每个SQLRewriteUnit对sql进行重写。

3)执行translate()方法,进行翻译转换;

遍历SQL重写单元,执行 ranslatorRule 配置的翻译规则的translate()方法,进行翻译,获取新的sql字符串,创建新的SQLRewriteUnit,替换原来的SQLRewriteUnit对象;

4)创建一个RouteSQLRewriteResult对象,返回该对象;

6.2 createSQLRewriteUnit()方法

createSQLRewriteUnit()方法执行如下:

1)判断是select语句是否包含$符号,保存到containsDollarMarker变量;

2)遍历路由单元,执行如下:

2.1)创建RouteSQLBuilder对象,执行toSQL()方法,重写SQL语句。在toSQL()方法中,遍历SQLRewriteContext对象中的重写令牌,重写拼接SQL语句。如在表的令牌对象(TableToken)中,结合路由单元和分配规则,获取SQL语句执行的真实表名,并进行替换;

2.2)如果containsDollarMarker为true && 有参数值,跳过;

2.3)获取参数值;

3)使用union all 连接多个sql语句,创建新的SQLRewriteUnit对象;

6.3 addSQLRewriteUnits() 方法

对于不需要聚合重写的路由单元,则直接遍历路由单元,每个路由单元创建一个RouteSQLBuilder对象,执行toSQL()方法,重写SQL语句。重写后创建SQLRewriteUnit对象。

小结

以上为本篇分析的全部内容,以下做一个小结:

ShardingSpherePreparedStatement在执行SQL语句前,会进行SQL路由。通过配置的路由规则,创建RouteContext对象,在RouteContext路由上下文对象中,包含了SQL真正执行的数据源、逻辑表及真实表的映射。

创建完RouteContext路由上下文对象之后,执行rewrite()进行SQL重写。重写的执行如下:

1)创建一个SQLRewriteEntry对象,执行rewrite()方法;

在SQLRewriteEntry对象的构造方法中,通过SPI,结合配置的规则,获取重写装饰器。如分片重写装饰器、加密重写装饰器;

2)在rewrite()方法中,执行SQL重写,创建SQLRewriteResult对象;

2.1)创建SQLRewriteContext对象;

a)在SQLRewriteContext对象中,保存了当前执行的SQL的语句上下文对象、参数、重写的令牌等。重写装饰器对象(如ShardingSQLRewriteContextDecorator)根据SQL的类型(如分页、自动生成主键等)添加对应的重写令牌到SQLRewriteContext对象;

b)重写令牌主要用于信息的替换;

c)在构造方法中,添加RemoveTokenGenerator生成器,用于生成RemoveToken令牌。该令牌主要用于SQL字符串中某些字符串的移除(替换为空)。如移除SQL语句中的owner.table中的owner信息等;

2.1.1)创建SQLRewriteContext对象;

2.1.2)执行decorate()方法,遍历重写装饰器,执行装饰器的decorate()方法。如设置了分片规则,则执行ShardingSQLRewriteContextDecorator的decorate()进行SQLRewriteContext对象的装饰增强。如添加参数重写器创建器、SQL令牌生成器;

2.1.3)执行SQLRewriteContext对象的generateSQLTokens(),遍历sql令牌生成器,创建sql令牌。如分页令牌、自动主键令牌、distinct()令牌等;

2.2)从全局规则元数据中获取SQL转换器规则对象;

2.3)创建SQL重写引擎,执行重写引擎的rewrite()方法;

如果路由上下文中的路由单元为空,说明没有路由映射,创建GenericSQLRewriteEngine;否则创建RouteSQLRewriteEngine。然后执行重写引擎的rewrite()方法,返回一个SQLRewriteResult对象;

2.4)在RouteSQLRewriteEngine重写引擎的rewrite()方法中,执行如下:

2.4.1)遍历路由单元,获取路由单元中的实际数据源名称,按真实数据源名称对路由单元进行分组,同一个数据源放在同一个集合中;

2.4.2)按数据源名称遍历进行遍历;

a)如果同一个数据源的多个路由单元可以聚合重写,则执行SQL重写,重新拼接SQL语句,使用union all对多个SQL语句进行联合查询。生成一个SQLRewriteUnit对象;

b)如果不需要联合查询,则执行SQL重写,重新拼接SQL语句。生成一个SQLRewriteUnit对象;

2.4.3)执行translate()方法,进行翻译转换;

遍历SQL重写单元,执行 ranslatorRule 配置的翻译规则的translate()方法,进行翻译,获取新的sql字符串,创建新的SQLRewriteUnit,替换原来的SQLRewriteUnit对象;

2.4.4)创建一个RouteSQLRewriteResult对象,返回该对象;

关于本篇内容你有什么自己的想法或独到见解,欢迎在评论区一起交流探讨下吧。


http://www.kler.cn/a/505065.html

相关文章:

  • IEC103 转 ModbusTCP 网关
  • 【网络协议】ACL(访问控制列表)第二部分
  • Re78 读论文:GPT-4 Technical Report
  • 中台成熟度模型有什么用
  • 2025特种设备安全管理人员免费题库限时练习(判断题)
  • RocketMQ消息发送---源码解析
  • 【FAQ】HarmonyOS SDK 闭源开放能力 —Map Kit(4)
  • 系统架构设计师考点—UML建模和设计模式
  • ASP.NET Core 全局异常处理
  • 【NLP高频面题 - 分布式训练篇】分布式训练主要解决大模型训练中的哪些问题?
  • Android中下载 HAXM 报错 HAXM installation failed,如何解决?
  • Jmeter进行http接口并发测试
  • MyBatis 中动态 SQL 标签
  • 后端技术选型 sa-token校验学习 中 文档学习
  • 庖丁解java(一篇文章学java)
  • 浅谈PHP之线程锁
  • 【实践】操作系统智能助手OS Copilot新功能测评
  • C语言初阶习题【30】字符串左旋
  • ECharts 折线图隐藏标点
  • Maven 配置本地仓库