当前位置: 首页 > article >正文

【Airflow】构建爬虫任务系统

爬虫脚本太多了需要进行管理一下,领导决定使用airflow
我了解了一下这个平台是用来做任务调度。
是一个ETL工具
ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程
在这里插入图片描述
这里是一个github的地址

https://github.com/apache/airflow

这里是官方文档

https://airflow.apache.org/docs/apache-airflow/stable/index.html

这里是我学习的视频资料

https://www.bilibili.com/video/BV19f4y1V7UG/
博主的视频有一些老了,不过也是可以学习的。

先看一下成果吧
在这里插入图片描述

这里是首页记录了,我得dags任务
dags就相当于我得一个爬虫任务,
在这里插入图片描述
我们进入这个dag里面,可以看到
有4个小任务(task)和一task执行的状态,还有一些基本信息。
在这里插入图片描述
可以看每一个task的日志,如果发生问题也可以更方便的定位问题的。
每个dag都有很多配置,比如定时任务,失败重试,自动拉起,报警邮件等功能。
以后再也不用上服务器去看日志了
嘻嘻嘻嘻嘻

conda环境安装

这里需要一个干净的环境,我使用的miniconda
来到 conda网站

https://docs.conda.io/projects/miniconda/en/latest/miniconda-other-installer-links.html

在这里插入图片描述
这里选择linux 的 我是用的是python3.8
然后复制这个链接地址。
在这里插入图片描述
然后我们到服务器上将这个包下好。
执行命令

wget https://repo.anaconda.com/miniconda/Miniconda3-py38_23.9.0-0-Linux-x86_64.sh

在这里插入图片描述
下好之后。
安装好之后,执行

bash Miniconda3-py38_23.9.0-0-Linux-x86_64.sh

然后一直yes 就ok
在这里插入图片描述
conda这里就装好了
在这里插入图片描述
也带着python。然后我们创建一个环境

conda create -n airflow  # 创建环境
conda activate airflow   # 激活环境

这样环境就差不多了
安装一下airflow

 conda install apache-airflow

在这里插入图片描述
这里会依赖很多包
在这里插入图片描述
执行airflow命令,出现这些,说明airflow已经装好了。

配置airflow

找到你得airflow路径,
里面会有一个airflow.cfg文件
在这里插入图片描述
这里是配置dags的文件路径
在这里插入图片描述
这里默认使用的是sqlite数据库,
后续也可以改成mysql

https://blog.csdn.net/qq_43439214/article/details/129898191
这个博主的文章很不错,将airflow的数据库换成了mysql

我这里就先用默认的了,
执行命令

airflow db init 

在这里插入图片描述
出现下面内容说明数据库已经初始化好了。
执行下面命令进行创建用户

airflow users create --username zhang --firstname zhang --lastname zhang --role Admin --email  airflow@example.com

并为其用户设置密码。
配置好了之后
我们开一个会话
在这里插入图片描述
来通过webserver开启 web服务,这里在要访问你得公网地址默认端口是8080
在这里插入图片描述
到这里来到了ui界面输入,你刚刚创建的用户名和密码。
在这里插入图片描述
来到这里了。
这样我们的配置基本就没问题了。

举个例子

来举一个例子,
来写一个dags看一下。

from __future__ import annotations
# [START tutorial]
# [START import_module]
import os
import sys
from datetime import datetime, timedelta
from textwrap import dedent
from airflow.operators.python import PythonOperator, BranchPythonOperator


# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# [END import_module]
# [START instantiate_dag]


def first_task():
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")


def second_task():
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")


with DAG(
    dag_id="test_airflow",
    default_args={
        'owner': "guapisansan",
        "depends_on_past": True,
        "email": ["177664833@qq.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    # [END default_args]
    description="测试一把",
    schedule_interval='0 15 * * *',  # 设置调度间隔为每天下午六点,
    catchup=False,
    start_date=datetime(2023, 10, 23),
    tags=["test"],
) as dag:
    first_task_op = PythonOperator(
        task_id="first_task",
        python_callable=first_task
    )

    second_task_op = PythonOperator(
        task_id="second_task",
        python_callable=second_task
    )

    first_task_op >> second_task_op


我这里给了一段非常简单的代码。
大概意思就是有两个task,分别是first和second 他们对应的也分别是两个python函数
配置了一些参数,如 description dag的简介, 还有一些参数,可以在官方文档查询。

记不记得我们的airflow.cfg文件里面有一个这个参数
在这里插入图片描述
这里就是我们存放所有 dags的路径。一定要是绝对路径
将刚刚写的脚本放到这个路径下,
然后我们在执行命令

airflow dags list

在这里插入图片描述
这里存在我刚刚的创建的test_airflow
这时候我们在创建一个会话
执行命令

airflow scheduler

这是一个调度命令
这时候我们刷新刚刚的web页面
在这里插入图片描述
发现我们dags已经挂到ui页面上了
我们点进去看看
在这里插入图片描述
看到了我们的子任务task 出现了,
现在需要手动触发一下看看结果。
在这里插入图片描述
点击这个trigger按钮
在这里插入图片描述
可以看到这两个task都执行成功了。
右边的框里面有执行时间,还有一些任务的基础信息,
在这里插入图片描述
在这里插入图片描述
点击小图片查看任务执行的日志,
发现我们print打印出来了,
我这里只是举一个小例子先跑起来,可以根据自己的脚本进行配置等等。
airflow是一个非常好用的任务调度工具,相对于数据处理更好,我们当作爬虫管理系统也可以用。
用它将分散的脚本整理,更加方便观察和调度。


http://www.kler.cn/a/103672.html

相关文章:

  • django报错--Not Found The requested URL was not found on the server.
  • 【【萌新的SOC学习之SD卡DMA回路读写大数据的实验】】
  • 多级缓存入门
  • 安卓主板_MTK联发科4G低功耗安卓主板开发板方案
  • Git - 导出(archive)、忽略(gitignore)、隐藏(Stash)、合并冲突(merge)的解决方法
  • 【机器学习合集】标准化与池化合集 ->(个人学习记录笔记)
  • 订水商城H5实战教程-04用户注册
  • VMwarePlayer安装Ubuntu,切换中文并安装中文输入法
  • Python基础教程:列表推导式详解
  • 【Java网络初识】 一
  • K8s 概念及组件
  • gRPC之gRPC转换HTTP
  • paddlepaddle使用实践过程中的问题记录
  • 基于大数据的社交平台数据爬虫舆情分析可视化系统 计算机竞赛
  • 软件工程与计算总结(二十三)软件工程职业基础
  • 基于XML的Web服务Java接口(JAX-WS)、Jakarta XML Web Services Eclipse 实现
  • WebSocket 入门案例
  • iOS 配置通用链接(Universal Link)服务端和开发者后台都配置好了,还是跳转不到App
  • 【C语言基础】函数的参数是指针
  • pynvml.nvml.NVMLError_FunctionNotFound: Function Not Found