Testing DAGs in EKS Airflow

Archived (pre-2022)

Preserved for reference only -- likely outdated. Last updated: December 2021

If you need to test functionality that affects (or could potentially affect) the entire Airflow service -- for example, mounting a secret volume, which requires restarting the scheduler instance -- it is a good idea to work in a separate pod. This particular example is for the EKS Airflow located in eu-west-1 / Ireland (ofw-prd) (Airflow).

1. Fetch the name of the PVC containing the DAGs

 ~/tmp/ k get pvc | grep dags
airflow-main        Bound    k8s-airflow-airflow-dags   500Gi      RWX                           167d

2. Create a pod with this PVC mounted inside

Note the image tag: since the repository is quite dynamic, the one below could be absent.

apiVersion: v1
kind: Pod
metadata:
  name: airflow-dag-edit
  labels:
    spotinst.io/restrict-scale-down: "True"
spec:
  serviceAccountName: airflow
  containers:
    - name: airflow-dag-edit
      imagePullPolicy: Always
      image: 767648288756.dkr.ecr.eu-west-1.amazonaws.com/bln-airflow:1.10.15-offerwall-latest
      command: ["/bin/sh"]
      args: ["-c", "while true; do echo $(date -u); sleep 5; done"]
      volumeMounts:
        - mountPath: /airflowdags
          name: airflow-dags
          readOnly: false
  nodeSelector:
    airflow: airflow
  tolerations:
  - effect: NoSchedule
    key: noschedule
    operator: Equal
    value: airflow
  - effect: NoExecute
    key: noexecute
    operator: Equal
    value: airflow
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-main
 ~/tmp/ k apply -f airflow_dag_edit.yaml

3. Add your new DAG to the mounted dir and reload the GUI page in the browser

 ~/tmp/ k get pods -n airflow
NAME                                      READY   STATUS    RESTARTS   AGE
airflow-dag-edit                          1/1     Running   0          4h12m
airflow-main-scheduler-7d77f7d5cc-696kp   1/1     Running   0          3d1h
airflow-main-web-6f8dc9cfd7-nf62w         1/1     Running   0          3d1h
airflow-main-worker-0                     1/1     Running   0          3d1h

 ~/tmp/ k exec -it airflow-dag-edit -- bash

airflow@airflow-dag-edit:/opt/airflow$ ls /airflowdags/
canary_dag.py  druid_mm_autoscaler_v1.json  druid_mm_autoscaler_v1.py  example_bash_operator.py  tolik_playing_with_airflow.py
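
As a sketch of this step, the snippet below writes a minimal test DAG into the mounted volume from inside the airflow-dag-edit pod (you could just as well create the file with an editor or kubectl cp). The my_test_dag id, the task name, and the schedule are hypothetical and not part of this setup; the imports match the Airflow 1.10.x layout used by the image above.

```python
# Sketch: drop a minimal hypothetical DAG into the mounted PV.
import os

DAG_SOURCE = '''\
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10.x import path

with DAG(
    dag_id="my_test_dag",          # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,        # trigger manually while testing
) as dag:
    BashOperator(task_id="running_cmd", bash_command="echo hello from test DAG")
'''

# Use the PV mount path inside the pod; fall back to the current directory
# so the snippet can also be dry-run outside the cluster.
target_dir = "/airflowdags" if os.path.isdir("/airflowdags") else "."
target_path = os.path.join(target_dir, "my_test_dag.py")
with open(target_path, "w") as f:
    f.write(DAG_SOURCE)
print("wrote", target_path)
```

Since the scheduler reads DAGs from the same shared volume, the new file should be picked up on the next DagBag refresh; reload the GUI page to see it.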

4. (Bonus) Tasks can also be tested from the scheduler instance via the Airflow CLI

 ~/tmp/ k exec -it airflow-main-scheduler-7d77f7d5cc-696kp -- bash

airflow@airflow-main-scheduler-7d77f7d5cc-696kp:/opt/airflow$ airflow dags list
[2021-02-22 15:32:11,538] {__init__.py:50} INFO - Using executor KubernetesExecutor
[2021-02-22 15:32:11,539] {dagbag.py:417} INFO - Filling up the DagBag from /opt/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
canary_dag
druid_mm_autoscaler_v1
example_bash_operator
tolik_playing_with_airflow

airflow@airflow-main-scheduler-7d77f7d5cc-696kp:/opt/airflow$ airflow tasks list tolik_playing_with_airflow
[2021-02-22 15:32:36,108] {__init__.py:50} INFO - Using executor KubernetesExecutor
[2021-02-22 15:32:36,110] {dagbag.py:417} INFO - Filling up the DagBag from /opt/airflow/dags
running_cmd

airflow@airflow-main-scheduler-7d77f7d5cc-696kp:/opt/airflow$ airflow tasks test tolik_playing_with_airflow running_cmd 2020-02-01

Local test

Note the airflow tasks test command above: it runs the task instance locally (no pod is created), prints its log to stdout, ignores dependencies, and does not communicate state (running, success, failed, …) to the database. It simply allows testing a single task instance.

The same applies to airflow dags test, but at the DAG level: it performs a single DAG run of the given DAG id. While it does take task dependencies into account, no state is registered in the database. It is convenient for locally testing a full run of your DAG, provided its external prerequisites are met -- e.g. if one of your tasks expects data at some location, that data must be available.