There is nothing built into Airflow that does this for you. To delete the DAG, remove it from the repository and delete the corresponding entries from the Airflow metastore table - dag.
I wrote a script that deletes everything related to a particular DAG, but it is MySQL-only. You can write a different connector method if you are using PostgreSQL. The commands were originally posted by Lance at https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0
I just put them in a script. Hope this helps. Usage: python script.py dag_id
import sys
import MySQLdb

dag_input = sys.argv[1]

# Delete rows referencing the DAG from every related metadata table.
# Parameterized queries keep the dag_id argument from being interpreted as SQL.
tables = ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]

def connect(query, params):
    db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
    cur = db.cursor()
    cur.execute(query, params)
    db.commit()
    db.close()

for table in tables:
    query = "DELETE FROM {} WHERE dag_id = %s".format(table)
    print(query)
    connect(query, (dag_input,))
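Since the answer above mentions writing a different connector method for PostgreSQL, a possible adaptation using psycopg2 might look like the sketch below. The host, user, password, and database name are placeholders you would fill in for your own installation.

```python
import sys

# The Airflow metadata tables that reference a dag_id.
TABLES = ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]

def build_queries(tables):
    # One parameterized DELETE per table; psycopg2 uses %s placeholders.
    return ["DELETE FROM {} WHERE dag_id = %s".format(t) for t in tables]

def delete_dag_entries(dag_id):
    import psycopg2  # imported here so the module loads without the driver
    conn = psycopg2.connect(host="hostname", user="username",
                            password="password", dbname="database")
    cur = conn.cursor()
    for query in build_queries(TABLES):
        cur.execute(query, (dag_id,))
    conn.commit()
    conn.close()

if __name__ == "__main__" and len(sys.argv) > 1:
    delete_dag_entries(sys.argv[1])
```

Usage is the same as the MySQL version: python script.py dag_id.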
I've written a script that deletes all metadata related to a specific DAG for the default SQLite DB. This is based on Jesus's answer above, adapted from Postgres to SQLite. Adjust '../airflow.db' to point from wherever script.py is stored to the default airflow.db file (usually in ~/airflow). To execute, use python script.py dag_id.
import sqlite3
import sys

conn = sqlite3.connect('../airflow.db')
c = conn.cursor()
dag_input = sys.argv[1]
for t in ["xcom", "task_instance", "sla_miss", "log", "job", "dag_run", "dag"]:
    # Table names cannot be parameterized, but the dag_id value can be.
    c.execute("delete from {} where dag_id = ?".format(t), (dag_input,))
conn.commit()
conn.close()
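To sanity-check the delete loop before pointing it at a real airflow.db, you can exercise the same pattern against an in-memory SQLite database. The two tables here are toy stand-ins for the Airflow metadata tables, not the full schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
c = conn.cursor()

# Toy stand-ins for two of the Airflow metadata tables.
for t in ["dag_run", "dag"]:
    c.execute("create table {} (dag_id text)".format(t))
    c.execute("insert into {} values ('my_dag'), ('other_dag')".format(t))

# The same delete loop as the script above, parameterized for safety.
for t in ["dag_run", "dag"]:
    c.execute("delete from {} where dag_id = ?".format(t), ("my_dag",))
conn.commit()

remaining = [c.execute("select dag_id from {}".format(t)).fetchall()
             for t in ["dag_run", "dag"]]
print(remaining)  # only the 'other_dag' rows survive
```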
For those who are still looking for answers: on Airflow version 1.8 it is very difficult to delete a DAG; you can refer to the answers above. But since 1.9 has been released, you just have to
remove the DAG from the dags folder and restart the webserver
PR #2199 (Jira: AIRFLOW-1002), which adds DAG removal to Airflow, has now been merged. It allows fully deleting a DAG's entries from all of the related tables.
Remove the DAG (the one you want to delete) from the dags folder and run airflow resetdb (note that this resets the entire metadata database, not just that DAG).
Alternatively, you can go into the airflow_db and manually delete those entries from the DAG tables (task_fail, xcom, task_instance, sla_miss, log, job, dag_run, dag, dag_stats).
I have Airflow version 1.10.2 and I tried executing the airflow delete_dag command, but it throws the following error:
bash-4.2# airflow delete_dag dag_id
[2019-03-16 15:37:20,804] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=28224
/usr/lib64/python2.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: http://initd.org/psycopg/docs/install.html#binary-install-from-pypi.
""")
This will drop all existing records related to the specified DAG. Proceed? (y/n)y
Traceback (most recent call last):
File "/usr/bin/airflow", line 32, in <module>
args.func(args)
File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
return f(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 258, in delete_dag
raise AirflowException(err)
airflow.exceptions.AirflowException: Server error
Though I am able to delete it through a curl command.
Please let me know if anyone has an idea about this command's execution: is this a known issue, or am I doing something wrong?
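For reference, the curl deletion mentioned above goes through Airflow 1.10's experimental REST API. The host and port below are assumptions (the default local webserver), and the webserver must be running:

```shell
# Deletes the DAG's metadata via the experimental REST API (Airflow 1.10+).
curl -X DELETE "http://localhost:8080/api/experimental/dags/dag_id"
```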
versions <= 1.9.0:
There is no command to delete a DAG, so you need to first delete the DAG file and then delete all the references to the dag_id from the Airflow metadata database.
WARNING
You can reset the Airflow metadata database, which will erase everything, including the DAGs; but remember that you will also erase the history, pools, variables, etc.
Airflow 1.10.1 has been released. This release adds the ability to delete a DAG from the web UI after you have deleted the corresponding DAG from the file system.
See this ticket for more details:
[AIRFLOW-2657] Add ability to delete DAG from web ui
Please note that this doesn't actually delete the DAG from the file system; you will need to do that manually first, otherwise the DAG will get reloaded.
Based on the answer of @OlegYamin, I'm doing the following to delete a dag backed by postgres, where airflow uses the public schema.
delete from public.dag_pickle where id = (
select pickle_id from public.dag where dag_id = 'my_dag_id'
);
delete from public.dag_run where dag_id = 'my_dag_id';
delete from public.dag_stats where dag_id = 'my_dag_id';
delete from public.log where dag_id = 'my_dag_id';
delete from public.sla_miss where dag_id = 'my_dag_id';
delete from public.task_fail where dag_id = 'my_dag_id';
delete from public.task_instance where dag_id = 'my_dag_id';
delete from public.xcom where dag_id = 'my_dag_id';
delete from public.dag where dag_id = 'my_dag_id';
WARNING: The effect/correctness of the first delete query is unknown to me. It is just an assumption that it is needed.
DAGs can be deleted in Airflow 1.10, but the process and sequence of actions must be right.
There is a chicken-and-egg problem: if you delete the DAG from the frontend while the file is still there, the DAG is reloaded (because the file is not deleted). If you delete the file first and then refresh the page, the DAG can no longer be deleted from the web GUI.
So the sequence of actions that let me delete a DAG from frontend was:
delete the DAG file (in my case, delete it from the pipeline repository and deploy to the Airflow servers, especially the scheduler)
DO NOT refresh web GUI.
In the web GUI in the DAGs view (normal frontpage) click on "Delete dag" -> the red icon on the far right.
It cleans up all the remains of this DAG from the database.
First -->
Delete the DAG file from $AIRFLOW_HOME/dags folder.
Note: Depending on whether you have used subdirectories, you may have to dig through the subdirectories to find the DAG file and delete it.
Second -->
Delete the DAG from the Webserver UI using the delete button (x in circle)
For those who have direct access to the Postgres psql console of the airflow db, you can simply execute the following request to remove the DAG:
\set dag_id YOUR_DAG_ID
delete from xcom where dag_id=:'dag_id';
delete from task_instance where dag_id=:'dag_id';
delete from sla_miss where dag_id=:'dag_id';
delete from log where dag_id=:'dag_id';
delete from job where dag_id=:'dag_id';
delete from dag_run where dag_id=:'dag_id';
delete from dag where dag_id=:'dag_id';
Similar queries (with minor changes) are suitable for other databases, such as MySQL and SQLite.
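Assuming the statements above are saved to a file (the name delete_dag.sql is just an example), they can also be run non-interactively by supplying the variable with psql's -v flag, which sets the same variable that \set does inside the script:

```shell
# -v defines the psql variable referenced as :'dag_id' in the script.
psql -d airflow -v dag_id=YOUR_DAG_ID -f delete_dag.sql
```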
Here dag_id is the name of the DAG. This approach uses the standard CLI command (airflow dags delete) instead of deleting records from the metadata database yourself. You also need to delete the DAG file from the dags directory, which can be done with a PythonOperator.
I have such a DAG that does this:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash import BashOperator
import os

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
    'retries': 1
}

def delete_dag(**context):
    # The target dag_id is passed in via the dag_run conf.
    conf = context["dag_run"].conf
    dag_id = conf["dag_name"]
    t1 = BashOperator(task_id='delete_dag_task',
                      bash_command=f'airflow dags delete -y {dag_id}')
    t1.execute(context=context)

def delete_dag_file(**context):
    conf = context["dag_run"].conf
    dag_id = conf["dag_name"]
    # Assumes the target DAG file lives next to this one and is named <dag_id>.py
    script_dir = os.path.dirname(__file__)
    dag_file_path = os.path.join(script_dir, '{}.py'.format(dag_id))
    try:
        os.remove(dag_file_path)
    except OSError:
        pass

with DAG('dag-deleter',
         schedule_interval=None,
         default_args=default_args,
         is_paused_upon_creation=False,
         catchup=False) as dag:

    delete_dag_task = PythonOperator(
        task_id="delete_dag",
        python_callable=delete_dag,
        provide_context=True)

    delete_dag_file_task = PythonOperator(
        task_id="delete_dag_file",
        python_callable=delete_dag_file,
        provide_context=True)

    delete_dag_task >> delete_dag_file_task
and I trigger the DAG using the REST API, passing {"conf": {"dag_name": "<dag_id>"}} in the HTTP request body (that is the conf the tasks above read). Running the underlying CLI command by hand looks like this:
/opt/airflow$ airflow dags delete my_dag
[2022-10-12 12:21:40,657] {__init__.py:38} INFO - Loaded API auth backend: <module 'airflow.api.auth.backend.deny_all' from '/home/airflow/.local/lib/python3.8/site-packages/airflow/api/auth/backend/deny_all.py'>
This will drop all existing records related to the specified DAG. Proceed? (y/n)y
[2022-10-12 12:21:48,775] {delete_dag.py:43} INFO - Deleting DAG: my_dag
Removed 135 record(s)
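The REST trigger mentioned above can be sketched against the Airflow 2 stable API. The host, port, and authentication are assumptions for a default local installation; the request body carries the target DAG name in "conf", matching what the dag-deleter tasks read:

```python
import json
import urllib.request

def build_trigger_request(host, dag_id, target_dag):
    # POST /api/v1/dags/<dag_id>/dagRuns with the target DAG name in "conf".
    payload = {"conf": {"dag_name": target_dag}}
    return urllib.request.Request(
        url="{}/api/v1/dags/{}/dagRuns".format(host, dag_id),
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST")

req = build_trigger_request("http://localhost:8080", "dag-deleter", "my_dag")
# urllib.request.urlopen(req)  # uncomment to actually send (auth header not shown)
```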