How to prevent airflow from backfilling dag runs?

Say you have an airflow DAG that doesn't make sense to backfill, meaning that, after it's run once, running it subsequent times quickly would be completely pointless.

For example, if you're loading data from some source that is only updated hourly into your database, backfilling, which occurs in rapid succession, would just be importing the same data again and again.

This is especially annoying when you instantiate a new hourly task, and it runs N amount of times for each hour it missed, doing redundant work, before it starts running on the interval you specified.

The only solution I can think of is something that they specifically advised against in FAQ of the docs

We recommend against using dynamic values as start_date, especially datetime.now() as it can be quite confusing.

Is there any way to disable backfilling for a DAG, or should I do the above?

53926 次浏览

This appears to be an unsolved Airflow problem. I know I would really like to have exactly the same feature. Here is as far as I've gotten; it may be useful to others.

The are UI features (at least in 1.7.1.3) which can help with this problem. If you go to the Tree view and click on a specific task (square boxes), a dialog button will come up with a 'mark success' button. Clicking 'past', then clicking 'mark success' will label all the instances of that task in DAG as successful and they will not be run. The top level DAG (circles on top) can also be labeled as successful in a similar fashion, but there doesn't appear to be way to label multiple DAG instances.

I haven't looked into it deeply enough yet, but it may be possible to use the 'trigger_dag' subcommand to mark states of DAGs. see here: https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

A CLI feature to mark DAGs is in the works: http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%3CJIRA.12973462.1464369259000.37918.1465189859133@Atlassian.JIRA%3E https://github.com/apache/incubator-airflow/pull/1590

UPDATE (9/28/2016): A new operator 'LatestOnlyOperator' has been added (https://github.com/apache/incubator-airflow/pull/1752) which will only run the latest version of downstream tasks. Sounds very useful and hopefully it will make it into the releases soon

UPDATE 2: As of airflow 1.8, the LatestOnlyOperator has been released.

Upgrade to airflow version 1.8 and use catchup_by_default=False in the airflow.cfg or apply catchup=False to each of your dags.

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default

Setting catchup=False in your dag declaration will provide this exact functionality.

I don't have the "reputation" to comment, but I wanted to say that catchup=False was designed (by me) for this exact purpose. In addition, I can verify that in 1.10.1 it is working when set explicitly in the instantiation. However I do not see it working when placed in the default args. I've been away from Airflow for 18 months though, so it will be a bit before I can take a look at why the default args isn't working for catchup.

dag = DAG('example_dag',
max_active_runs=3,
catchup=False,
schedule_interval=timedelta(minutes=5),
default_args=default_args)