当前位置: 首页 > article >正文

Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. BashOperator 调度Shell命令案例

2. BashOperator 调度Shell脚本案例


Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记



owner(str):任务的所有者,建议使用linux用户名



email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。



email_on_retry(bool):当任务重试时是否发送电子邮件



email_on_failure(bool):当任务执行失败时是否发送电子邮件



retries(int):在任务失败之前应该重试的次数



retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象



start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。



end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。



depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。



dag(airflow.models.DAG):指定的dag。



execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。



trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)

1. BashOperator 调度Shell命令案例

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'email':'kettle_test1@163.com', #pwd:kettle123456
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_cmd',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

t1=BashOperator(
    task_id='print_date',
    bash_command='date',
    dag = dag
)

t2=BashOperator(
    task_id='print_helloworld',
    bash_command='echo "hello world!"',
    dag=dag
)

t3=BashOperator(
    task_id='tempplated',
    bash_command="""
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ params.name}}"
        echo "{{ params.age}}"
    {% endfor %}
    """,
    params={'name':'wangwu','age':10},
    dag=dag
)

t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bash

dt=$1

echo "==== execute second shell ===="

echo "---- second : time is ${dt}"

编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:



http://www.kler.cn/a/133343.html

相关文章:

  • 机器学习day5-随机森林和线性代数1
  • Java21和Java8性能优化详细对比
  • 使用 Python 和 OpenCV 实现摄像头人脸检测并截图
  • Flink Job更新和恢复
  • 【卡尔曼滤波】数据融合Fusion的应用 C语言、Python实现(Kalman Filter)
  • 【Three.js基础学习】22.New project structure
  • 【开源】基于Vue和SpringBoot的康复中心管理系统
  • 计数排序.
  • Nuxt3框架局部文件引用外部JS/CSS文件的相关配置方法
  • 融合语言模型中的拓扑上下文和逻辑规则实现知识图谱补全11.18
  • PCL内置点云类型
  • Vue中给对象添加新属性时,界面不刷新怎么办?
  • PS学习笔记——移动工具
  • 【LeetCode刷题日志】225.用队列实现栈
  • 宗老师计算机教学-大型集群开发基础知识
  • 软件需求调研指南
  • 汽车CAN/ CAN FD数据记录仪在上汽大通诊断测试部门的应用
  • PyTorch微调权威指南3:使用数据增强
  • 【论文阅读】基于隐蔽带宽的汽车控制网络鲁棒认证(一)
  • 【0235】修改私有内存(private memory)中的MyBEEntry时,st_changecount值前后变化
  • 「Java开发指南」如何在Spring中使用JAX-WS注释器?
  • 【JavaEE】Servlet API 详解(HttpServletResponse类方法演示、实现自动刷新、实现自动重定向)
  • 服务器数据恢复—服务器raid5离线磁盘上线同步失败的数据恢复案例
  • C#装箱与拆箱详解
  • 实验三 页面置换算法
  • 数据结构【DS】树和森林的遍历对应关系