当前位置: 首页 > article >正文

3.阿里云flinkselectdb-py作业

1.概述

Python API中文文档
本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发;
先说结论:
在实际开发中遇到了很多问题,导致python作业基本基本无法运行。最后放弃了;

  • python作业中的标量函数的错误没有日志,永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具体问题;
  • python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数,提交到flink中就报错。怀疑是环境中没有flink-python.jar,自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样);
  • 如果各位遇到些问题并且有解决方案,麻烦也告知我,非常感谢;

2.目标

把阿里云sls日志中的数据准实时同步到云服务selectdb;

源表flink结果表
阿里云sls实时计算flink云服务selectdb

3.步骤

3.1.搭建环境

#**创建虚拟环境essa-flink,pyhton版本为3.11.9
conda create -n essa-flink python=3.11.9

#**安装apache-flink-1.20版本。安装的依赖比较大,指定国内的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

3.2.创建作业

作业代码本身很简单,逐行读取sls的日志,进行转换后保存到selectdb中。转换函数为do_active_log,在本地测试过程中遇到了第一个问题后,很轻松愉快就通过了。部署在flink中出现了其它问题;

  • 首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码,确实没有定义此类。提工单后,建设使用低版本解决;
  • 然后报错缺少flink-python,不能执行python函数。于是把flink-python上传,并在作业中引用依赖;
  • 最后报错ExceptionInChainedOperatorException: Could not forward element to next,无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃,改用jar作业;
def do_active_log(row: Row) -> Row:
    '''用户登录日志处理'''
    logging.info('执行do_active_log函数...')
    params = json.loads(row[2])

    occurred = datetime.fromtimestamp(float(row[1]))
    user_id = params['userId']
    platform = params['platform']
    last_active_time = occurred
    create_time = occurred
    id = occurred.strftime("%Y%m%d") + str(user_id)

    return Row(str(id), int(user_id), platform, last_active_time, create_time)

def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):
    '''创建用户登录日志结果表'''
    sql = '''
        create temporary table {}(
            id string,
            user_id int,
            platform string,
            last_active_time timestamp,
            create_time timestamp,
            primary key(id) not enforced
        ) with (
            'connector' = 'doris',
            'fenodes' = '{}',
            'table.identifier' = '{}',
            'username' = '{}',
            'password' = '{}',
            'sink.properties.format' = 'json'
        )
        '''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], 
                   sink_config['username'], sink_config['password'])
    table_env.execute_sql(sql)

def get_soruce_datastream(table_env: StreamTableEnvironment):
    '''创建datastream'''
    times = {'start_time': '', 'stop_time': ''}
    sql = '''
        create temporary table essa_ubc(
            ip string,
            `time` string,
            content string,
            __topic__ string metadata virtual,
            __source__ string metadata virtual,
            __timestamp__ string metadata virtual
        ) with (
            'connector' = 'sls',
            'endpoint' = '{}',
            'accessId' = '{}',
            'accessKey' = '{}',
            'project' = '{}',
            'logstore' ='essa-ubc',
            'startTime' = '{}',
            'stopTime' = '{}',
            'exitAfterFinish' = 'true'
        )
        '''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],
                   source_config['sls_project'], times['start_time'], times['stop_time'])
    table_env.execute_sql(sql)
    source_table = table_env.from_path('essa_ubc')
    return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),
                                                           Types.STRING(), Types.STRING()]))

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    #**加载依赖的jar包
    t_env.get_config().set("pipeline.jars", "依赖包.jar")
    
    #**创建sls源
    ds = get_soruce_datastream(t_env)

    #**用户登录日志处理
    #**读取sls日志数据,然后使用自定义标量函数处理数据
    ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),
                                                                                Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))
    table = t_env.from_data_stream(ds)
    active_log_sink_table = 'user_active_log'
    create_active_log_sink_table(t_env, active_log_sink_table)
    table.execute_insert(active_log_sink_table).wait()

http://www.kler.cn/a/460049.html

相关文章:

  • 自由学习记录(31)
  • 从单点 Redis 到 1 主 2 从 3 哨兵的架构演进之路
  • 《Vue3实战教程》35:Vue3测试
  • 【SOC 芯片设计 DFT 学习专栏 -- DFT 为何需要在综合之后插入】
  • Django-Easy-Audit 实战:轻松实现数据审计
  • DeepSeek V3“报错家门”:我是ChatGPT
  • 什么是微服务、微服务如何实现Eureka,网关是什么,nacos是什么
  • PyTorch快速入门教程【小土堆】之Sequential使用和小实战
  • 【RK3588 Linux 5.x 内核编程】-内核IO复用与select
  • 防火墙基础-工作原理
  • 爱思唯尔word模板
  • UE(虚幻)学习(二) 使用UnrealSharp插件让UE支持C#脚本
  • Harbor(2.3.0)的定制页面与安装(x86 arm)
  • 科龙空调:以创新科技,适配多元家居场景
  • 最短路径-Dijkstra 算法
  • 【记录】列表自动滚动轮播功能实现
  • 如何恢复永久删除的PPT文件?查看数据恢复教程!
  • STM32中断详解
  • RabbitMQ基础篇之数据隔离
  • 【机器学习】机器学习的基本分类-半监督学习-半监督生成对抗网络(Semi-supervised GANs)
  • Effective C++ 条款41:了解隐式接口和编译期多态
  • mysql只恢复某个库或某个表
  • 算法环境安装GPU驱动、CUDA、cuDNN、Docker及NVIDIA Container Toolkit
  • node.js文件压缩包解析,反馈解析进度,解析后的文件字节正常
  • Ungoogled Chromium127编译指南 Linux篇 - 项目要求(二)
  • 华为,新华三,思科网络设备指令