300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > 关于Airflow跨DAG依赖总结

关于Airflow跨DAG依赖总结

时间:2020-05-28 09:12:03

相关推荐

关于Airflow跨DAG依赖总结

关于Airflow跨DAG依赖总结

单个DAG中Task之间的依赖

这是最常见的Task之间的依赖,在DAG中有多种方式指定依赖关系

# 定义DAGdag = DAG(...)# 定义task_atask_a = BashOperator(...)# 定义task_btask_b = BashOperator(...)# 指定task依赖关系task_a >> task_btask_a.set_downstream(task_b)# or# task_b << task_a# task_b.set_upstream(task_a)

这里简单指定了同一个DAG中两个task的依赖关系,即task_b依赖task_a,task_b只有等待task_a执行完且成功才能执行,这里task_b是task_a的下游(downstream)

不同DAG中Task之间的依赖

假设有这样一个需求,DAG A和DAG B分别用于处理两种不同的业务,但是DAG B中的task_b只有在DAG A中的task_a执行完且成功的情况下才能执行。很显然此时单个DAG中task的关系依赖已经不能满足需求,这时可以引入一个特殊的Operator:ExternalTaskSensor

ExternalTaskSensor

一种特殊的Operator,用于嗅探DAG外部Task的执行状态,通俗来说,就是让该ExternalTaskSensor作为DAG中的一个Task,专门用来等待外部的Task,而需要做具体业务操作的Task则作为该ExternalTaskSensor的下游,所以,只有ExternalTaskSensor这个Task成功了,下游的Task才能执行,这样便实现了跨DAG依赖的需求

主要参数

external_dag_id(str) – 包含需要等待的外部Task的GAD id

external_task_id(str or None) – 需要等待的外部Task,如果为None则等待整个DAG(默认为None)

allowed_states(Iterable) – DAG或Task允许的状态, 默认是 [‘success’]

failed_states(Iterable) –DAG或Task不允许的状态, 默认是 None

execution_delta(Optional[datetime.timedelta]) – 与之前执行的DAG或Task的时间差。这里的意思是指需要等待的DAG或Task的执行时间在当前DAG或Task的执行时间之前,也就是说在当前DAG或Task执行的时候需要等待的DAG活Task至少已经开始执行。 默认值是当前DAG或Task的执行时间. 如果需要指定前一天的DAG或Task,可以使用datetime.timedelta(days=1). execution_delta和execution_date_fn都可以作为ExternalTaskSensor的参数,但是不能同时使用.

execution_date_fn(Optional[Callable]) – 这是一个以当前执行时间作为参数的方法,用来返回期望的执行时间.通俗来说就是需要等待的DAG或Task执行时间在当前DAG的执行时间之后,用来往后倒一段时间,在该时间点的DAG或Task执行成功之前,该Task会一直等待,直到改时间点有DAG或Task执行成功或当前Task超时execution_delta和execution_date_fn都可以作为ExternalTaskSensor的参数,但是不能同时使用.

check_existence(bool) – 用于检查外部DAG id或者Task id是否存在(当为true时),如果不存在,则立刻停止等待

timeout(int) – 超时时间,单位为秒,如果在超时时间内还未等到外部DAG或Task成功执行,则抛出失败异常,进入重试(如果有重试次数的话)

mode(str) – 指定Task在等待期间的模式,有三种模式:

poke(默认): 在等待期间,Task会一直占用Worker Slotreschedule: Task只会在检查外部DAG或Task时才会占用一个Worker Slot,在两次检查之间会进入sleep状态(ExternalTaskSensor会每隔一段时间检查外部DAG或Task是否已执行成功,默认检查间隔时1分钟)smartsensor: 可进行批量处理的一种模式,内容较为复杂,具体配置可以参看这里Smart Sensors.

显然,在这里推荐使用reschedule模式。

话不多说,上案例

假如现在有两个DAG:mon.1dhandle.data.1d,其中mon.1d用于初始化所有DAG运行需要的环境参数,比如DAG需要运行在哪个环境(test or dev)再将参数保存到MySQL数据库,执行规则是0 1 * * *,也就是每天凌晨1点;而handle.data.1d需要先从MySQL数据库读取环境信息,再处理数据,执行规则是5 1 * * *,也就是每天凌晨1点05分。很显然,handle.data.1d需要等待mon.1d执行完后才能执行。这里虽然handle.data.1d的执行时间在mon.1d之后,但是我们不知道mon.1d具体需要执行多久才能执行完,万一因为环境或者网络因素导致DAG的执行时间超过5分钟,那handle.data.1d在1点05分执行时肯定会失败,所以handle.data.1d需要加一个ExterTaskSensor用来嗅探mon.1d是否在凌晨1点成功执行完毕。

mon.1d

