Event-based Dataflow Job Orchestration with Cloud Composer, Airflow, and Cloud Functions

Hülya Pamukçu Crowell
May 13, 2021

Apache Beam and Google Dataflow are helping data engineers adopt the Kappa architecture with a unified programming model and a managed service with a production-scale runner. In enterprise solutions, data processing jobs are part of complex pipelines, and fully productionizing these jobs means integration with existing Continuous Integration (CI) and automation for Continuous Deployment (CD). This article explores an event-based Dataflow job automation approach using Cloud Composer, Airflow, and Cloud Functions.

Motivation

Google Cloud Platform (GCP) documentation provides reference solutions for setting up a CI/CD pipeline and scheduling Dataflow jobs. However, these solutions lack a simple interface and abstraction from the underlying orchestration system. Decoupling the intent from that system with a simple manifest as the entry point lets us create a consistent interface for ad-hoc and production runs, and for batch and streaming workloads. The event-driven architecture also makes it easy to plug into CI and to extend.

The following sections outline the approach for automating Dataflow jobs and define the manifest that abstracts the underlying components.

System

The Dataflow job automation system has two main functions: rolling out code changes and scheduling recurring jobs. As input, it takes job templates and the rollout and schedule manifests, which allow dynamic overrides of environment and pipeline options. It manages the lifecycle of jobs with workflows modeled as Airflow DAGs.

At a high level, the workflow execution is event-driven, triggered by uploading the manifests to a Google Cloud Storage (GCS) bucket. A Cloud Function acting on GCS bucket operations externally triggers the Airflow DAGs in the Cloud Composer environment to orchestrate the steps.
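To make the trigger path concrete, below is a minimal sketch of such a Cloud Function, assuming a Composer 2 environment whose Airflow 2 stable REST API is reachable at its web server URL. The environment variable names, the JSON manifest format, and the target DAG id are illustrative placeholders, not part of the reference solution.

    # Sketch of a GCS-triggered Cloud Function (Python background function) that
    # forwards an uploaded manifest to Cloud Composer by triggering a DAG via the
    # Airflow 2 stable REST API. Env var names and the DAG id are placeholders.
    import json
    import os

    import google.auth
    from google.auth.transport.requests import AuthorizedSession
    from google.cloud import storage

    WEB_SERVER_URL = os.environ["COMPOSER_WEB_SERVER_URL"]  # Composer Airflow web server URL
    DAG_ID = os.environ.get("TARGET_DAG_ID", "dataflow_deployer")
    SCOPE = "https://www.googleapis.com/auth/cloud-platform"

    def on_manifest_upload(event, context):
        """Triggered on object finalize in the manifests bucket."""
        # Read the uploaded manifest (assumed to be JSON here) from GCS.
        blob = storage.Client().bucket(event["bucket"]).blob(event["name"])
        manifest = json.loads(blob.download_as_text())

        # Externally trigger the DAG, passing the manifest as the run conf.
        credentials, _ = google.auth.default(scopes=[SCOPE])
        session = AuthorizedSession(credentials)
        response = session.post(
            f"{WEB_SERVER_URL}/api/v1/dags/{DAG_ID}/dagRuns",
            json={"conf": manifest},
        )
        response.raise_for_status()

The following sections give more detail on the rollout and schedule flows.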

Rollout flow

The rollout flow pushes Dataflow job code and runtime parameter changes. There are multiple possible strategies for updating a job already running with the old version and parameters; the two main ones are rolling update and restart. In a rolling update, the new and old versions of the job run in parallel while the new job is validated. In the restart strategy, the old job is stopped first.

Both strategies have trade-offs. The rolling update has zero downtime, but writes must be isolated or consumers must tolerate duplicates. For BigQuery outputs, a facade view can be used, as described in this article. With the restart strategy, on the other hand, there is downtime while the new job starts and begins generating data. Below, we illustrate the restart strategy, but the rolling update can be implemented with a similar DAG.

The rollout starts when the config is pushed to the GCS bucket. It completes when the new job is running. The contents of the config are shown below.
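As a minimal sketch, a rollout manifest could look like the following Python literal; the field names and values are illustrative rather than an exact schema.

    # Illustrative rollout manifest (hypothetical field names and values).
    rollout_spec = {
        "job_name": "events-enrichment",
        "strategy": "restart",  # restart or rolling update
        "from": {
            "template": "gs://my-templates/events-enrichment/v1.3",
            "environment": {"maxWorkers": 10, "zone": "us-central1-a"},
            "parameters": {"outputTable": "analytics.events_v1"},
        },
        "to": {
            "template": "gs://my-templates/events-enrichment/v1.4",
            "environment": {"maxWorkers": 20, "machineType": "n1-standard-4"},
            "parameters": {"outputTable": "analytics.events_v2"},
        },
    }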

Note that the runtime environment values and dynamic pipeline options can be different for each version defined in the spec, and they are all specified in a single rollout manifest.

If there is any failure during workflow execution, it halts, and a rollback can be triggered by swapping the spec's "from" and "to" fields. The successful flow is outlined below.

  1. The user/CI uploads the Dataflow template to the templates GCS bucket in the project.
  2. The user/CI uploads the rollout config to the GCS bucket.
  3. The Cloud Function is triggered by the create operation; it passes the contents to the Cloud Composer REST API to trigger the deployer DAG externally.
  4. The deployer DAG stops the job running the old version.
  5. The deployer DAG starts the job with the new spec and uses sensors to await autoscaling events, metrics, and messages from the new job (see the sketch further below).
Rollout Flow: Restart Strategy
Dataflow Deployer DAG
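For illustration, below is a condensed sketch of such a deployer DAG, assuming Airflow 2 and a recent apache-airflow-providers-google release that ships the Dataflow operators and sensors. Task ids, the region, and the conf keys mirror the illustrative manifest above; the XCom shape of the started job may vary by provider version.

    # Condensed sketch of the deployer DAG (restart strategy). Assumes
    # apache-airflow-providers-google; ids, region, and conf keys are illustrative.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataflow import (
        DataflowStopJobOperator,
        DataflowTemplatedJobStartOperator,
    )
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

    with DAG(
        "dataflow_deployer",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,  # triggered externally by the Cloud Function
        render_template_as_native_obj=True,  # render conf dicts as dicts, not strings
    ) as dag:
        # Restart strategy: stop the job running the "from" version first.
        stop_old = DataflowStopJobOperator(
            task_id="stop_old_job",
            job_name_prefix="{{ dag_run.conf['job_name'] }}",
            location="us-central1",
        )

        # Launch the "to" version from its template with its own runtime
        # environment and pipeline options from the rollout manifest.
        start_new = DataflowTemplatedJobStartOperator(
            task_id="start_new_job",
            template="{{ dag_run.conf['to']['template'] }}",
            job_name="{{ dag_run.conf['job_name'] }}",
            parameters="{{ dag_run.conf['to']['parameters'] }}",
            environment="{{ dag_run.conf['to']['environment'] }}",
            location="us-central1",
        )

        # Await signals from the new job; metrics, messages, and autoscaling
        # sensors follow the same pattern as this status sensor.
        wait_running = DataflowJobStatusSensor(
            task_id="wait_job_running",
            job_id="{{ task_instance.xcom_pull('start_new_job')['id'] }}",
            expected_statuses={"JOB_STATE_RUNNING"},
            location="us-central1",
        )

        stop_old >> start_new >> wait_running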

Schedule flow

The schedule flow is responsible for creating and updating recurring Dataflow jobs. The DAGs for these jobs need to be registered with Airflow dynamically by a DAG creator, as shown below.
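A minimal sketch of such a DAG creator follows, assuming the schedule specs are kept in an Airflow Variable maintained by the controller DAG described later in this section; the variable name and spec fields are illustrative.

    # Sketch of a dynamic DAG creator placed in the dags/ folder. On every scan
    # it reads schedule specs from an Airflow Variable (name and fields are
    # illustrative) and registers one batch DAG per spec.
    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.providers.google.cloud.operators.dataflow import (
        DataflowTemplatedJobStartOperator,
    )

    def create_dataflow_dag(spec):
        dag = DAG(
            dag_id=spec["dag_id"],
            start_date=datetime.fromisoformat(spec["start_date"]),
            end_date=datetime.fromisoformat(spec["end_date"]) if spec.get("end_date") else None,
            schedule_interval=spec["schedule_interval"],
            catchup=spec.get("catchup", False),
        )
        with dag:
            # Each scheduled run launches a Dataflow job from the template.
            DataflowTemplatedJobStartOperator(
                task_id="launch_dataflow_job",
                template=spec["template"],
                job_name=spec["job_name"],
                parameters=spec.get("parameters", {}),
                environment=spec.get("environment", {}),
                location=spec.get("region", "us-central1"),
            )
        return dag

    # Register each batch DAG in globals() so the Airflow scheduler discovers it.
    for spec in Variable.get("dataflow_schedules", default_var=[], deserialize_json=True):
        globals()[spec["dag_id"]] = create_dataflow_dag(spec)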

Scheduling a new batch job starts with uploading the manifest to the schedules GCS bucket and completes when the dynamic DAG is registered in the DAG file's globals. The contents of the schedule config are as follows:
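An illustrative schedule manifest, again written as a Python literal with hypothetical field names that match the DAG creator sketch above:

    # Illustrative schedule manifest (hypothetical field names and values).
    schedule_spec = {
        "dag_id": "daily_events_aggregation",
        "start_date": "2021-05-01",
        "end_date": None,
        "schedule_interval": "0 2 * * *",  # daily at 02:00
        "template": "gs://my-templates/events-aggregation/v2.0",
        "job_name": "events-aggregation",
        "environment": {"maxWorkers": 5, "zone": "us-central1-a"},
        "parameters": {"inputDate": "{{ ds }}"},  # dynamic option per schedule slot
    }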

Note that the DAG for the batch job is also entirely configurable with start_date/end_date, schedule_interval, etc. After it is created dynamically, it is viewable in the Airflow UI under the name specified in [spec:dag_id]. The corresponding Dataflow job spec is similar to the deployer spec, with the runtime environment and dynamic pipeline options all in a single manifest.

Based on the spec's start time, end time, and interval, Airflow executes the DAG, which launches the jobs. The main steps in the schedule flow are:

  1. The user/CI uploads the Dataflow template to the templates GCS bucket in the project.
  2. The user/CI uploads the schedule config to the GCS bucket.
  3. The create operation in the schedules config bucket triggers the Cloud Function.
  4. The controller DAG is triggered externally via the Cloud Composer REST API and updates the global variable to add the new spec (see the sketch after this list).
  5. At every dags folder scan, the dynamic DAG creator checks the global variable and creates or updates the batch DAGs.
  6. The Airflow scheduler triggers the batch DAG based on the defined start/end date and interval; each run launches a Dataflow job for its schedule slot.
Schedule Flow
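For completeness, here is a minimal sketch of the controller DAG's update step (step 4 above), assuming the specs live in the same Airflow Variable that the dynamic DAG creator reads; names remain illustrative.

    # Sketch of the controller DAG that merges a newly uploaded schedule spec
    # (passed as the triggering run's conf) into the Airflow Variable scanned
    # by the dynamic DAG creator. Names are illustrative.
    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.python import PythonOperator

    def add_or_update_spec(**context):
        new_spec = context["dag_run"].conf  # schedule manifest forwarded by the Cloud Function
        specs = Variable.get("dataflow_schedules", default_var=[], deserialize_json=True)
        specs = [s for s in specs if s["dag_id"] != new_spec["dag_id"]] + [new_spec]
        Variable.set("dataflow_schedules", specs, serialize_json=True)

    with DAG(
        "schedule_controller",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,  # triggered externally by the Cloud Function
    ) as dag:
        PythonOperator(task_id="update_schedule_specs", python_callable=add_or_update_spec)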

Recap

In this article, we discussed automating Dataflow job updates and scheduling with a simple interface that hides the complexities from the user. The solution is scoped to the restart strategy with basic validation and basic scheduling. It can be extended with the following features:

  • Automated rollback flow.
  • Additional validation with a test operator to verify the new job's health.
  • Rolling update strategy.
  • Unscheduling DAGs.
