当前位置: 首页 > article >正文

Apache Airflow (十) :SSHOperator及调度远程Shell脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. 安装“apache-airflow-providers-ssh ”provider package

2. 配置SSH Connection连接

3. 准备远程执行脚本

4. 编写DAG python配置文件

5. 调度python配置脚本


在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用SSHOperator来调用远程机器上的脚本任务。SSHOperator使用ssh协议与远程主机通信,需要注意的是SSHOperator调用脚本时并不会读取用户的配置文件,最好在脚本中加入以下代码以便脚本被调用时会自动读取当前用户的配置信息:

#Ubunto系统

. ~/.profile



#CentoOS或者RedHat系统

. ~/.bashrc

关于SSHOperator参数详解可以参照:

airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh Documentation

SSHOperator的常用参数如下:

ssh_conn_id(str):ssh连接id,名称自取,需要在airflow webserver界面配置,具体配置参照案例。

remote_host(str):远程连接节点host,如果配置,可替换ssh_conn_id中配置的远程host,可选。
command(str):在远程主机上执行的命令或脚本。

按照如下步骤来使用SSHOperator调度远程节点脚本:

1. 安装“apache-airflow-providers-ssh ”provider package

首先停止airflow webserver与scheduler,在node4节点切换到python37环境,安装ssh Connection包。另外,关于Providers package安装方式可以参照如下官网地址:

https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh

#切换Python37环境

[root@node4 ~]# conda activate python37



#安装ssh provider package

(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1



#启动airflow

(python37) [root@node4 ~]# airflow webserver --port 8080

(python37) [root@node4 ~]# airflow scheduler

2. 配置SSH Connection连接

登录airflow webui ,选择“Admin”->“Connections”:

点击“+”添加连接,这里host连接的是node5节点:

3. 准备远程执行脚本

在node5节点/root路径下创建first_shell.sh,内容如下:

#!/bin/bash

echo "==== execute first shell ===="

在node3节点/root路径下创建second_shell.sh,内容如下:

#!/bin/bash

echo "==== execute second shell ===="

4. 编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到SSHOperator,需要在本地对应的python环境中安装对应的provider package。

C:\Users\wubai>d:

D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts

d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-ssh==2.1.1

python配置文件:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator

default_args = {
    'owner':'lisi',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_remote_shell',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=SSHOperator(
    task_id='first',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/first_shell.sh ',
    dag = dag
)

second=SSHOperator(
    task_id='second',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/second_shell.sh ',
    remote_host="192.168.179.6",#如果配置remote_host ,将会替换Connection中的SSH 配置的host
    dag=dag
)

first >> second

5. 调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:



http://www.kler.cn/a/131865.html

相关文章:

  • uniapp 手动调用form表单submit事件
  • vue3+vite+ts 发布自定义组件到npm
  • 竞赛选题 深度学习驾驶行为状态检测系统(疲劳 抽烟 喝水 玩手机) - opencv python
  • C#的MessagePack(unity)--02
  • Spring Boot - devtools 热部署
  • 【python】Conda强大的包/环境管理工具
  • MyBatis #{} 和 ${} 的区别
  • Diagrams——制作短小精悍的流程图
  • 数据结构与算法之美学习笔记:20 | 散列表(下):为什么散列表和链表经常会一起使用?
  • 【Android】导入三方jar包/系统的framework.jar
  • JavaScript对象
  • 每日OJ题_剑指offer数组篇
  • vue监听对象属性值变化
  • 某头部通信企业:SDLC+模糊测试,保障数实融合安全发展
  • Scala---样例类+隐式转换
  • 云计算(一):弹性计算概述
  • Apriori算法
  • 报错资源不足,k8s使用containerd运行容器修改挂载点根目录换成/home
  • mongodb 6/7的 windows安装问题
  • 电子学会C/C++编程等级考试2021年09月(一级)真题解析