@task.branch(task_id="branch_task")
def branch_func(ti):
    """Choose which downstream task(s) to follow from ``start_task``'s XCom value.

    The value pulled from ``start_task`` is converted with ``int()`` and mapped to:

    * ``>= 5``  -> ``"big_task"``: run just this one task, skip all else.
    * ``>= 3``  -> ``["small_task", "warn_task"]``: run these, skip all else.
    * otherwise -> ``None``: skip everything.
    """
    value = int(ti.xcom_pull(task_ids="start_task"))
    if value < 3:
        return None  # skip everything
    if value < 5:
        return ["small_task", "warn_task"]  # run these, skip all else
    return "big_task"  # run just this one task, skip all else
from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from plugins.airflow_utils import default_args, kubernetes_pod_task
# callable for pre_execute arg
def skip_if_specified(context):
    """Skip the current task when the DAG run's conf lists it under ``skip_task``.

    Used as the operators' ``pre_execute`` hook, so it is evaluated before each
    task runs. Trigger the DAG with e.g. ``{"skip_task": ["option_name_2"]}``;
    a single task id may also be passed as a plain string.

    Args:
        context: Airflow task context. Only ``context['task'].task_id`` and
            ``context['dag_run'].conf`` are read.

    Raises:
        AirflowSkipException: if the current task id is listed in ``skip_task``.
    """
    task_id = context['task'].task_id
    conf = context['dag_run'].conf or {}
    # ``or []`` also covers an explicit ``"skip_task": null`` in the conf,
    # which would otherwise make the membership test raise TypeError.
    skip_tasks = conf.get('skip_task') or []
    # A bare string would be matched with substring semantics
    # ("option_name_1" in "option_name_10" is True); treat it as one task id.
    if isinstance(skip_tasks, str):
        skip_tasks = [skip_tasks]
    if task_id in skip_tasks:
        raise AirflowSkipException(
            f"Task {task_id!r} listed in dag_run.conf['skip_task']"
        )
# Task-level defaults that make per-run task skipping work:
#   - 'pre_execute' runs skip_if_specified before each task, so a task listed
#     in dag_run.conf['skip_task'] raises AirflowSkipException and is skipped.
#   - 'trigger_rule' = 'all_done' (per Airflow's trigger-rule docs) lets a task
#     run once all upstream tasks have finished in any state, so one skipped
#     task does not cascade skips down the chain.
support_task_skip_args = dict(
    pre_execute=skip_if_specified,
    trigger_rule='all_done',
)
# Later keys win: these override any same-named entries in default_args.
extended_args = {**default_args, **support_task_skip_args}

dag_name = 'optional_task_skip'
dag = DAG(
    dag_name,
    default_args=extended_args,
    schedule_interval=None,  # no automatic schedule; runs are triggered (optionally with conf)
    catchup=False,
    max_active_runs=3,
)
# Endpoints/modes to run, one entry per task.
#   key       -> used as the task_id
#   'param'   -> argument passed to get_people_data.py
#   'enabled' -> False replaces the pod task with a task that always skips
#   'catchup' -> truthy adds the '-c' flag (missing key counts as off)
# !! Entries are chained in dict (insertion) order: list them in the order
#    you want them to run !!
task_options = {
    'option_name_1': dict(param='fetch-users', enabled=True, catchup=False),
    'option_name_2': dict(param='fetch-jobs', enabled=True),
    'option_name_3': dict(param='fetch-schedules', enabled=True, catchup=True),
    'option_name_4': dict(param='fetch-messages', enabled=True, catchup=False),
    'option_name_5': dict(param='fetch-holidays', enabled=True, catchup=False),
}
def _skip_task():
    """Callable for disabled options: always signals Airflow to skip the task."""
    raise AirflowSkipException()


def _make_task(task_name, task_config):
    """Build the operator for a single ``task_options`` entry.

    Enabled entries become a Kubernetes pod task running get_people_data.py
    with the entry's ``param`` (plus ``-c`` when ``catchup`` is truthy).
    Disabled entries become a PythonOperator that always skips, so they still
    occupy their slot in the chain.
    """
    if not task_config['enabled']:
        return PythonOperator(
            dag=dag,
            python_callable=_skip_task,
            task_id=task_name,
        )
    parameter = task_config['param']
    catchup_flag = '-c ' if task_config.get('catchup') else ''
    command = f"cd people_data; python3 get_people_data.py {parameter} {catchup_flag}"
    return kubernetes_pod_task(dag=dag, command=command, task_id=task_name)


def add_tasks():
    """Create one task per ``task_options`` entry and chain them in dict order.

    Returns:
        list: the created operators, in execution order.
    """
    task_list = [_make_task(name, config) for name, config in task_options.items()]
    # Link consecutive tasks: t0 >> t1 >> t2 >> ...
    for upstream, downstream in zip(task_list, task_list[1:]):
        upstream >> downstream
    return task_list


# populate the DAG
add_tasks()
注:
`default_args` 和 `kubernetes_pod_task` 只是为方便使用而写的包装。
`kubernetes_pod_task` 是一个简单的函数:它注入一些变量和密钥(secrets),并使用 `airflow.providers.cncf.kubernetes.operators.kubernetes_pod` 模块中的 `KubernetesPodOperator`。这部分代码我不会、也不能与您分享。