How do I fix a failed task in an Airflow DAG?
Matthew Barrera
I'm new to Airflow and just tried to do a simple data transformation in a DAG for practice. However, I can't figure out why the first task always fails unexpectedly. Can someone give me some tips on how to debug a failed task in a DAG, or point out what's wrong? Much appreciated.
import pandas as pd
import airflow
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def transform_data(*args, **kwargs):
    df = pd.read_csv("C:/aws-airflow/data.csv")
    df['element'] = df['element'].str.slice(2).str.replace('-', ",")
    df.to_csv("C:/aws-airflow/data_new.csv")

default_args = {
    'owner' : 'airflow',
    'start_date' : datetime(2021, 9, 1),
    'retries' : 0,
    'retry_delay' : timedelta(minutes = 2)
}

with DAG(
    dag_id = 'data_pipeline',
    schedule_interval = "@daily",
    default_args = default_args
) as dag:
    transform_data = PythonOperator(
        task_id = "transform_data",
        python_callable = transform_data
    )
    task_end = DummyOperator(
        task_id = 'none'
    )
    transform_date >> task_end
There are 2 issues in your code.
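- You have to rename your Python function so that it doesn't share its name with the operator object; otherwise, once the operator is created, transform_data no longer refers to the function. Let's call it transform_data_fn.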
- When defining the task dependencies, you have a typo: it should be transform_data instead of transform_date.
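Both problems can be reproduced in plain Python without Airflow installed. In this sketch, `FakeOperator` is a hypothetical stand-in for `PythonOperator` that only stores the callable. It shows that the right-hand side is evaluated before the name is rebound, so the operator still receives the function, but afterwards `transform_data` names the operator. It also shows that the misspelled `transform_date` raises `NameError` as soon as the module-level code runs, which is what happens when the scheduler parses the DAG file.

```python
class FakeOperator:
    """Hypothetical stand-in for PythonOperator: it only stores the callable."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable


def transform_data():
    return "transformed"


# The right-hand side is evaluated first, so the operator receives the function...
transform_data = FakeOperator(task_id="transform_data", python_callable=transform_data)

# ...but from this line on, the name refers to the operator, not the function.
print(type(transform_data).__name__)     # FakeOperator
print(transform_data.python_callable())  # transformed

# The dependency line in the question uses a misspelled, undefined name.
err_msg = None
try:
    transform_date  # deliberate typo, as in the question
except NameError as exc:
    err_msg = str(exc)
print(err_msg)  # name 'transform_date' is not defined
```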
I took your code as is and fixed the two mistakes:
import pandas as pd
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def transform_data_fn(*args, **kwargs):
    df = pd.read_csv("data/data.csv")
    df["element"] = df["element"]
    df.to_csv("data/data_new.csv")

default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 9, 1),
    "retries": 0,
    "retry_delay": timedelta(minutes=2),
}
with DAG(
    dag_id="data_pipeline", schedule_interval="@daily", default_args=default_args
) as dag:
    transform_data = PythonOperator(
        task_id="transform_data", python_callable=transform_data_fn, dag=dag
    )
    task_end = DummyOperator(task_id="none")

    transform_data >> task_end

NOTE: I'm not doing any transformation on the dataframe because it is irrelevant to the fix itself, and because I don't know what your CSV file looks like.
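Since the question also asks how to debug a failing task, one low-effort approach is to call the task's Python function directly, outside Airflow, on a tiny sample before wiring it into the DAG. The sketch below does that for the transformation from the question. The `src`/`dst` parameters and the sample `element` values are hypothetical, because the real CSV isn't shown. Passing paths as arguments also avoids hard-coding a Windows path such as `C:/aws-airflow/data.csv`, which a worker running on Linux or in Docker may not be able to read.

```python
import tempfile
from pathlib import Path

import pandas as pd


def transform_data_fn(src, dst, **kwargs):
    """The question's transformation, with hypothetical src/dst path parameters."""
    df = pd.read_csv(src)
    # Drop the first two characters, then replace every "-" with ",".
    df["element"] = df["element"].str.slice(2).str.replace("-", ",", regex=False)
    df.to_csv(dst)
    return df


# Run it on a tiny made-up sample in a temporary folder, with no Airflow involved.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "data.csv"
    dst = Path(tmp) / "data_new.csv"
    src.write_text("element\nab1-2-3\nxy4-5\n")
    result = transform_data_fn(src, dst)
    written = dst.read_text()

print(result)
print(written)
```

If that works, the same function could be wired into the DAG with something like `PythonOperator(task_id="transform_data", python_callable=transform_data_fn, op_kwargs={"src": "...", "dst": "..."})`, pointing at paths the worker can actually read. With Airflow 2, `airflow tasks test data_pipeline transform_data 2021-09-01` runs just that task and prints its log, including any Python traceback, to the terminal.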