Google Cloud Composer (built on Apache Airflow) Simplified

Google Cloud Platform offers a service called Cloud Composer to create, schedule, and monitor pipelines that span hybrid and multi-cloud environments. The service is built on the open-source Apache Airflow project and is operated using Python. When I started using this service, I was not sure how to implement it for my project, so I did a lot of research and investigation, and over time my knowledge of the service has evolved.

Google Cloud Composer built on Apache Airflow

This article is mainly for beginners who want to understand the basics of Cloud Composer and use it to orchestrate data pipelines built on GCP, so let's get started.

Let's assume that an Airflow environment has already been created for your GCP project. Type "composer" in the GCP search bar, which opens a UI as shown below:

Composer UI

Let's understand some important options:

Name: the name of the Composer environment, which is attached to the project it was created in. If you want to schedule workflows across projects, a connection needs to be established (https://cloud.google.com/composer/docs/how-to/managing/connections).

Airflow: opens the Airflow scheduling UI, where you can view workflows, trigger a workflow (DAG), check logs, and so on.

DAGs: a Directed Acyclic Graph is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code. We will see a sample DAG in the section below.

Sample DAG: let's create and understand a sample DAG. Say you want to create and schedule a workflow that picks up a file from a GCS bucket, loads it into BigQuery, and then moves the file to an archive location (a different GCS bucket).

In a DAG, you first import the libraries you want to use. In this example, we import the default models library used to run an Airflow DAG, two GCP operators (GoogleCloudStorageToBigQueryOperator and GoogleCloudStorageToGoogleCloudStorageOperator), and then datetime and timedelta for scheduling.
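As a sketch, the import section of the DAG file might look like this. These are the Airflow 1.x contrib import paths used by Composer 1 environments; newer Airflow versions move these operators into the Google provider packages:

```python
from datetime import datetime, timedelta

# Airflow core and the two GCP transfer operators mentioned above
# (Airflow 1.x / contrib paths, as used in Composer 1 environments)
from airflow import models
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
```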

Next we define our GCS bucket locations and the default arguments for the DAG, such as the start date and retries. Many other arguments can be set as needed, but here we define only the few that are required.
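For illustration, here is a sketch of that part of the file. The bucket names are hypothetical placeholders; the default_args dictionary is plain Python and applies to every task in the DAG:

```python
from datetime import datetime, timedelta

# Hypothetical bucket names -- replace with your own
SOURCE_BUCKET = 'my-source-bucket'
ARCHIVE_BUCKET = 'my-archive-bucket'

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1),   # fixed date; avoid datetime.now()
    'retries': 1,                         # retry a failed task once
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}
```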

Then comes the main processing block, models.DAG, where we define the details of each task (load_to_bq and move_file_gcs, which are simply variables holding each task), calling the operators imported at the start: GoogleCloudStorageToBigQueryOperator and GoogleCloudStorageToGoogleCloudStorageOperator. Again, many parameters can be set; we use only the required ones.

Finally, we define the task dependencies using the >> operator.
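Putting the pieces together, a minimal version of the DAG file might look like the sketch below. The bucket names, object paths, dataset, and table are hypothetical placeholders, and only the essential operator parameters are shown (again using the Airflow 1.x contrib import paths from Composer 1):

```python
from datetime import datetime, timedelta

from airflow import models
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

SOURCE_BUCKET = 'my-source-bucket'     # hypothetical source bucket
ARCHIVE_BUCKET = 'my-archive-bucket'   # hypothetical archive bucket

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with models.DAG(
        'test_load_bq',
        default_args=default_args,
        schedule_interval=timedelta(days=1)) as dag:

    # Task 1: load the CSV file from GCS into a BigQuery table
    load_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id='load_to_bq',
        bucket=SOURCE_BUCKET,
        source_objects=['data/sample.csv'],                       # hypothetical path
        destination_project_dataset_table='my_dataset.my_table',  # hypothetical
        source_format='CSV',
        skip_leading_rows=1,
        autodetect=True,                     # let BigQuery infer the schema
        write_disposition='WRITE_TRUNCATE',
    )

    # Task 2: move the processed file to the archive bucket
    move_file_gcs = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='move_file_gcs',
        source_bucket=SOURCE_BUCKET,
        source_object='data/sample.csv',
        destination_bucket=ARCHIVE_BUCKET,
        destination_object='archive/sample.csv',
        move_object=True,                    # delete the source copy after the move
    )

    # Run the load first, then archive the file
    load_to_bq >> move_file_gcs
```

The last line is the dependency definition: load_to_bq must finish successfully before move_file_gcs runs.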

Save this file as <<file_name>>.py (in this example, test_load_bq.py) and place it in the DAGs folder. This folder can be opened by clicking the DAGs option in the GUI, and files can be added using the upload file option.

Next, open the Airflow environment by clicking the Airflow option.

This will open the Airflow GUI:

Click on the DAG, here test_load_bq, and the workflow details open as shown below. Task 1 is load_to_bq (reads the file from GCS and loads it into BigQuery); task 2 is move_file_gcs (moves the file from one GCS bucket to another). A task can be in various states such as scheduled, skipped, success, failed, or running. Here the DAG has succeeded, so it is shown in dark green.

Clicking on any task opens the view below; View Log shows the logs for each task, which is helpful for debugging in case of a failure.

So, that's it for creating and scheduling a sample DAG. There is a lot more you can explore and do with Cloud Composer. Kindly clap and leave a comment if you found this article helpful.

Working as a Data Engineer with Cognizant