当前位置: 首页 > article >正文

解析 Apache SeaTunnel 的任务运行过程

示例代码运行

感兴趣的朋友可以先看看这篇文章《记本地第一次运行SeaTunnel示例项目》:https://blog.csdn.net/u011924665/article/details/143373017

基于分支

2.3.5-Release

跟踪过程

入口

Apache SeaTunnel提供的官方Example基于SeaTunnel Engine引擎的示例作为入口:org.apache.seatunnel.example.engine.SeaTunnelEngineExample

构建SeaTunnel引擎的命令行运行对象

这里默认会使用seatunnel-engine-example子项目的resources下的example/fanke\_to\_console.conf这个任务配置文件,并且以本地方式运行。

首先解析任务配置文件,构建ClientCommandArgs对象。

    public static void main(String[] args)
            throws FileNotFoundException, URISyntaxException, CommandException {
        String configurePath = args.length > 0 ? args[0] : "/examples/fake_to_console.conf";
        String configFile = getTestConfigFile(configurePath);
        ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
        clientCommandArgs.setConfigFile(configFile);
        clientCommandArgs.setCheckConfig(false);
        clientCommandArgs.setJobName(Paths.get(configFile).getFileName().toString());
        // Change Execution Mode to CLUSTER to use client mode, before do this, you should start
        // SeaTunnelEngineServerExample
        clientCommandArgs.setMasterType(MasterType.LOCAL);
        SeaTunnel.run(clientCommandArgs.buildCommand());
    }

最后一行中,ClientCommandArgsbuildCommand()方法会根据任务配置文件构建出一个Command对象。

默认配置文件构建出来的就是一个ClientExecuteCommand(org.apache.seatunnel.core.starter.seatunnel.command包下)对象,简而言之,这就是一个通过命令行执行的SeaTunnel引擎对象。

ClientExecuteCommand开始运行job

seatunnel-engine-example示例代码中最后的SeaTunnel.run()方法就是调用ClientExecuteCommandexecute()方法开始,然后通过命令行的方式运行SeaTunnel的Job。

推荐以下步骤,对比源码一起阅读。

声明Job监控器对象的引用
JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null;

整个Execute的核心内容都被包裹在一个try-catch-finally代码块中,此处声明一个Job监控器是为了在后续的Finally代码块中获取到任务执行结束(无论是成功还是失败)后的统计信息。

获取SeaTunnel的配置

SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();

seatunnel-engine-example中获取的是SeaTunnel任务配置,而在此处获取的是SeaTunnel的配置,生成配置的对象。

获取Haselcast的客户端的配置

ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();

与加载SeaTunnel的配置类似,加载HazelcastClient配置,生成配置对象。

本地方式运行

clusterName = creatRandomClusterName(
             StringUtils.isNotEmpty(clusterName)
                 ? clusterName : Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);

本地方式运行且没有设置集群名称,则会默认以SeaTunnel开头后面添加随机数,生成一个集群名称。

instance = createServerInLocal(clusterName, seaTunnelConfig);

然后创建一个本地的Hazelcast客户端,创建成功之后,记录下来Hazelcast的端口号,用于在执行SeaTunnel的Job时使用。

确认(记录)集群名称

将本次Job执行的集群名称记录到SeaTunnel和SeaTunnel Client的配置对象中。

创建SeaTunnel Engine客户端

engineClient = new SeaTunnelClient(clientConfig);

创建一个SeaTunnel Engine的客户端,基于Hazelcast创建的。

创建出Engine Client对象后就是一列的判断。看起来是通过启动命令中指定来实现了,推测与任务恢复、状态等信息查询有关,暂未深入探究,感兴趣的小伙伴🙏可以研究研究。

准备执行Job

Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();

获取Job的配置文件,并对文件进行检查,然后创建Job配置对象。

jobExecutionEnv = engineClient
    .createExecutionContext(configFile.toString(), jobConfig, seaTunnelConfig);

使用Job配置对象,基于通过前面创建的SeaTunnel Engine客户端创建一个Job运行环境对象jobExecutionEnv

开始执行

ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

通过调用jobExecutionEnvexecute()方法开始执行Jb,并返回一个Job代理对象:ClientJobProxy。

