当前位置: 首页 > article >正文

Seatunnel解决Excel中无法将数字类型转换成字符串类型以及源码打包

需求

需要实现将Excel中的数字类型的单元格像数据库中字符串类型的字段中推送

问题原因

Seatunnel在读取字段类型的时候都是使用强转的形式去获取数据的
假如说数据类型不一样的话直接强转就会报错

修改位置

org/apache/seatunnel/api/table/type/SeaTunnelRow.java
在这里插入图片描述

org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
在这里插入图片描述

修改的代码

    @Override
    public PreparedStatement toExternal(
            TableSchema tableSchema, SeaTunnelRow row, PreparedStatement statement)
            throws SQLException {
        SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
            SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
            int statementIndex = fieldIndex + 1;
            Object fieldValue = row.getField(fieldIndex);
            if (fieldValue == null) {
                statement.setObject(statementIndex, null);
                continue;
            }

            switch (seaTunnelDataType.getSqlType()) {
                case STRING:
                    //TODO wxt
                //region
                    try{
                        //直接类型强转会出问题  比如double类型就不能转成String
                        // 可以使用下面的toString解决这种类型问题
                        statement.setString(statementIndex, (String) row.getField(fieldIndex));
                    }catch (Exception e){
                        statement.setString(statementIndex,  row.getField(fieldIndex).toString());
                    }
                //endregion

                    break;
                case BOOLEAN:
                    statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex));
                    break;
                case TINYINT:
                    statement.setByte(statementIndex, (Byte) row.getField(fieldIndex));
                    break;
                case SMALLINT:
                    statement.setShort(statementIndex, (Short) row.getField(fieldIndex));
                    break;
                case INT:
                    statement.setInt(statementIndex, (Integer) row.getField(fieldIndex));
                    break;
                case BIGINT:
                    statement.setLong(statementIndex, (Long) row.getField(fieldIndex));
                    break;
                case FLOAT:
                    statement.setFloat(statementIndex, (Float) row.getField(fieldIndex));
                    break;
                case DOUBLE:
                    statement.setDouble(statementIndex, (Double) row.getField(fieldIndex));
                    break;
                case DECIMAL:
                    statement.setBigDecimal(statementIndex, (BigDecimal) row.getField(fieldIndex));
                    break;
                case DATE:
                    LocalDate localDate = (LocalDate) row.getField(fieldIndex);
                    statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
                    break;
                case TIME:
                    writeTime(statement, statementIndex, (LocalTime) row.getField(fieldIndex));
                    break;
                case TIMESTAMP:
                    LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
                    statement.setTimestamp(
                            statementIndex, java.sql.Timestamp.valueOf(localDateTime));
                    break;
                case BYTES:
                    statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
                    break;
                case NULL:
                    statement.setNull(statementIndex, java.sql.Types.NULL);
                    break;
                case ARRAY:
                    Object[] array = (Object[]) row.getField(fieldIndex);
                    if (array == null) {
                        statement.setNull(statementIndex, java.sql.Types.ARRAY);
                        break;
                    }
                    statement.setObject(statementIndex, array);
                    break;
                case MAP:
                case ROW:
                default:
                    throw new JdbcConnectorException(
                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
                            "Unexpected value: " + seaTunnelDataType);
            }
        }
        return statement;
    }
    private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
        if (v == null) {
            return 0;
        }
        SqlType sqlType = dataType.getSqlType();
        switch (sqlType) {
            case STRING:
                //region
                //TODO 避免强转出现问题
                try{
                    return ((String) v).length();
                }catch (Exception e){
                    return ( v.toString()).length();
                }
              //endregion
            case BOOLEAN:
            case TINYINT:
                return 1;
            case SMALLINT:
                return 2;
            case INT:
            case FLOAT:
                return 4;
            case BIGINT:
            case DOUBLE:
                return 8;
            case DECIMAL:
                return 36;
            case NULL:
                return 0;
            case BYTES:
                return ((byte[]) v).length;
            case DATE:
                return 24;
            case TIME:
                return 12;
            case TIMESTAMP:
                return 48;
            case ARRAY:
                return getBytesForArray(v, ((ArrayType) dataType).getElementType());
            case MAP:
                int size = 0;
                MapType<?, ?> mapType = ((MapType<?, ?>) dataType);
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
                    size +=
                            getBytesForValue(entry.getKey(), mapType.getKeyType())
                                    + getBytesForValue(entry.getValue(), mapType.getValueType());
                }
                return size;
            case ROW:
                int rowSize = 0;
                SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType);
                SeaTunnelDataType<?>[] types = rowType.getFieldTypes();
                SeaTunnelRow row = (SeaTunnelRow) v;
                for (int i = 0; i < types.length; i++) {
                    rowSize += getBytesForValue(row.fields[i], types[i]);
                }
                return rowSize;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + sqlType);
        }
    }

如何源码打包

用maven插件打包就行
打完包的话东西在dist下面 如截图所示
在这里插入图片描述


http://www.kler.cn/a/400467.html

相关文章:

  • SpringMVC跨线程获取requests请求对象(子线程共享servletRequestAttributes)和跨线程获取token信息
  • Matlab单输入多输出之同时识别手写数字类别和倾斜角度
  • 用 Android Studio 从零开发一个多功能计算器应用
  • 集群聊天服务器(9)一对一聊天功能
  • 数据科学与SQL:如何计算排列熵?| 基于SQL实现
  • 10月回顾 | Apache SeaTunnel社区动态与进展一览
  • 【jvm】方法区的理解
  • 讨论大语言模型在学术文献应用中的未来与所带来的可能性和担忧
  • C++笔试面试题
  • leetcode 扫描线专题 06-leetcode.836 rectangle-overlap 力扣.836 矩形重叠
  • 无人机动力系统节能技术的未来发展趋势——CKESC电调小课堂12.1
  • Python 神经网络项目常用语法
  • C++---智能指针和内存泄露
  • 【网络安全 | 漏洞挖掘】邮件HTML注入
  • 群控系统服务端开发模式-应用开发-前端部门功能开发
  • 传奇996_25——ctrl+f11,ui标签,绘制自定义面板的参数
  • 学习大数据DAY61 宽表加工
  • 【惠州大亚湾】之维修戴尔服务器DELLR730XD
  • vue下载后端提供的文件/播放音频文件
  • Redis的缓存穿透、缓存雪崩、缓存击穿问题及有效解决方案