Building Effective ETL Processes in BigQuery: Data Orchestration with DBT and Airflow

This post explains how to configure ETL processes and populate data models hosted on BigQuery. The process uses native GCP resources such as BigQuery, Cloud Storage, and Composer (Airflow), and the presented use case employs DBT as the framework that transforms the data and populates the data model hosted on BigQuery. Below, I show how to combine these components into a fully operational data warehouse.

DBT: Transforming Data into Actionable Insights

DBT (Data Build Tool) is a SQL transformation tool that enables analytics teams to collaborate on data models and to test and document code while following software engineering best practices. Models can be written in SQL or Python, and built-in or custom-written macros can be used. DBT supports data warehouses such as BigQuery, Snowflake, Redshift, and Databricks.

BigQuery: Empowering Real-Time Data Analytics

BigQuery is a fully managed, serverless data warehouse and analytics platform provided by Google Cloud. It is designed for large-scale, real-time data analytics, enabling users to efficiently store, query, and analyze massive datasets. BigQuery supports standard SQL queries, making it accessible to both data analysts and data scientists. It provides automatic scaling and built-in machine learning capabilities, making it easier to extract valuable insights from data for data-driven decision-making.

Composer (Airflow): Orchestrating Data Pipelines

Composer, Google Cloud's managed service based on the Apache Airflow framework, is a tool for building, scheduling, and monitoring data pipelines and workflows. With Composer, users can define complex workflows as directed acyclic graphs (DAGs) and automate the execution of tasks at scale. It offers a rich set of pre-built connectors to various data sources and integrates with other Google Cloud services and external systems. Composer simplifies the management of data pipelines, enhances collaboration among teams, and provides monitoring and alerting capabilities for workflow execution.

Cloud Storage: Secure and Scalable Data Storage

Cloud Storage is a Google Cloud object storage service that provides secure and scalable storage for diverse data types. It allows users to store and retrieve data in various formats, from simple text files to large multimedia files. Cloud Storage offers high durability, availability, and global accessibility, making it suitable for a wide range of applications, including backup and restore, content storage and distribution, data archiving, and serving static website content. It provides features such as lifecycle management, access control, versioning, and automatic scalability, empowering users to effectively manage and control their cloud-based data.

An essential component of the ETL process is orchestration. DBT handles the dependencies between its own models, but dependencies between data ingestion, transformation, and post-processing activities such as snapshots or archiving still need to be established externally. To achieve this, orchestration tools like Azure Data Factory (ADF), GCP Workflows, Dagster, Prefect, Airflow, or the in-app scheduling provided by dbt Cloud can be employed to automate the execution of dbt jobs based on specified criteria.

This article covers orchestrating dbt with Airflow and BigQuery. Depending on the team size, this approach can be cost-effective because there are no license costs, although the cost of Composer itself should be taken into account. Airflow offers seamless integration with other cloud services, enabling the construction of intricate processes that interact with various platform components. The drawback of this method is that not all of the features offered by the paid dbt Cloud version are available.

To start working with Airflow in GCP, several options are available: a virtual machine with an Airflow installation, a Kubernetes cluster running Airflow, or GCP Composer. Among these options, GCP Composer is the simplest to configure for users without prior experience installing and configuring Airflow. The steps to configure GCP Composer are as follows:

1. Create a Composer environment

In the first step, we need to create a Composer environment in our project. This can be accomplished using the GCP console, Terraform, or the gcloud CLI:


gcloud composer environments create dbt-demo \
  --location europe-central2 \
  --image-version composer-2.2.0-airflow-2.5.1

2. Create a folder for data model scripts

Once the environment is operational, upload the entire dbt project (including models, snapshots, tests, and packages) to the GCS bucket linked to the Composer environment, structured as shown below.
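
The exact layout depends on the project, but a typical structure could look like the sketch below (file and folder names are illustrative). In this setup the dbt project lives in the dags/ folder of the Composer bucket, which is mounted on the workers at /home/airflow/gcs/dags, while the profile is kept under data/profiles:

gs://<composer-bucket>/
├── dags/
│   ├── dbt_project.yml
│   ├── packages.yml
│   ├── models/
│   ├── snapshots/
│   ├── tests/
│   └── dbt_dag.py          # the Airflow DAG from step 4 (illustrative name)
└── data/
    └── profiles/
        └── profiles.yml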

Every dbt project requires a file called dbt_project.yml, a configuration file that defines project settings such as the directories for the various file types and the model configuration that determines whether models are materialized as tables or views.
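
As a reference point, a minimal dbt_project.yml for this kind of setup could look like the sketch below; the project and profile names, the staging/marts folders, and the materializations are placeholders for illustration:

name: 'dbt_demo'
version: '1.0.0'
config-version: 2

# Must match a profile defined in profiles.yml (see step 3)
profile: 'dbt_demo'

# Directories that dbt scans for each file type, relative to the project root
model-paths: ["models"]
snapshot-paths: ["snapshots"]
test-paths: ["tests"]

# Default materializations; individual models can override them
models:
  dbt_demo:
    staging:
      +materialized: view
    marts:
      +materialized: table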

3. Configure the dbt profile and upload the file under data/profiles
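
For completeness, a minimal BigQuery profile could look like the sketch below. The profile name has to match the profile entry in dbt_project.yml; the GCP project, dataset, and location are placeholders, and oauth authentication is assumed, which on Composer resolves to the environment's service account:

dbt_demo:
  target: prod
  outputs:
    prod:
      type: bigquery
      method: oauth               # use the Composer service account credentials
      project: my-gcp-project     # placeholder: your GCP project id
      dataset: dbt_demo           # placeholder: target BigQuery dataset
      location: europe-central2
      threads: 4
      priority: interactive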

4. Write and upload Airflow scripts with dbt commands to the dags folder in the GCS bucket

Below is example code for an Airflow DAG that runs the dbt steps. It uses operators from the airflow-dbt package, which must be installed in the Composer environment as a PyPI dependency:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow_dbt.operators.dbt_operator import (
    DbtDepsOperator,
    DbtRunOperator,
    DbtSnapshotOperator,
    DbtTestOperator,
)
from datetime import datetime

# Arguments shared by all tasks; 'dir' points at the dbt project in the
# Composer bucket and 'profiles_dir' at the profiles.yml uploaded in step 3.
default_args = {
    'start_date': datetime(2023, 5, 31),
    'dir': '/home/airflow/gcs/dags',
    'profiles_dir': '/home/airflow/gcs/data/profiles',
}

# Run the pipeline every day at 06:00; catchup=False prevents backfilling
# of past schedule intervals (it is a DAG argument, not a default_args key).
with DAG(
    dag_id='dbt',
    default_args=default_args,
    schedule_interval='0 6 * * *',
    catchup=False,
) as dag:

    start = DummyOperator(task_id='start')

    # Install dbt package dependencies (dbt deps)
    dbt_deps = DbtDepsOperator(task_id='dbt_deps')

    # Build the models, skipping the listed ones (dbt run --exclude ...)
    dbt_run = DbtRunOperator(
        task_id='dbt_run',
        exclude='network_endpoint is_provider',
    )

    # Capture slowly changing dimensions (dbt snapshot)
    dbt_snapshot = DbtSnapshotOperator(task_id='dbt_snapshot')

    # Run data tests; fail fast without retries (dbt test)
    dbt_test = DbtTestOperator(
        task_id='dbt_test',
        retries=0,
    )

    end = DummyOperator(task_id='end')

    start >> dbt_deps >> dbt_run >> dbt_snapshot >> dbt_test >> end

Several other dbt operators can be used. The full list of supported dbt commands is:

  • DbtDocsGenerateOperator → dbt docs generate
  • DbtDepsOperator → dbt deps
  • DbtSeedOperator → dbt seed
  • DbtSnapshotOperator → dbt snapshot
  • DbtRunOperator → dbt run
  • DbtTestOperator → dbt test

DBT offers the flexibility to run the whole project, exclude unnecessary models from a job, or select only specific ones. In Cloud Composer, this can be achieved by passing arguments such as the following to the dbt operators (a short example follows the list):

  • models – adds the --models argument to the command
  • exclude – adds the --exclude argument to the command
  • select – adds the --select argument to the command
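
For instance, a hypothetical task defined inside the DAG above that builds only two selected models (the model names are illustrative) could look like this:

    dbt_run_selected = DbtRunOperator(
        task_id='dbt_run_selected',
        select='stg_orders stg_customers',  # illustrative model names
    )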

5. Finally, test the sample DAG by triggering it directly from the Airflow web UI
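
Alternatively, the DAG can be triggered from the command line with the gcloud CLI; a sketch assuming the environment name and location from step 1 and the dag_id 'dbt' from the example above:

gcloud composer environments run dbt-demo \
  --location europe-central2 \
  dags trigger -- dbt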


