如何在 Airflow 中创建条件任务

我想在 Airflow 中创建一个条件任务,如下面的模式所述。预期情况如下:

  • 任务1执行
  • 如果 Task1成功,则执行 Task2a
  • 否则,如果 Task1失败,则执行 Task2b
  • 最后执行 Task 3

Conditional Task 以上所有任务都是 SSHExecuteOperator。 我猜我应该使用 ShortCircuitOperator 和/或 XCom 来管理条件,但我不清楚如何实现它。你能描述一下解决方案吗?

81035 次浏览

你必须使用 气流触发规则气流触发规则

所有运算符都有一个 touch _ rule 参数,该参数定义触发生成任务的规则。

触发规则的可能性:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

以下是解决你问题的方法:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook


sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)


task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)


task_2 = SSHExecuteOperator(
task_id='conditional_task',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)


task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)


task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)


task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)




task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)

气流2. x

Airflow 提供了一个 分支装饰工程师,它允许您返回应该运行的 task _ id (或 task _ ids 列表) :

@task.branch(task_id="branch_task")
def branch_func(ti):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "big_task" # run just this one task, skip all else
elif xcom_value >= 3:
return ["small_task", "warn_task"] # run these, skip all else
else:
return None # skip everything

您也可以直接从重写 choose_branch方法的 BaseBranchOperator继承,但是对于简单的分支逻辑,修饰符是最好的。

气流1. x

Airflow 有一个 BranchPython 操作符,可以用来更直接地表示分支依赖关系。

医生描述了它的用途:

BranchPythonOperator 与 PythonOperator 非常相似,只是它需要一个返回 task _ id 的 python _ callable。随后执行返回的 task _ id,并跳过所有其他路径。Python 函数返回的 task _ id 必须引用 BranchPythonOperator 任务直接下游的任务。

如果您想跳过某些任务,请记住您不能有一个空路径,如果是这样,请创建一个虚拟任务。

代码示例

def dummy_test():
return 'branch_a'


A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)


branch_task = BranchPythonOperator(
task_id='branching',
python_callable=dummy_test,
dag=dag,
)


branch_task >> A_task
branch_task >> B_task

如果您正在安装 Airflow version > = 1.10.3,您还可以返回一个任务 ID 列表,允许您在一个操作符中跳过多个下游路径,并且在加入之前不必使用虚拟任务。

让我补充一下我的看法。

首先,对于这篇冗长的文章我很抱歉,但是我想分享一下对我有用的完整的解决方案。

背景

我们有一个脚本,从一个非常糟糕和缓慢的 API 中提取数据。 它很慢,所以我们需要选择我们做什么和我们不拉从它(1个请求/秒,超过75万个请求要做) 有时候需求会发生变化,迫使我们将数据完整地提取出来,但是只针对一个或一些端点。所以我们需要能控制的东西。

严格的1个请求/秒的速率限制,如果超过,会延迟几秒钟,这将停止所有并行任务。

'catchup': True的含义本质上是一个回填,它被转换成一个命令行选项(-c)。

我们的任务之间没有数据依赖关系,我们只需要遵循(某些)任务的顺序。

解决方案

通过引入带有额外 DAG 配置的 pre _ execute 可调用性,可以正确地跳过抛出 AirflowSkipException的任务。

其次,基于该配置,我们可以将原始运算符替换为具有相同名称和简单定义的简单 Python 运算符。 这样 UI 就不会混淆,触发器历史也会保持完整——显示跳过任务时的执行情况。

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator


from plugins.airflow_utils import default_args, kubernetes_pod_task




# callable for pre_execute arg
def skip_if_specified(context):
task_id = context['task'].task_id
conf = context['dag_run'].conf or {}
skip_tasks = conf.get('skip_task', [])
if task_id in skip_tasks:
raise AirflowSkipException()


# these are necessary to make this solution work
support_task_skip_args = {'pre_execute': skip_if_specified,
'trigger_rule': 'all_done'}
extended_args = {**default_args, **support_task_skip_args}


dag_name = 'optional_task_skip'


dag = DAG(dag_name,
max_active_runs=3,
schedule_interval=None,
catchup=False,
default_args=extended_args)


# select endpoints and modes
# !! make sure the dict items are in the same order as the order you want them to run !!
task_options = {
'option_name_1':
{'param': 'fetch-users', 'enabled': True, 'catchup': False},
'option_name_2':
{'param': 'fetch-jobs', 'enabled': True},
'option_name_3':
{'param': 'fetch-schedules', 'enabled': True, 'catchup': True},
'option_name_4':
{'param': 'fetch-messages', 'enabled': True, 'catchup': False},
'option_name_5':
{'param': 'fetch-holidays', 'enabled': True, 'catchup': False},
}




def add_tasks():
task_list_ = []
for task_name_, task_config_ in task_options.items():
if task_config_['enabled']:
parameter_ = task_config_['param']
catchup_ = '-c ' if task_config_.get('catchup') else ''
task_list_.append(
kubernetes_pod_task(
dag=dag,
command=f"cd people_data; python3 get_people_data.py {parameter_} {catchup_}",
task_id=f"{task_name_}"))
if len(task_list_) > 1:
task_list_[-2] >> task_list_[-1]
else:
# the callable that throws the skip signal
def skip_task(): raise AirflowSkipException()


task_list_.append(
PythonOperator(dag=dag,
python_callable=skip_task,
task_id=f"{task_name_}",
)
)
if len(task_list_) > 1:
task_list_[-2] >> task_list_[-1]




# populate the DAG
add_tasks()


注: default_args, kubernetes_pod_task只是方便的包装。 Kubernetes pod 任务在一个简单的函数中注入一些变量和秘密,并使用 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator模块,我不会也不能与您分享这些。

解决方案延伸了这位先生的伟大思想:
Https://www.youtube.com/watch?v=ablgyapcbw0

不过,这个解决方案也适用于 Kubernetes 运算符。

当然,这可以得到改进,而且您完全可以扩展或重新编写代码来解析手动触发器配置(如视频中所示)。

在我的用户界面中是这样的: enter image description here

(它没有反映上面的示例配置,而是反映了登台基础设施中的实际运行情况)