Journey of migrating from Talend to Airflow

Russel Huang
5 min read · May 22, 2021

Background

Our product is a Loyalty Point system. When we talk about loyalty points, there are two basic operations from the user's perspective:

  1. Earn a point
  2. Redeem a point

Both operations seem simple from the user's perspective, but there is real complexity behind them.

Loyalty points equal money. We need to know where every earned or redeemed point comes from. Were these 20 points earned from buying a $200 laptop? Or from an event held by an e-commerce platform? Or did the user convert points from another loyalty point system? We are required to track the source of each point and generate various kinds of financial reports (not just one) for the accounting team.

In the beginning, the whole system was outsourced to another company, and they chose Talend to collect the data and generate the financial reports. After we took over the whole system and maintained it ourselves for a few months, we found Talend difficult to use for several reasons:

  • Limited to SQL. This was the most painful part. We had problems aggregating the data with pure SQL because the loyalty point logic is complex; in the end the SQL queries grew larger and harder to maintain.
  • Hard to integrate with CI/CD.
  • Heavy. Our colleagues needed two minutes just to open the tool…

So we started doing some research and looking for a better solution, and then we found…

Airflow

In a nutshell, Airflow is a scheduler for workflow pipelines. Workflows are defined as DAGs (Directed Acyclic Graphs): collections of tasks that we define and write as Python scripts. It is easier to parse and transform data to its final state with a programming language than with pure SQL.

Another benefit is that we can easily break complex logic down into smaller, independent tasks for easier maintainability (with asynchronous, parallel execution as a bonus).

Let's take serving fruit as an example. You have multiple fruits in the refrigerator; your mom buys them every morning, and they may vary from day to day. So the input is dynamic.

Let’s organize the plan:

  1. Take all kinds of fruit from the refrigerator
  2. Clean each fruit
  3. Cut each fruit
  4. Serve all the fruit together

We can design the workflow like this: each fruit gets its own clean → cut branch, and all branches join at the final serve step.
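
As a rough illustration, a DAG for this plan might look something like the sketch below. The fruit names, task IDs, and helper functions are made up, and the syntax follows the Airflow 1.10 series we run; it is not our actual DAG.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# The fruit list is hard-coded here for illustration; in practice it is dynamic.
FRUITS = ["apple", "mango", "banana"]

def take_from_fridge():
    print("taking all fruits out of the refrigerator")

def clean(fruit):
    print("cleaning {}".format(fruit))

def cut(fruit):
    print("cutting {}".format(fruit))

def serve():
    print("serving all fruits together")

with DAG(
    dag_id="serve_fruits",
    start_date=datetime(2021, 5, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    take_task = PythonOperator(task_id="take_from_fridge", python_callable=take_from_fridge)
    serve_task = PythonOperator(task_id="serve", python_callable=serve)

    # One independent clean -> cut branch per fruit; all branches join at serve.
    for fruit in FRUITS:
        clean_task = PythonOperator(
            task_id="clean_{}".format(fruit),
            python_callable=clean,
            op_args=[fruit],
        )
        cut_task = PythonOperator(
            task_id="cut_{}".format(fruit),
            python_callable=cut,
            op_args=[fruit],
        )
        take_task >> clean_task >> cut_task >> serve_task

Each clean/cut pair is an independent branch, so the fruits can be processed in parallel while the serve task waits for all of them.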

When dealing with data processing, you might need to fetch from different sources, aggregate, and so on. Instead of using Talend with pure SQL, Airflow lets us develop reusable, function-like components that each have a single responsibility, which leads to better maintainability, faster development, and better scalability.

4 Main Airflow Components

Taken from the Airflow website:

  • Web server, a GUI where you can monitor all tasks and their information
  • Scheduler, responsible for scheduling jobs
  • Executor, responsible for executing the tasks
  • Metadata database, stores the state of all DAGs and tasks

Executor

Airflow has multiple kinds of executors, but I'll only explain the ones we used. You can see the rest of them here.

Local Executor

It resembles a single-node architecture. All tasks and DAGs run on this one server only, but you still get parallelism (tasks run asynchronously). The downsides of this executor are that it doesn't scale well (vertical scaling only) and it is a single point of failure.

Besides the benefits listed above, the Local Executor is also easy to set up. It's ideal for testing, but depending on your tasks, I think you can also use this executor at production level.

We made a simple local setup using Docker Compose, shown below. Just mount your local dags folder into the container like this:

version: '3.1'
services:
  airflow-webserver:
    container_name: airflow-webserver
    image: puckel/docker-airflow
    ports:
      - "8080:8080"
    restart: always
    environment:
      - AIRFLOW_CONN_$CONNECTIONID=$DB_CONN_STRING
    volumes:
      - $LOCAL_DAGS_FOLDER:/usr/local/airflow/dags

BOOM! You should be able to see the Airflow web server GUI at localhost:8080.

As your usage gets heavier and needs more resources, you can move to the next executor.

Kubernetes Executor

It uses native Kubernetes pods to run each task individually. You can define how many resources each pod needs, Kubernetes scales them up and down, and the pods are terminated after they finish executing. Another reason we chose this executor is that our services themselves are containerized and orchestrated with Kubernetes, so it integrates perfectly. Just helm install.
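
For illustration, this is roughly how per-task pod resources can be requested in the Airflow 1.10 series via executor_config. The DAG, task, and resource numbers below are placeholders, not our actual configuration.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def aggregate_points():
    print("aggregating loyalty points")  # placeholder for the real logic

with DAG(
    dag_id="k8s_executor_demo",
    start_date=datetime(2021, 5, 1),
    schedule_interval=None,
) as dag:
    PythonOperator(
        task_id="aggregate_points",
        python_callable=aggregate_points,
        # Each task runs in its own pod; these resource values are examples only.
        executor_config={
            "KubernetesExecutor": {
                "request_memory": "512Mi",
                "request_cpu": "500m",
                "limit_memory": "1Gi",
                "limit_cpu": "1000m",
            }
        },
    )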

Metadata Database

Airflow will use SQLite by default when using the Local Executor. The setup configures the endpoint for you, so there are no additional steps.

For the Kubernetes Executor, Postgres or MySQL are the options. You also need to configure the endpoint and credentials yourself.

externalDatabase:
  ## the type of external database: {mysql,postgres}
  type: postgres
  host: ${POSTGRES_ENDPOINT}
  port: 5432
  database: airflow
  user: airflow
  ## Kubernetes Secret containing the password
  passwordSecret: ${KUBERNETES_AIRFLOW_PASSWORD_SECRET}

Details on the Helm values.yaml can be found here.

How to fetch DAGs

Git-sync

We used this at first. Basically, a git-sync container runs inside each scheduler and webserver pod; it periodically fetches the git repository from the remote URL you specify and places it in the volume shared between the containers.
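
Roughly, the sidecar looks like the snippet below. The image tag, repository URL, and values are placeholders for illustration, not our exact chart output.

# Sketch of a git-sync sidecar container next to the webserver/scheduler container.
- name: git-sync
  image: k8s.gcr.io/git-sync/git-sync:v3.2.2   # example tag, not necessarily ours
  env:
    - name: GIT_SYNC_REPO
      value: "https://gitlab.example.com/our-team/airflow-dags.git"   # hypothetical repo URL
    - name: GIT_SYNC_BRANCH
      value: "master"
    - name: GIT_SYNC_ROOT
      value: "/git"
    - name: GIT_SYNC_WAIT
      value: "60"            # seconds between fetches
  volumeMounts:
    - name: dags-data        # shared volume also mounted by the Airflow container
      mountPath: /git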

The problem is that the documentation is not clear on how to set each of the environment variables needed for git-sync (there are too many…), particularly on the fact that the git-sync configuration for each component (webserver, scheduler, worker) has to be configured separately…

On top of that, even though it is easier to deploy, it ends up with a release tag flow and versioning different from our other repositories.

Persist to Disk

By default, Airflow scans for DAGs under the $AIRFLOW_HOME/dags folder. We can create a Dockerfile based on the Airflow Docker image and copy our dags folder into it, then build the custom image and push it to a private container registry.

FROM apache/airflow:1.10.12-python3.6
# Run as the non-root airflow user
USER airflow
# Bake our DAGs into the image at the default DAGs path
COPY dags/ /opt/airflow/dags
COPY ./requirements.txt ./
# Install the extra Python dependencies our DAGs need
RUN pip install --user -r requirements.txt
EXPOSE 8080

Then, in the Helm values.yaml, add these environment variables so the worker nodes know which Docker image to use.

AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: "$DOCKER_IMAGE"
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: "$IMAGE_TAG"

CI/CD Pipeline

With this setup in place, let me introduce our build and deploy pipeline.

Build

Simply put, it runs the docker build command based on the Dockerfile written above, then pushes the image to the GitLab container registry.
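
For reference, a stripped-down version of that job in .gitlab-ci.yml could look like this. The job name and Docker versions are examples; the $CI_* variables are GitLab's built-in CI variables.

build-image:
  stage: build
  image: docker:19.03
  services:
    - docker:19.03-dind
  script:
    # Log in to the GitLab registry and push an image tagged with the commit SHA
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
    - docker build -t "$CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA" .
    - docker push "$CI_REGISTRY_IMAGE:$CI_COMMIT_SHORT_SHA"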

Deploy

We have a custom Docker image that has both kubectl and helm pointed to our Kubernetes cluster (a different cluster per environment) as the base image for our deploy jobs.

develop branch => QA cluster

release branch => Staging cluster

master branch => Production cluster

Based on the pushed branch, we point the webserver and worker container repository to the new image and update some DB connection config values in the Helm values.yaml. The final step is to run helm upgrade with the updated values.yaml.
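
A simplified sketch of one of the deploy jobs is below. The release name, chart path, and the base image variable are assumptions, and the real job also patches values.yaml before this step.

deploy-qa:
  stage: deploy
  image: $DEPLOY_BASE_IMAGE        # our custom kubectl + helm image (variable name is an assumption)
  only:
    - develop                      # develop -> QA; release -> Staging; master -> Production
  script:
    # values.yaml has already been updated with the new image repository/tag
    - helm upgrade --install airflow ./airflow-chart -f values.yaml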

It is pretty similar to our application CI/CD pipeline.

After migrating to Airflow, we felt the benefits immediately: we save a lot of time dealing with accounting reports and can focus more on developing core business value!

