Big Data

Dynamic DAG generation with YAML and DAG Factory in Amazon MWAA


Amazon Managed Workflow for Apache Airflow (Amazon MWAA) is a managed service that allows you to use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows without the operational burden of managing the underlying infrastructure. In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs refer to the ability to generate DAGs on the fly during runtime, typically based on some external conditions, configurations, or parameters. Dynamic DAGs helps you to create, schedule, and run tasks within a DAG based on data and configurations that may change over time.

There are various ways to introduce dynamism in Airflow DAGs (dynamic DAG generation) using environment variables and external files. One of the approaches is to use the DAG Factory YAML based configuration file method. This library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open-source, making it simple to create and customize new functionalities.

In this post, we explore the process of creating Dynamic DAGs with YAML files, using the DAG Factory library. Dynamic DAGs offer several benefits:

  1. Enhanced code reusability – By structuring DAGs through YAML files, we promote reusable components, reducing redundancy in your workflow definitions.
  2. Streamlined maintenance – YAML-based DAG generation simplifies the process of modifying and updating workflows, ensuring smoother maintenance procedures.
  3. Flexible parameterization – With YAML, you can parameterize DAG configurations, facilitating dynamic adjustments to workflows based on varying requirements.
  4. Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and enhancing overall workflow runs
  5. Enhanced scalability – YAML-driven DAGs allow for parallel runs, enabling scalable workflows capable of handling increased workloads efficiently.

By harnessing the power of YAML files and the DAG Factory library, we unleash a versatile approach to building and managing DAGs, empowering you to create robust, scalable, and maintainable data pipelines.

Overview of solution

In this post, we will use an example DAG file that is designed to process a COVID-19 data set. The workflow process involves processing an open source data set offered by WHO-COVID-19-Global. After we install the DAG-Factory Python package, we create a YAML file that has definitions of various tasks. We process the country-specific death count by passing Country as a variable, which creates individual country-based DAGs.

The following diagram illustrates the overall solution along with data flows within logical blocks.

Overview of the Solution

Prerequisites

For this walkthrough, you should have the following prerequisites:

Additionally, complete the following steps (run the setup in an AWS Region where Amazon MWAA is available):

  1. Create an Amazon MWAA environment (if you don’t have one already). If this is your first time using Amazon MWAA, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Make sure the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions:

The access policies mentioned here are just for the example in this post. In a production environment, provide only the needed granular permissions by exercising least privilege principles.

  1. Create an unique (within an account) Amazon S3 bucket name while creating your Amazon MWAA environment, and create folders called dags and requirements.
    Amazon S3 Bucket
  2. Create and upload a requirements.txt file with the following content to the requirements folder. Replace {environment-version} with your environment’s version number, and {Python-version} with the version of Python that’s compatible with your environment:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas is needed just for the example use case described in this post, and dag-factory is the only required plug-in. It is recommended to check the compatibility of the latest version of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base install and don’t need to be specified in your requirements.txt file.

  1. Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.

Make sure that you are pointing to the latest AWS S3 bucket version of your requirements.txt file for the additional package installation to happen. This should typically take between 15 – 20 minutes depending on your environment configuration.

Validate the DAGs

When your Amazon MWAA environment shows as Available on the Amazon MWAA console, navigate to the Airflow UI by choosing Open Airflow UI next to your environment.

Validate the DAG

Verify the existing DAGs by navigating to the DAGs tab.

Verify the DAG

Configure your DAGs

Complete the following steps:

  1. Create empty files named dynamic_dags.yml, example_dag_factory.py and process_s3_data.py on your local machine.
  2. Edit the process_s3_data.py file and save it with following code content, then upload the file back to the Amazon S3 bucket dags folder. We are doing some basic data processing in the code:
    1. Read the file from an Amazon S3 location
    2. Rename the Country_code column as appropriate to the country.
    3. Filter data by the given country.
    4. Write the processed final data into CSV format and upload back to S3 prefix.
import boto3
import pandas as pd
import io
   
def process_s3_data(COUNTRY):
### Top level Variables replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
### get csv file ###
   response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
   status = response['ResponseMetadata']['HTTPStatusCode']
   if status == 200:
### read csv file and filter based on the country to write back ###
       df = pd.read_csv(response.get("Body"))
       df.rename(columns={"Country_code": "country"}, inplace=True)
       filtered_df = df[df['country'] == COUNTRY]
       with io.StringIO() as csv_buffer:
                   filtered_df.to_csv(csv_buffer, index=False)
                   response = s3.put_object(
                       Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
                   )
       status = response['ResponseMetadata']['HTTPStatusCode']
       if status == 200:
           print(f"Successful S3 put_object response. Status - {status}")
       else:
           print(f"Unsuccessful S3 put_object response. Status - {status}")
   else:
       print(f"Unsuccessful S3 get_object response. Status - {status}")

  1. Edit the dynamic_dags.yml and save it with the following code content, then upload the file back to the dags folder. We are stitching various DAGs based on the country as follows:
    1. Define the default arguments that are passed to all DAGs.
    2. Create a DAG definition for individual countries by passing op_args
    3. Map the process_s3_data function with python_callable_name.
    4. Use Python Operator to process csv file data stored in Amazon S3 bucket.
    5. We have set schedule_interval as 10 minutes, but feel free to adjust this value as needed.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"

  1. Edit the file example_dag_factory.py and save it with the following code content, then upload the file back to dags folder. The code cleans the existing the DAGs and generates clean_dags() method and the creating new DAGs using the generate_dags() method from the DagFactory instance.
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())

  1. After you upload the files, go back to the Airflow UI console and navigate to the DAGs tab, where you will find new DAGs.
    List the new DAGs
  2. Once you upload the files, go back to the Airflow UI console and under the DAGs tab you will find new DAGs are appearing as shown below:DAGs

You can enable DAGs by making them active and testing them individually. Upon activation, an additional CSV file named count_death_{COUNTRY_CODE}.csv is generated in the dags folder.

Cleaning up

There may be costs associated with using the various AWS services discussed in this post. To prevent incurring future charges, delete the Amazon MWAA environment after you have completed the tasks outlined in this post, and empty and delete the S3 bucket.

Conclusion

In this blog post we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are characterized by their ability to generate results with each parsing of the DAG file based on configurations. Consider using dynamic DAGs in the following scenarios:

  • Automating migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
  • Situations where only a parameter changes between different DAGs, streamlining the workflow management process
  • Managing DAGs that are reliant on the evolving structure of a source system, providing adaptability to changes
  • Establishing standardized practices for DAGs across your team or organization by creating these blueprints, promoting consistency and efficiency
  • Embracing YAML-based declarations over complex Python coding, simplifying DAG configuration and maintenance processes
  • Creating data driven workflows that adapt and evolve based on the data inputs, enabling efficient automation

By incorporating dynamic DAGs into your workflow, you can enhance automation, adaptability, and standardization, ultimately improving the efficiency and effectiveness of your data pipeline management.

To learn more about Amazon MWAA DAG Factory, visit Amazon MWAA for Analytics Workshop: DAG Factory. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repository.


About the Authors

 Jayesh Shinde is Sr. Application Architect with AWS ProServe India. He specializes in creating various solutions that are cloud centered using modern software development practices like serverless, DevOps, and analytics.

Harshd Yeola is Sr. Cloud Architect with AWS ProServe India helping customers to migrate and modernize their infrastructure into AWS. He specializes in building DevSecOps and scalable infrastructure using containers, AIOPs, and AWS Developer Tools and services.