Proper way to create dynamic workflows in Airflow

Question

Is there any way in Airflow to create a workflow such that the number of Task B.*'s is unknown until completion of Task A? I have looked at subdags, but it looks like they can only work with a static set of tasks that has to be determined at DAG creation.

If so, could you please provide an example?

I have a problem where it is impossible to know the number of Task B's needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idea #1

I don't like this solution because I would have to create a blocking ExternalTaskSensor and all of the Task B.* will take between 2-24 hours to complete, so I don't consider it a viable solution. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C


Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)

               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

As of yet this question still does not have a great answer. I have been contacted by several people looking for a solution.


Here is how I did it with a similar request without any subdags:

First create a method that returns whatever values you want:

def values_function():
    return values

Next create the method that will generate the jobs dynamically:

def group(number, **kwargs):
    # load the values if needed in the command you plan to execute
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)

And then combine them:

push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)


complete = DummyOperator(
    task_id='All_jobs_completed',
    dag=dag)


for i in values_function():
    push_func >> group(i) >> complete

OP: "Is there any way in Airflow to create a workflow such that the number of Task B's is unknown until completion of Task A?"

The short answer is no. Airflow builds the DAG flow before it starts running it.

That said, we came to a simple conclusion: we don't have such a need. When you want to parallelize some work, you should evaluate the resources you have available, not the number of items to process.

We did it like this: we dynamically generate a fixed number of tasks, say 10, that will split the job. For example, if we need to process 100 files, each task will process 10 of them. I will post the code later today.

Update

Here is the code, sorry for the delay.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['myemail@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}


dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)


# You can read this from Variables
parallel_tasks_total_number = 10


start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


def parallelTask(current_task_number, total_number_of_tasks, **kwargs):
    # Placeholder for your own logic: elaborate the chunk of data
    # assigned to this task number.
    pass


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return PythonOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current
        # number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)


for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Code explanation:

Here we have a single start task and a single end task (both dummy operators).

Then from the start task, with the for loop we create 10 tasks with the same python callable. The tasks are created in the function create_dynamic_task.

To each python callable we pass as arguments the total number of parallel tasks and the current task index.

Suppose you have 1000 items to elaborate: the first task will receive in input that it should elaborate the first chunk out of 10 chunks. It will divide the 1000 items into 10 chunks and elaborate the first one.

I have found this Medium post to be very similar to this question. However, it is full of errors and did not work when I tried to implement it.

My answer to the question above is as follows:

If you are creating tasks dynamically, you must do so by iterating over something which is not created by an upstream task, or which can be defined independently of that task. I have learned that you can't pass execution dates or other Airflow variables to anything outside of a template (e.g. a task), as many have pointed out before. See also this post.

I have found a way to create workflows based on the result of previous tasks.
Basically what you want to do is have two subdags with the following:

  1. Xcom push a list (or whatever you need to create the dynamic workflow later) in the subdag that gets executed first (see test1.py, def return_list())
  2. Pass the main dag object as a parameter to your second subdag
  3. Now if you have the main dag object, you can use it to get a list of its task instances. From that list of task instances, you can filter out a task of the current run by using parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]; one could probably add more filters here.
  4. With that task instance, you can use xcom pull to get the value you need by specifying the dag_id to the one of the first subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Use the list/value to create your tasks dynamically

Now I have tested this in my local Airflow installation and it works fine. I don't know if the xcom pull part will have any problems if there is more than one instance of the dag running at the same time, but then you'd probably either use a unique key or something like that to uniquely identify the xcom value you want. One could probably optimize step 3 to be 100% sure to get a specific task of the current main dag, but for my usage this performs well enough; I think one only needs one task_instance object to use xcom_pull.

Also, I clean the xcoms for the first subdag before every execution, just to make sure that I don't accidentally get any wrong value.

I'm pretty bad at explaining, so I hope the following code will make everything clear:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator


log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator


log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

and the main workflow:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2


DAG_NAME = 'test-dag'


dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))


test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)


test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)


test1 >> test2

I think I found a nicer solution at https://github.com/mastak/airflow_multi_dagrun, which uses simple enqueuing of DagRuns by triggering multiple dagruns, similar to TriggerDagRuns. Most of the credit goes to https://github.com/mastak, although I had to patch some details to make it work with the most recent Airflow.

The solution uses a custom operator that triggers several DagRuns:

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

You can then submit several dagruns from the callable function in your PythonOperator, for example:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order


args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}


dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)


gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

I created a fork with the code at https://github.com/flinz/airflow_multi_dagrun

Yes, this is possible. I've created an example DAG that demonstrates it.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator


main_dag_id = 'DynamicWorkflow2'


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}


dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])


# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])


DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))


for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)


# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])


DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))


for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)


ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])


DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))


for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)


# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Before you run the DAG, create these three Airflow Variables:

airflow variables --set DynamicWorkflow_Group1 1


airflow variables --set DynamicWorkflow_Group2 0


airflow variables --set DynamicWorkflow_Group3 0

You will see that the DAG goes from this:

(screenshot: the DAG before the run)

to this, after it has run:

(screenshot: the DAG after the run)

You can see more information about this DAG in my article on creating dynamic workflows on Airflow.

The job graph is not generated at run time. Rather, the graph is built when it is picked up by Airflow from your dags folder. Therefore it isn't really possible to have a different graph for the job every time it runs. You can configure a job to build a graph based on a query at load time. That graph will then stay the same for every run after that, which is probably not very useful.

You can design a graph which executes different tasks on every run based on query results, by using a Branch Operator.
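
As an illustration only (not part of the original answer), here is a minimal sketch of that Branch Operator idea, reusing the dag object and task ids from the full example further below; note the author reports below that returning a list of task ids did not work for them on Airflow 1.10.2, although newer releases accept it:

from airflow.operators.python_operator import BranchPythonOperator


def choose_buckets(**context):
    # getOpenOrders() (defined in the example below) returns the list of
    # bucket task ids that actually received work; follow only those.
    non_empty_buckets = context['ti'].xcom_pull(task_ids='get_open_orders')
    return non_empty_buckets or []


choose_buckets_task = BranchPythonOperator(
    task_id='choose_buckets',
    python_callable=choose_buckets,
    provide_context=True,
    dag=dag)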

What I've done is to pre-configure a set of tasks and then take the query results and distribute them across the tasks. This is probably better anyway, because if your query returns a lot of results, you probably don't want to flood the scheduler with a lot of concurrent tasks. To be even safer, I also used a pool to ensure my concurrency doesn't get out of hand with an unexpectedly large query.

"""
- This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime


from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask


########################################################################


default_args = {
'owner': 'airflow',
'catchup': False,
'depends_on_past': False,
'start_date': datetime(2019, 7, 2, 19, 50, 00),
'email': ['rotten@stackoverflow'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'max_active_runs': 1
}


dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)


totalBuckets = 5


get_orders_query = """
select
o.id,
o.customer
from
orders o
where
o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
and
o.is_test = false
and
o.is_processed = false
"""


###########################################################################################################


# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
return PythonOperator(
task_id=f'order_processing_task_{bucket_number}',
python_callable=runOrderProcessing,
pool='order_processing_pool',
op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
provide_context=True,
dag=dag
)




# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)


if orderList is not None:
for order in orderList:
logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
doStuff(**op_kwargs)




# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')


# initialize the task list buckets
tasks = {}
for task_number in range(0, totalBuckets):
tasks[f'order_processing_task_{task_number}'] = []


# populate the task list buckets
# distribute them evenly across the set of buckets
resultCounter = 0
for record in myDatabaseHook.get_records(get_orders_query):


resultCounter += 1
bucket = (resultCounter % totalBuckets)


tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})


# push the order lists into xcom
for task in tasks:
if len(tasks[task]) > 0:
logging.info(f'Task {task} has {len(tasks[task])} orders.')
context['ti'].xcom_push(key=task, value=tasks[task])
else:
# if we didn't have enough tasks for every bucket
# don't bother running that task - remove it from the list
logging.info(f"Task {task} doesn't have any orders.")
del(tasks[task])


return list(tasks.keys())


###################################################################################################




# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
task_id='clean_xcoms',
mysql_conn_id='airflow_db',
sql="delete from xcom where dag_id='\{\{ dag.dag_id }}'",
dag=dag)




# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
task_id='get_orders',
python_callable=getOpenOrders,
provide_context=True,
dag=dag
)
get_orders_task.set_upstream(clean_xcoms)


# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
taskBucket = createOrderProcessingTask(bucketNumber)
taskBucket.set_upstream(get_orders_task)




###################################################################################################

I think what you are looking for is creating DAGs dynamically. I came across this kind of situation a few days ago, and after some searching I found this blog.

Dynamic Task Generation


start = DummyOperator(
    task_id='start',
    dag=dag
)


end = DummyOperator(
    task_id='end',
    dag=dag)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task

Setting up the DAG workflow:

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead of load to load the YAML file
    configFile = yaml.safe_load(f)

# Extract table names and fields to be processed
tables = configFile['tables']

# In this loop tasks are created for each table defined in the YAML file
for table in tables:
    for table, fieldName in table.items():
        # In our example, the first step in the workflow for each table is to get SQL data from the db.
        # Remember the task id is provided in order to exchange data among tasks generated in a dynamic way.
        get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                             'getSQLData',
                                             {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                              'dbname': configFile['dbname']})

        # The second step is to upload the data to s3
        upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                             'uploadDataToS3',
                                             {'previous_task_id': '{}-getSQLData'.format(table),
                                              'bucket_name': configFile['bucket_name'],
                                              'prefix': configFile['prefix']})

        # This is where the magic lies. The idea is that
        # once tasks are generated they should be linked with the
        # dummy operators generated in the start and end tasks.
        # Then you are done!
        start >> get_sql_data_task
        get_sql_data_task >> upload_to_s3_task
        upload_to_s3_task >> end

This is what our DAG looks like after putting the code together (screenshot of the resulting DAG):

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# Note: the original blog snippet assumed a `dag` object and the callables
# getSQLData / uploadDataToS3 were defined elsewhere in this module.
# A minimal (hypothetical) DAG definition is added here so the file parses.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1)
}

dag = DAG('dynamic_yaml_etl', default_args=default_args, schedule_interval=None)


start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)


with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead of load to load the YAML file
    configFile = yaml.safe_load(f)

# Extract table names and fields to be processed
tables = configFile['tables']

# In this loop tasks are created for each table defined in the YAML file
for table in tables:
    for table, fieldName in table.items():
        # In our example, the first step in the workflow for each table is to get SQL data from the db.
        # Remember the task id is provided in order to exchange data among tasks generated in a dynamic way.
        get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                             'getSQLData',
                                             {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                              'dbname': configFile['dbname']})

        # The second step is to upload the data to s3
        upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                             'uploadDataToS3',
                                             {'previous_task_id': '{}-getSQLData'.format(table),
                                              'bucket_name': configFile['bucket_name'],
                                              'prefix': configFile['prefix']})

        # This is where the magic lies. The idea is that
        # once tasks are generated they should be linked with the
        # dummy operators generated in the start and end tasks.
        # Then you are done!
        start >> get_sql_data_task
        get_sql_data_task >> upload_to_s3_task
        upload_to_s3_task >> end

This helped me a lot, and hopefully it will also help someone else.

I don't quite see what the problem is here.

Here is a standard example. Now, if in the function subdag you replace for i in range(5): with for i in range(random.randint(0, 10)):, then everything will work. Now imagine that operator 'start' puts the data into a file, and instead of a random value, the function reads that data. Then operator 'start' will affect the number of tasks.

The problem will only be in the display in the UI, since when entering the subdag, the number of tasks will be equal to the last count read from the file/database/XCom at that moment. This automatically imposes a restriction on running several instances of one DAG at a time.
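
For illustration (not part of the original answer), a minimal sketch of such a subdag factory: it assumes the upstream 'start' operator has written the desired task count into an Airflow Variable named dynamic_task_count; both the Variable name and the default of 5 are made up.

from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator


def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval='@daily',
    )

    # The count is read every time the DAG file is parsed, so whatever the
    # upstream 'start' task wrote last determines how many tasks appear.
    task_count = int(Variable.get('dynamic_task_count', default_var=5))

    for i in range(task_count):
        DummyOperator(
            task_id='%s-task-%s' % (child_dag_name, i + 1),
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag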

Depending on the context, this could be implemented in an asynchronous batch-worker style. The "dynamic tasks" can be treated as a list of work items to be done, split into asynchronous messages published to an external message broker queue for worker nodes to pick up.

One task generates the "work" dynamically and publishes all items (we don't know in advance how many, or even exactly which ones) to a topic/queue.

Workers consume the "work tasks" from the queue. This can be implemented either directly with technology external to Airflow, or as an Airflow Sensor task (perhaps in a separate DAG). When they finish processing their tasks, the Airflow Sensor gets triggered and the execution flow continues.

To restore the flow for individual work items, consider using the EIP Claim Check pattern.
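
A rough sketch of that worker-queue pattern follows. It is not from the original answer: the SQS queue URL, the build_work_items() helper, and the drained-queue check are all assumptions used only to make the idea concrete; the producer function would be wired into an ordinary PythonOperator.

import json

import boto3
from airflow.sensors.base import BaseSensorOperator

QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/work-items"  # hypothetical queue


def publish_work_items(**context):
    """Producer task: fan the unknown number of work items out to the broker."""
    sqs = boto3.client("sqs")
    work_items = build_work_items()  # your own function; count unknown until now
    for item in work_items:
        sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(item))


class QueueDrainedSensor(BaseSensorOperator):
    """Succeeds once the external workers have emptied the queue."""

    def poke(self, context):
        sqs = boto3.client("sqs")
        attrs = sqs.get_queue_attributes(
            QueueUrl=QUEUE_URL,
            AttributeNames=["ApproximateNumberOfMessages"],
        )
        return int(attrs["Attributes"]["ApproximateNumberOfMessages"]) == 0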

Paradigm shift

Based on all the answers here, it seems to me that the best approach is not to think of the dynamic "work list" generating code as an initial task, but rather as a pre-DAG-definition computation.

This of course assumes that there is a single initial computation to be made, only once and at the beginning of every DAG run (as the OP describes). This approach would not work if some halfway task has to redefine the DAG; Airflow does not seem to be built for that pattern. However, consider chaining controller/target DAGs (see below).

Code sample:

from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago


DEFAULT_ARGS = {"owner": "airflow"}


def get_list_of_things(connection_id):
    list_all_the_things_sql = """
    SELECT * FROM things
    """
    pg_hook = PostgresHook(postgres_conn_id=connection_id)
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    # NOTE: this executes while the DAG file is parsed. So if you grok the code, expect the
    # DAG not to load unless you have a valid postgres DB with a table named "things" in it.
    cursor.execute(list_all_the_things_sql)
    res = cursor.fetchall()
    return res


@dag(default_args=DEFAULT_ARGS, schedule_interval="@once", start_date=days_ago(2), dag_id='test_joey_dag')
def dynamicly_generated_dag():
    connection_id = "ProdDB"

    @task
    def do_a_thing(row):
        print(row)
        return row

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    data_list = get_list_of_things(connection_id)
    for row in data_list:
        start >> do_a_thing(row) >> end


dag = dynamicly_generated_dag()

If get_list_of_things() takes a long time to compute, then it may be wise to pre-compute it and trigger this DAG externally with a controller/target pattern:
trigger_controller_dag
trigger_target_dag
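
For illustration, a minimal controller sketch along those lines (not part of the original answer): the controller dag_id and the conf payload are made up, and the target is the test_joey_dag defined above, which would need to read the list from dag_run.conf for the hand-off to be useful.

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='test_joey_controller_dag',   # hypothetical controller DAG id
    schedule_interval='@once',
    start_date=days_ago(2),
) as controller_dag:
    TriggerDagRunOperator(
        task_id='trigger_target',
        trigger_dag_id='test_joey_dag',  # the target DAG defined above
        # conf carries the precomputed work list to the target run
        conf={'list_of_things': ['thing_a', 'thing_b']},
    )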

Good answer.

Is this overkill? Anyway:

A lot of the other answers are a bit square-peg-round-hole: adding complicated new operators, abusing built-in variables, or somewhat failing to answer the question. I wasn't particularly happy with any of them, as they either hide their behaviour when viewed through the web UI, are prone to breaking, or require lots of custom code (which is also prone to breaking).

This solution uses built-in functionality, requires no new operators and only limited additional code, the DAGs are visible through the UI without any tricks, and it follows Airflow best practice (see idempotency).

The solution to this problem is fairly complicated, so I've split it into several parts. These are:

  • How to safely trigger a dynamic number of tasks
  • How to wait for all of these tasks to finish, then call a final task
  • How to integrate this into your task pipeline
  • Limitations (nothing is perfect)

Can a task trigger a dynamic number of other tasks?

Yes. Sort of. Without needing to write any new operators, it is possible to have a DAG trigger a dynamic number of other DAGs, using only built-in operators. This can then be expanded to have a DAG depend on a dynamic number of other DAGs (see Can a DAG depend on a dynamic number of other DAGs? below). This is similar to flinz's solution, but more robust and with much less custom code.

This is done using a BranchPythonOperator that selectively triggers two other TriggerDagRunOperators. One of these recursively re-calls the current DAG; the other calls an external DAG, the target function.

An example config that can be used to trigger the dag is given at the top of recursive_dag.py.

print_config.py (an example DAG to trigger)

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_output(dag_run):
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}


with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )

recursive_dag.py (where the magic happens)

"""
DAG that can be used to trigger multiple other dags.
For example, trigger with the following config:
{
"task_list": ["print_output","print_output"],
"conf_list": [
{
"output": "Hello"
},
{
"output": "world!"
}
]
}
"""


from datetime import timedelta
import json


from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}


dag_id = 'branch_recursive'
branch_id = 'branch_operator'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'
next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'


def choose_branch(task_instance, dag_run):
dag_conf = dag_run.conf
task_list = dag_conf['task_list']
next_task = task_list[0]
later_tasks = task_list[1:]
conf_list = dag_conf['conf_list']
# dump to string because value is stringified into
# template string, is then parsed.
next_conf = json.dumps(conf_list[0])
later_confs = conf_list[1:]


task_instance.xcom_push(key=next_task_id, value=next_task)
task_instance.xcom_push(key=next_task_conf, value=next_conf)


if later_tasks:
repeat_conf = json.dumps({
'task_list': later_tasks,
'conf_list': later_confs
})


task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
return [next_task_id, repeat_task_id]


return next_task_id


def add_braces(in_string):
return '\{\{' + in_string + '}}'


def make_templated_pull(key):
pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
return add_braces(pull)


with DAG(
dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_branch
)


trigger_next = TriggerDagRunOperator(
task_id=next_task_id,
trigger_dag_id=make_templated_pull(next_task_id),
conf=make_templated_pull(next_task_conf)
)


trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=dag_id,
conf=make_templated_pull(repeat_task_conf)
)


branch >> [trigger_next, trigger_repeat]

The benefit of this solution is that it uses very limited custom code. flinz's solution can fail partway through, resulting in some scheduled tasks and others not. Then on retry, DAGs may either be scheduled to run twice, or fail on the first dag, resulting in partially-complete work done by a failed task. This approach will tell you which DAGs have failed to trigger, and retry only the DAGs that failed to trigger. Therefore this approach is idempotent; the other is not.

Can a DAG depend on a dynamic number of other DAGs?

Yes, but... This can easily be done if the tasks don't run in parallel. Running in parallel is more complicated.

To run in sequence, the important changes are: use wait_for_completion=True in trigger_next, use a python operator to set up the xcom values before "trigger_next", and add a branch operator that either enables or disables the repeat task, then have the linear dependency

setup_xcom >> trigger_next >> branch >> trigger_repeat
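
A minimal sketch of that sequential variant follows, assuming Airflow 2.1+ for wait_for_completion; the setup_xcom task, its XCom key, and the poke_interval value are made up, while branch and trigger_repeat are the operators from recursive_dag.py above.

from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def push_next_dag(task_instance, dag_run):
    # Same idea as choose_branch(): decide which DAG to call next and push it.
    task_instance.xcom_push(key='next_dag', value=dag_run.conf['task_list'][0])


setup_xcom = PythonOperator(
    task_id='setup_xcom',
    python_callable=push_next_dag,
)

trigger_next = TriggerDagRunOperator(
    task_id='next_dag_operator',
    trigger_dag_id="{{ ti.xcom_pull(key='next_dag', task_ids='setup_xcom') }}",
    wait_for_completion=True,   # the essential change: block until the child DAG run finishes
    poke_interval=30,
)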

To run in parallel, you can similarly recursively chain several ExternalTaskSensors that use templated external_dag_id values, together with the timestamps associated with the triggered dag runs. To get the triggered dag timestamp, you can trigger a dag using the timestamp of the triggering dag. Then these sensors wait, one by one, for all of the created DAGs to complete, and then trigger a final DAG. Code below; this time I've added a random sleep to the print output DAG, so that the wait DAGs actually do some waiting.

Note: recurse_wait_dag.py now defines two DAGs, and both need to be switched on for this all to work.

An example config that can be used to trigger the DAGs is given at the top of recurse_wait_dag.py.

print_config.py (modified to add a random sleep)

"""
Simple dag that prints the output in DAG config
Used to demo TriggerDagRunOperator (see recursive_dag.py)
"""


from datetime import timedelta
from time import sleep
from random import randint


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_output(dag_run):
sleep_time = randint(15,30)
print(f'sleeping for time: {sleep_time}')
sleep(sleep_time)
dag_conf = dag_run.conf
if 'output' in dag_conf:
output = dag_conf['output']
else:
output = 'no output found'
print(output)


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}


with DAG(
'print_output',
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as dag:
print_output = PythonOperator(
task_id='print_output_task',
python_callable=print_output
)

recurse_wait_dag.py (where even more of the magic happens)

"""
DAG that can be used to trigger multiple other dags,
waits for all dags to execute, then triggers a final dag.
For example, trigger the DAG 'recurse_then_wait' with the following config:
{
"final_task": "print_output",
"task_list": ["print_output","print_output"],
"conf_list": [
{
"output": "Hello"
},
{
"output": "world!"
}
]
}
"""




from datetime import timedelta
import json


from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone


from common import make_templated_pull


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}


def to_conf(id):
return f'{id}_conf'


def to_execution_date(id):
return f'{id}_execution_date'


def to_ts(id):
return f'{id}_ts'


recurse_dag_id = 'recurse_then_wait'
branch_id = 'recursive_branch'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = to_conf(repeat_task_id)
next_task_id = 'next_dag_operator'
next_task_conf = to_conf(next_task_id)
next_task_execution_date = to_execution_date(next_task_id)
end_task_id = 'end_task'
end_task_conf = to_conf(end_task_id)


wait_dag_id = 'wait_after_recurse'
choose_wait_id = 'choose_wait'
next_wait_id = 'next_wait'
next_wait_ts = to_ts(next_wait_id)


def choose_branch(task_instance, dag_run, ts):
dag_conf = dag_run.conf
task_list = dag_conf['task_list']
next_task = task_list[0]
# can't have multiple dag runs of same DAG with same timestamp
assert next_task != recurse_dag_id
later_tasks = task_list[1:]
conf_list = dag_conf['conf_list']
next_conf = json.dumps(conf_list[0])
later_confs = conf_list[1:]
triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]


task_instance.xcom_push(key=next_task_id, value=next_task)
task_instance.xcom_push(key=next_task_conf, value=next_conf)
task_instance.xcom_push(key=next_task_execution_date, value=ts)


if later_tasks:
repeat_conf = json.dumps({
'task_list': later_tasks,
'conf_list': later_confs,
'triggered_tasks': triggered_tasks,
'final_task': dag_conf['final_task']
})


task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
return [next_task_id, repeat_task_id]
    

end_conf = json.dumps({
'tasks_to_wait': triggered_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=end_task_conf, value=end_conf)


return [next_task_id, end_task_id]


def choose_wait_target(task_instance, dag_run):
dag_conf = dag_run.conf
tasks_to_wait = dag_conf['tasks_to_wait']
next_task, next_ts = tasks_to_wait[0]
later_tasks = tasks_to_wait[1:]
task_instance.xcom_push(key=next_wait_id, value=next_task)
task_instance.xcom_push(key=next_wait_ts, value=next_ts)
    

if later_tasks:
repeat_conf = json.dumps({
'tasks_to_wait': later_tasks,
'final_task': dag_conf['final_task']
})
task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        

def execution_date_fn(_, task_instance):
date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
return timezone.parse(date_str)


def choose_wait_branch(task_instance, dag_run):
dag_conf = dag_run.conf
tasks_to_wait = dag_conf['tasks_to_wait']


if len(tasks_to_wait) == 1:
return end_task_id


return repeat_task_id


with DAG(
recurse_dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as recursive_dag:
branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_branch
)


trigger_next = TriggerDagRunOperator(
task_id=next_task_id,
trigger_dag_id=make_templated_pull(next_task_id, branch_id),
execution_date=make_templated_pull(next_task_execution_date, branch_id),
conf=make_templated_pull(next_task_conf, branch_id)
)


trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=recurse_dag_id,
conf=make_templated_pull(repeat_task_conf, branch_id)
)


trigger_end = TriggerDagRunOperator(
task_id=end_task_id,
trigger_dag_id=wait_dag_id,
conf=make_templated_pull(end_task_conf, branch_id)
)


branch >> [trigger_next, trigger_repeat, trigger_end]


with DAG(
wait_dag_id,
start_date=days_ago(2),
tags=['my_test'],
default_args=default_args,
description='A simple test DAG',
schedule_interval=None
) as wait_dag:
py_operator = PythonOperator(
task_id=choose_wait_id,
python_callable=choose_wait_target
)


sensor = ExternalTaskSensor(
task_id='do_wait',
external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
execution_date_fn=execution_date_fn
)


branch = BranchPythonOperator(
task_id=branch_id,
python_callable=choose_wait_branch
)


trigger_repeat = TriggerDagRunOperator(
task_id=repeat_task_id,
trigger_dag_id=wait_dag_id,
conf=make_templated_pull(repeat_task_conf, choose_wait_id)
)


trigger_end = TriggerDagRunOperator(
task_id=end_task_id,
trigger_dag_id='\{\{ dag_run.conf[\'final_task\'] }}'
)


py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]

Integrating this with your code

That's great, but you want to actually use this. So, what do you need to do? The question includes an example trying to do the following:

             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |       ....     |
             |---> Task B.N --|

To achieve the question's goal (example implementation below), you need to separate Tasks A, B and C into their own DAGs. Then, in DAG A, add a new operator at the end that triggers the DAG 'recurse_then_wait' above. Pass into this DAG a config that includes the config needed for each B DAG, as well as the B DAG id (this can easily be changed to use different DAGs, go nuts). Then include the name of DAG C, the final DAG, to be run at the end. This config should look like this:

{
    "final_task": "C_DAG",
    "task_list": ["B_DAG","B_DAG"],
    "conf_list": [
        {
            "b_number": 1,
            "more_stuff": "goes_here"
        },
        {
            "b_number": 2,
            "foo": "bar"
        }
    ]
}

When implemented, it should look something like this:

trigger_recurse.py

from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

from recurse_wait_dag import recurse_dag_id


def add_braces(in_string):
    return '{{' + in_string + '}}'


def make_templated_pull(key, task_id):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
    return add_braces(pull)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

setup_trigger_conf_id = 'setup_trigger_conf'
trigger_conf_key = 'trigger_conf'


def setup_trigger_conf(task_instance):
    trigger_conf = {
        'final_task': 'print_output',
        'task_list': ['print_output','print_output'],
        'conf_list': [
            {
                'output': 'Hello'
            },
            {
                'output': 'world!'
            }
        ]
    }

    print('Triggering the following tasks')
    for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
        print(f'    task: {task} with config {json.dumps(conf)}')
    print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')

    task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))


with DAG(
    'trigger_recurse_example',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    py_operator = PythonOperator(
        task_id=setup_trigger_conf_id,
        python_callable=setup_trigger_conf
    )

    trigger_operator = TriggerDagRunOperator(
        task_id='trigger_call_and_wait',
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
    )

    py_operator >> trigger_operator

All of this ends up looking like the below, with vertical and horizontal lines showing where a DAG triggers another DAG:

A
|
Recurse - B.1
|
Recurse - B.2
|
...
|
Recurse - B.N
|
Wait for B.1
|
Wait for B.2
|
...
|
Wait for B.N
|
C

Limitations

Tasks are no longer visible on a single graph. This is probably the biggest problem with this approach. By adding tags to all associated DAGs, the DAGs can at least be viewed together. However, relating multiple parallel runs of DAG B to runs of DAG A is messy. That said, since a single DAG run shows its input config, each DAG B run doesn't really depend on DAG A, only on its input config, so this relationship can be at least partially ignored.

Tasks can no longer communicate using xcom. The B tasks can receive input from Task A through the DAG config, but Task C can't get output from the B tasks. The results of all the B tasks should be put into a known location and then read by Task C.

The config argument to 'recurse_and_wait' could maybe be improved to combine task_list and conf_list, but this solves the problem as stated above.

There's no config for the final DAG. That should be trivial to solve.

Only for Airflow v2.3 and above:

This feature is achieved using Dynamic Task Mapping, available only in Airflow versions 2.3 and higher.

More documentation and examples here:

Example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def make_list():
    # This can also be from an API call, checking a database -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    print(list(arg))


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

Example 2:

from airflow import XComArg


task = MyOperator(task_id="source")


downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))

The graph view and tree view are also updated:

  • graph view
  • tree view

Relevant issues:

Although this question was asked 5 years ago, it is still relevant for many use cases. I had the same requirement and could not find a proper way to create a dynamic number of tasks based on the output of a previous task. In my case the requirements were:

  1. Create a dynamic number of tasks based on a previous task
  2. Those tasks had to be executed on a remote GCP Dataproc cluster using Dataproc operators, not with Python operators executed on the Airflow server

I have written up the approach I took in a short post here. If you have the same requirement, give it a read and let me know if there are other ways as well. Hope it helps.
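
The linked post is not reproduced here, but as a rough sketch of the general shape of such a DAG: the Variable name, project, region, cluster, and job URI below are all hypothetical, the dag object is assumed to be defined as in the earlier examples, and the operator parameters reflect recent google provider releases.

import json

from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# An upstream run (or a controller DAG) writes the list of work items into
# this Variable; it is read each time the DAG file is parsed.
work_items = json.loads(Variable.get("dataproc_work_items", default_var="[]"))

for i, item in enumerate(work_items):
    submit_job = DataprocSubmitJobOperator(
        task_id=f"dataproc_job_{i}",
        project_id="my-gcp-project",                     # hypothetical
        region="europe-west1",                           # hypothetical
        job={
            "reference": {"project_id": "my-gcp-project"},
            "placement": {"cluster_name": "my-cluster"},  # hypothetical
            "pyspark_job": {
                "main_python_file_uri": "gs://my-bucket/process_item.py",
                "args": [json.dumps(item)],
            },
        },
        dag=dag,
    )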

You can dynamically create Postgres tasks like this, for example:

for filename in some_files:
    # Read the query first (the context manager closes the file even on errors),
    # then build one PostgresOperator per file.
    with open(filename) as f:
        query = f.read()
    dynamic_task = PostgresOperator(
        task_id=f"run_{filename}",
        postgres_conn_id="some_connection",
        sql=query
    )
    task_start >> dynamic_task >> task_end