Use GCP Workflows to Orchestrate Data Pipelines

GCP Workflows for Data and Machine Learning Pipelines

Data orchestration is the process of organizing and managing your data pipelines. In a data lake or data warehouse it covers retrieving data from sources, transforming it, and loading it into a destination. A data pipeline consists of several tasks that must be executed to achieve the expected result. Retrieving data from sources is relatively straightforward: we integrate with them and import the data. Transforming and loading the data is more complex and requires dependencies between steps, which is where an orchestration tool comes in. The GCP platform allows us to use Apache Airflow (via Cloud Composer), but recently I found out that we could use GCP Workflows, which is much cheaper.

GCP Workflows is a serverless orchestration engine available in Google Cloud. It is fully managed and doesn’t require any infrastructure to run, and we don’t pay for idle time, only for executions. According to the GCP documentation, it fits several use cases, such as:

  • App integration and microservice orchestration
  • Business process automation
  • Data and ML pipelines
  • IT process automation

GCP Workflows Use Cases

In this post, I’ll demonstrate how to use GCP Workflows to build an inexpensive process that loads data into the BigQuery data warehouse or runs ML processes. The components I decided to use:

  • Workflows
  • Cloud Run
  • Container Registry
  • Cloud Storage
  • BigQuery

Cloud Run

Cloud Run is a platform that lets you run your Dockerized code written in any programming language. There are two ways to run code: Cloud Run services and Cloud Run jobs. With a Cloud Run service you trigger the execution of your code through an HTTP endpoint, while a Cloud Run job runs a task and finishes when the work is completed. Cloud Run services can be really cheap because you only pay for CPU usage. To learn more about pricing, visit cloud.google.com/run/pricing.

Container Registry

The service hosts, manages, and secures Docker container images. You can use those images with other GCP services such as Compute Engine and Cloud Run, and in CI/CD pipelines.

Cloud Storage

It’s storage for your data on the GCP platform that keeps data in buckets. You can select different storage classes to manage the cost and latency of storing your data.
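
For illustration, here is a minimal sketch of creating a bucket with a non-default storage class using the Python client library (the bucket name, location, and storage class below are placeholders, not values used elsewhere in this post):

from google.cloud import storage


def create_bucket(bucket_name: str, location: str = "EU") -> storage.Bucket:
    # Create a bucket with a colder storage class to trade access latency for cost.
    # The name, location, and class here are example values.
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    bucket.storage_class = "NEARLINE"
    return client.create_bucket(bucket, location=location)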

BigQuery

It’s a serverless data warehouse that helps you manage and analyze your data with built-in features like machine learning, geospatial analysis, and business intelligence, without worrying about infrastructure. It’s a SQL-based engine that supports ACID transactions. You can work with it through interfaces such as the BigQuery console and the command-line tool, and developers can use client libraries for programming languages like Python, Java, JavaScript, and Go.
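
As a quick illustration of the Python client library mentioned above, a query can be run with just a few lines (the public sample table is only an example):

from google.cloud import bigquery

client = bigquery.Client()
# Run a simple query and iterate over the result rows
query = "SELECT COUNT(*) AS cnt FROM `bigquery-public-data.samples.shakespeare`"
for row in client.query(query).result():
    print(row.cnt)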

In this article, I’ll show how to build a Dockerized application that downloads gzipped files, infers their schemas, loads the files into BigQuery tables, and transforms them into a star schema model.

The first step is to create a Dockerfile with Python; I will use this container as the environment to execute my Python scripts. I decided to start from an example I found in the GCP tutorials, which explains how to create a Docker image that mounts a GCP bucket as a file system inside the container. You can get the Dockerfile from this location.

The next step is to create HTTP endpoints that trigger my application’s actions: downloading files, loading a file into BigQuery, and transforming it with SQL in BigQuery.

import os
import signal
import logging

from flask import Flask, request

import actions


app = Flask(__name__)
# Config for the mounted file system path and the filename prefix
mnt_dir = os.environ.get("MNT_DIR", "/mnt/nfs/filestore")
filename = os.environ.get("FILENAME", "test")


@app.route("/import", methods=["POST"])
def download():
    url = request.get_json()["url"]
    name = request.get_json()["name"]
    logging.info(f"log: {url}")
    actions.import_file(url, name, "/opt/share/raw/")
    return name, 204


@app.route("/load", methods=["POST"])
def load():
    name = request.get_json()["name"]
    logging.info(f"log: {name}")
    actions.load_file_to_bq(name, "/opt/share/raw/", "test", "gs://xxx/raw/")
    return name, 204


@app.route("/exec", methods=["POST"])
def exec():
    name = request.get_json()["name"]
    logging.info(f"log: {name}")
    result = actions.exec_bq_query(name)
    return result, 204


def shutdown_handler(sig, frame):
    """SIGTERM handler"""
    print("Caught SIGTERM signal.", flush=True)
    return


signal.signal(signal.SIGTERM, shutdown_handler)

# To locally run the app
if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

Once I created my main.py file, I added the application logic in a separate file, actions.py:

from google.cloud import bigquery
import gzip
from google.cloud.bigquery import SchemaField
import time
import requests
import logging


def exec_bq_query(name: str) -> str:
    # Read a SQL file and run it as a BigQuery job, waiting for completion
    client = bigquery.Client()
    with open(name, "r") as file:
        sql = file.read()
    job = client.query(sql)
    job.result()
    return job.destination.table_id


def import_file(url: str, name: str, path: str) -> None:
    # Stream the file from the source URL straight to the mounted bucket path
    start = time.time()
    r = requests.get(url, stream=True)
    with open(f"{path}{name}", "wb") as f:
        for chunk in r.raw.stream(1024, decode_content=False):
            if chunk:
                f.write(chunk)

    end = time.time()
    logging.info(f"Downloaded {name} in {end - start:.2f}s")


def read_schema(name: str, path: str) -> list:
    # Read only the header line of the gzipped TSV to build the table schema
    fields = []
    with gzip.open(f"{path}{name}", "rt") as f:
        for line in f:
            fields = line.split("\t")
            break
    schema = []
    for x in fields:
        schema.append(SchemaField(x.replace("\n", ""), "STRING"))
    return schema


def load_file_to_bq(name: str, path: str, bq_dataset: str, gcp_path: str) -> None:
    # Load a gzipped TSV straight from Cloud Storage into BigQuery,
    # using the schema read from the local file header
    client = bigquery.Client()

    schema = read_schema(name, path)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        field_delimiter="\t",
        ignore_unknown_values=True,
        quote_character="",
        schema=schema,
    )
    tab_name = name[: name.find(".")]
    job = client.load_table_from_uri(
        f"{gcp_path}{name}", f"{bq_dataset}.{tab_name}", job_config=job_config
    )
    job.result()

I decided to use the IMDb dataset for this experiment. I have a function that downloads the files over HTTP and a method that reads the file headers to create the table schemas. I tried to use the “autodetect” property in LoadJobConfig, but I faced issues with proper data type interpretation. The root of the problem is that BigQuery’s schema auto-detection only examines up to the first 500 rows, and if you investigate the files in the dataset you will see that, for instance, the ratings file has millions of rows. I use load_table_from_uri to load data from Cloud Storage without having to pull it into the container’s memory. When the data is loaded, I execute SQL queries that transform it and create or replace tables.
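
Before wiring the container into a workflow, the endpoints can be smoke-tested locally. Here is a rough sketch; the localhost URL assumes the container started with the docker run command shown below, and the payload fields match main.py:

import requests

BASE_URL = "http://localhost:8080"  # local container from the docker run command

# Download one IMDb file and load it into BigQuery through the Flask endpoints
payload = {
    "url": "https://datasets.imdbws.com/title.ratings.tsv.gz",
    "name": "title_ratings.tsv.gz",
}
requests.post(f"{BASE_URL}/import", json=payload, timeout=600)
requests.post(f"{BASE_URL}/load", json={"name": payload["name"]}, timeout=600)

# Execute one of the SQL transformation files
requests.post(f"{BASE_URL}/exec", json={"name": "fact_ratings.sql"}, timeout=600)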

Docker deployment and environment configuration

To configure the environment, we need to create a Cloud Storage bucket and a BigQuery dataset, and deploy the container to Cloud Run. I will explain how to do it step by step.

Create a bucket and a service account

gsutil mb -l REGION gs://BUCKET_NAME

gcloud iam service-accounts create service_account_name

gcloud projects add-iam-policy-binding PROJECT_ID \
     --member "serviceAccount:service_account_name@PROJECT_ID.iam.gserviceaccount.com" \
     --role "roles/storage.objectAdmin"

Build image 

docker build . -t gcp:v1

Debug application

docker run -v HOST_PATH:/app --env-file .env --privileged -p 8080:8080 gcp:v1

I use a .env file here to keep the environment variable values for my local development.
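
A minimal .env for local runs could look like the following; the variable names come from main.py and the deploy command, while the values are placeholders:

BUCKET=my-example-bucket
MNT_DIR=/opt/share
FILENAME=test
PORT=8080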

Build and deploy the container image to Cloud Run

gcloud beta run deploy download-app --source . \
     --execution-environment gen2 \
     --allow-unauthenticated \
     --service-account service_account_name \
     --update-env-vars BUCKET=XXX \
     --update-env-vars MNT_DIR=/opt/share

Data Transformation and Orchestration

The last step is to develop a workflow that orchestrates the whole process. You can create it in the GCP console, where the Define workflow tab lets you describe the workflow in YAML notation. You can find my YAML file below.

main:
    params: [input]
    steps:
    - innerLoop:
            for:
              value: items  # the loop variable "items" is only visible inside the innerLoop step
              in: [{"url":"https://datasets.imdbws.com/name.basics.tsv.gz", "name": "name.tsv.gz"},
                    {"url":"https://datasets.imdbws.com/title.ratings.tsv.gz", "name": "title_ratings.tsv.gz"},
                    {"url":"https://datasets.imdbws.com/title.principals.tsv.gz", "name": "title_principals.tsv.gz"},
                    {"url":"https://datasets.imdbws.com/title.basics.tsv.gz", "name": "title_basics.tsv.gz"},
                    {"url":"https://datasets.imdbws.com/title.episode.tsv.gz", "name": "title_episode.tsv.gz"}
                    ]
              steps:
                - innerLoopAssign:
                    assign:
                      - workflowScope: ${items}
                - download_cr:
                    call: http.post
                    args:
                        url: https://xxx.a.run.app/import
                        auth:
                            type: OIDC
                        body:
                            "name": ${items.name}
                            "url": ${items.url}
                - load_to_BQ:
                    call: http.post
                    args:
                        url: https://xxx.a.run.app/load
                        auth:
                            type: OIDC
                        body:
                            "name": ${items.name}
    - dim_title:
        call: http.post
        args:
            url: https://xxx.a.run.app/exec
            auth:
                type: OIDC
            body:
                "name": "dim_title.sql"
        result: dim_title_result

    - fact_ratings:
        call: http.post
        args:
            url: https://xxx.a.run.app/exec
            auth:
                type: OIDC
            body:
                "name": "fact_ratings.sql"
        result: fact_ratings_result

I will not go into detail about defining steps in the workflow structure; you can find this information in the Workflows documentation. As you can see in the code, I created a list of the files that I need to import. A loop then downloads the files and loads them into BigQuery tables. At the end, I call the endpoint that executes the SQL transformations.
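
Once the workflow is deployed, you can start it from the console, with gcloud, or programmatically. As a rough sketch, assuming the google-cloud-workflows client library and placeholder project, region, and workflow names, an execution could be triggered like this:

import json

from google.cloud.workflows import executions_v1


def run_workflow(project: str, location: str, workflow: str) -> str:
    # Start a workflow execution and return its resource name.
    # The project, location, and workflow name are placeholders here.
    client = executions_v1.ExecutionsClient()
    parent = f"projects/{project}/locations/{location}/workflows/{workflow}"
    execution = client.create_execution(
        request={
            "parent": parent,
            # The workflow declares `params: [input]`, so an argument can be passed,
            # even though the sample YAML above does not use it.
            "execution": {"argument": json.dumps({"input": "manual-run"})},
        }
    )
    return execution.name


if __name__ == "__main__":
    print(run_workflow("my-project", "europe-west1", "imdb-pipeline"))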

CREATE OR REPLACE TABLE `xxx.test.dim_title`
AS
SELECT
  ROW_NUMBER() OVER() AS id,
  tconst,
  titleType,
  primaryTitle,
  originalTitle,
  isAdult,
  CASE WHEN startYear = '\n' THEN NULL ELSE startYear END AS startYear,
  CASE WHEN endYear = '\n' THEN NULL ELSE endYear END AS endYear,
  runtimeMinutes,
  genres,
  CURRENT_DATETIME() AS Create_timestamp
FROM `xxx.test.title_basics`

Once our workflow is in place, we can test the entire process. In this way, I was able to create a simple process that populates tables in my data warehouse. It is a very simple example with just two tables, but you can extend it by adding more dimension and fact tables to populate in your data warehouse.

Machine Learning Use Case

Another use case could be an ML process where you need to import data from SQL databases, REST APIs, or Parquet files, then cleanse or transform it. Once the data is ready, you can run a machine learning model that finds anomalies, classifies your clients, discovers patterns, or forecasts trends.