该DAG中有四个Task: info_bash:输出一些当前DAG的信息,可以与DAG中其他Task并行init_env_conf:初始化环境参数信息save_env_conf:保存当前环境参数信息到数据库run_init_common_data:提交一个Spark任务,该任务基于当前环境参数运行

很显然,重要的是save_env_conf这个Task,只有该Task成功执行完毕,当前DAG中的Task和其他DAG才能执行

from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.python import PythonOperatordefault_args = {'owner': 'example','email': ['example@'],'email_on_failure': True,'email_on_retry': True,'retries': 1,'retry_delay': timedelta(minutes=2),'start_date': datetime(, 12, 6),}common_dag = DAG(dag_id='mon.1d',default_args=default_args,description='init common data',schedule_interval='0 1 * * *',)run_info_bash = BashOperator(task_id='info_bash',bash_command='python3 -V', # 输出Python环境信息dag= common_dag,)run_init_env_conf = PythonOperator(task_id="init_env_conf",python_callable=init_env_conf, # 调用Python方法,初始化环境参数dag= common_dag,)run_save_env_conf = PythonOperator(task_id='save_env_conf',python_callable=save_env_conf, # 调用Python方法,保存环境参数到数据库dag=common_dag,)run_init_common_data = BashOperator(task_id='init_common_data',bash_command='spark-submit .....', # 提交spark任务dag= common_dag,)run_info_bashrun_init_env >> run_save_env >> run_init_common_data

handle.data.1d

在该DAG中有四个Task: run_info_bash:同上,输出DAG运行信息monitor_common_dag_save_env_conf:嗅探外部Task的Sensor,这里为嗅探即等待mon.1dDAG中的save_env_conf Taskrun_read_env_conf:读取环境信息run_handle_data:处理数据

from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.python import PythonOperatorfrom airflow.sensors.external_task import ExternalTaskSensordefault_args = {'owner': 'example','email': ['example@'],'email_on_failure': True,'email_on_retry': True,'retries': 1,'retry_delay': timedelta(minutes=2),'start_date': datetime(, 12, 6),}handle_data_dag = DAG(dag_id='handle.data.1d',default_args=default_args,description='Handle Data',schedule_interval='5 1 * * *')run_info_bash = BashOperator(task_id='info_bash',bash_command='python3 -V', # 输出Python环境信息dag=handle_data_dag,)# 这里实例化一个ExterTaskSensormonitor_common_dag_save_env_conf = ExternalTaskSensor(task_id='monitor_common_dag_save_env_conf',external_dag_id='mon.1d', # 需要等待的外部DAG idexternal_task_id='save_env_conf', # 需要等待的外部Task idexecution_delta=timedelta(minutes=5), # 执行时间差,这里指定5分钟,那么当前ExternalTaskSensor会基于当前执行时间(1:05)往前倒5分钟(1:00)寻找在这个时间点已经成功执行完毕的**mon.1d**的save_env_conf## 假如「**mon.1d**」的执行规则是「10 1 * * *」也就是每天凌晨1点10分,## 那么这里可以使用「execution_date_fn」,让当前DAG等待至1点10分,## 直到「**mon.1d**」的「save_env_conf」成功执行完# execution_date_fn=lambda dt: dt + timedelta(minutes=5),timeout=600, # 超时时间,如果等待了600秒还未符合期望状态的外部Task,那么抛出异常进入重试allowed_states=['success'], # Task允许的状态,这里只允许外部Task执行状态为'success'mode='reschedule', # reschedule模式,在等待的时候,两次检查期间会sleep当前Task,节约系统开销check_existence=True,# 校验外部Task是否存在,不存在立马结束等待dag=handle_data_dag,)run_read_env_conf =PythonOperator(task_id="read_env_conf",python_callable=read_env_conf, # 读取环境信息dag=handle_data_dag,)run_handle_data = BashOperator(task_id='danhle_data',bash_command='spark-submit .....',dag=handle_data_dag,)run_info_bashmonitor_common_dag_save_env_conf >> run_read_env_conf >> run_handle_data

需要注意的地方

execution_deltaexecution_date_fn关心的是DAG的Execution_Date,也就是DAG的「schedule_interval」所指定的执行时间,至于这个DAG什么时候执行完,execution_deltaexecution_date_fn并不关心。其实通过Airflow的Tree View可以看出来,一次DAG的运行即是一次记录,如图。而DAG的运行成功与否的状态都会记录在此次运行记录中,execution_delta和execution_date_fn关心的便是图中框出来的两个地方

在Airflow中schedule_interval非常重要,因为在跨DAG依赖的场景中,这关乎到DAG能否正常成功运行,execution_deltaexecution_date_fn两个参数都依赖于DAG的执行策略,所以,DAGs能否正常高效的运行,需要合理安排每个DAG的schedule_interval

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。