当前位置: 首页 > article >正文

Apache Airflow (十二) :PythonOperator

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。

关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation

python_callable(python callable):调用的python函数

op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。

op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。

PythonOperator调度案例

import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# python中 *  关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
    print(a)
    print(b)
    print("hello airflow1")

# 返回的值只会打印到日志中
    return{"sss1":"xxx1"}

def print__hello2(random_base):
    print(random_base)
    print("hello airflow2")

# 返回的值只会打印到日志中
    return{"sss2":"xxx2"}

default_args = {
    'owner':'maliu',
    'start_date':datetime(2021, 10, 1),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_pythoncode',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=PythonOperator(
    task_id='first',
    #填写  print__hello1 方法时,不要加上“()”
    python_callable=print__hello1,
    # op_args 对应 print_hello1 方法中的a参数
    op_args=[1,2,3,"hello","world"],
    # op_kwargs 对应 print__hello1 方法中的b参数
    op_kwargs={"id":"1","name":"zs","age":18},
    dag = dag
)

second=PythonOperator(
    task_id='second',
    #填写  print__hello2 方法时,不要加上“()”
    python_callable=print__hello2,
    # random_base 参数对应 print_hello2 方法中参数“random_base”
    op_kwargs={"random_base":random.randint(0,9)},
    dag=dag
)

first >> second


http://www.kler.cn/a/136292.html

相关文章:

  • drawDB docker部属
  • 图像处理 | 图像二值化
  • Dart语言的语法糖
  • CSS——22.静态伪类(伪类是选择不同元素状态)
  • execl条件比较两个sheet每个单元格的值
  • 推动多语言语音科技迈向新高度:INTERSPEECH 2025 ML-SUPERB 2.0 挑战赛
  • 【Linux】【开发】使用sed命令遇到的乱码问题
  • 内置函数和消息传递API
  • 类与对象(上篇)
  • WinForms C# 导入和导出 CSV 文件 Spread.NET
  • Rust开发——切片(slice)类型
  • -bash: jps: command not found
  • React整理总结(五、Redux)
  • 【左程云算法全讲11】贪心算法 并查集
  • k8s的高可用集群搭建,详细过程实战版
  • 原型模式-C++实现
  • 《崩坏:星穹铁道》1.5仙舟罗浮-绥园全宝箱攻略
  • 【Linux】软连接和硬链接:创建、管理和解除链接的操作
  • Flutter 中数据存储的四种方式
  • 机器学习笔记 - Ocr识别中的CTC算法原理概述
  • JVM:内存模型、内存分配机制、内存分配冲突、JVM垃圾标记算法、JVM1.8增加元数据区缘由
  • python中sklearn库在数据预处理中的详细用法,及5个常用的Scikit-learn(通常简称为 sklearn)程序代码示例
  • 机器学习第8天:SVM分类
  • 创新工具 | 教你6步用故事板设计用户体验事半功倍
  • 【计算机网络笔记】路由算法之链路状态路由算法
  • 集合的自反关系和对称关系