execute()方法的跟踪见:记第一次跟踪seatunnel的任务运行过程二——ClientJobExecutionEnvironment的execture方法

然后检查一下当前Job运行是否为异步模式,若为异步模式且不是本地模式运行,则流程结束。

若为非异步模式或任务是本地方式(seatunnel-engine-example中默认的是本地模式)执行,则继续执行以下流程。

注册任务取消的钩子函数
Runtime.getRuntime().addShutdownHook(
      new Thread(() -> { CompletableFuture<Void> future =
            CompletableFuture.runAsync(() -> {
                log.info("run shutdown hook because get close signal");
                shutdownHook(clientJobProxy);
            });
            try {
               future.get(15, TimeUnit.SECONDS);
            } catch (Exception e) {
               log.error("Cancel job failed.", e);
            }
      })
);
打印监控信息

启动一个新的定时线程,用于打印监控信息。

JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
executorService = Executors.newSingleThreadScheduledExecutor(
                                new ThreadFactoryBuilder()
                                        .setNameFormat("job-metrics-runner-%d")
                                        .setDaemon(true)
                                        .build());
executorService.scheduleAtFixedRate(
        jobMetricsRunner,
        0,
        seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
        TimeUnit.SECONDS
);

这个线程打印的就是如下的监控信息:

    ***********************************************
               Job Progress Information
    ***********************************************
    Job Id                    :  904173403390000097
    Read Count So Far         :               23412
    Write Count So Far        :               23412
    Average Read Count        :              1233/s
    Average Write Count       :              1233/s
    Last Statistic Time       : 2024-10-31 19:04:02
    Current Statistic Time    : 2024-10-31 19:05:02
    ***********************************************
等到Job执行结果

线程在此同步的等待Job的执行结果。

结尾

如果Job的结果的错误信息不为空或者Job的结果为失败,则抛出异常。

正常结束则打印出Job的统计信息,形式如下:

    ***********************************************
               Job Statistic Information
    ***********************************************
    Start Time                : 2024-10-31 19:02:18
    End Time                  : 2024-10-31 19:05:32
    Total Time(s)             :                 194
    Total Read Count          :              239202
    Total Write Count         :              239202
    Total Failed Count        :                   0
    ***********************************************

本文由 白鲸开源科技 提供发布支持!


http://www.kler.cn/a/401213.html

相关文章:

  • IO流(九):打印流-字节打印流PrintStream、字符打印流PrintWriter
  • 博客文章怎么设计分类与标签
  • 面向服务的软件工程——业务流程合规性(Business Process Compliance)(week12)
  • 【经验分享】2024年11月下半年软件设计师考试选择题估分(持续更新~~)
  • 海量数据面试题
  • gitlab容器的迁移(部署)并配置自动备份
  • 第7章硬件测试-7.4 专业实验
  • 内容分发网络CDN、动态内容缓存简介
  • Vite 基础理解及应用
  • ThreadLocal 和 Caffeine 缓存是两种不同的缓存机制,它们在用途和实现上有明显的区别
  • hash表和B树
  • 《人工智能深度学习的基本路线图》
  • DevOps-Jenkins-新手入门级
  • IndentationError: unindent does not match any outer indentation level
  • 汽车资讯新高度:Spring Boot技术飞跃
  • 百度智能云 VectorDB 优势数量 TOP 1
  • 湘潭大学软件工程算法设计与分析考试复习笔记(三)
  • 【Hadoop】【大数据技术基础】实践三 NoSQL数据库 大数据基础编程、实验和案例教程(第2版)
  • opencascade源码学习之BRepOffsetAPI包 -BRepOffsetAPI_DraftAngle
  • 大话C++:第28篇 详解独占智能指针
  • Vue3中使用:deep修改element-plus的样式无效怎么办?
  • 【算法】P5018 对称二叉树
  • 基于YOLOv8深度学习的智慧课堂教师上课行为检测系统研究与实现(PyQt5界面+数据集+训练代码)
  • gvim添加至右键、永久修改配置、放大缩小快捷键、ctrl + c ctrl +v 直接复制粘贴、右键和还原以前版本(V)冲突
  • 《原子与分子物理学报》
  • 玩转view和text组件与相关动画的使用