# Testing DAGs in EKS Airflow
Archived (pre-2022)
Preserved for reference only -- likely outdated. Last updated: December 2021
If you'd like to test functionality that affects (or could potentially affect) the entire Airflow service -- for example, you need to mount a secret volume, which would require restarting the scheduler instance -- it is a good idea to work from a separate pod. This particular example is for the EKS Airflow deployment located in eu-west-1 / Ireland (ofw-prd) (Airflow).
## 1. Fetch the name of the PV containing DAGs
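The original command for this step is not preserved here; a sketch of how to look it up, assuming the `airflow` namespace and the `airflow-main` claim name used in the pod manifest below:

```shell
# List PVCs in the airflow namespace and read off the PV bound to the DAGs claim
# (namespace and claim name are assumptions taken from the pod spec in step 2)
kubectl -n airflow get pvc airflow-main -o jsonpath='{.spec.volumeName}'
```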
## 2. Create a pod with this PV mounted inside
Note the image tag: the repo is quite dynamic, so the tag below may no longer exist.
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-dag-edit
  labels:
    spotinst.io/restrict-scale-down: "True"
spec:
  serviceAccountName: airflow
  containers:
    - name: airflow-dag-edit
      imagePullPolicy: Always
      image: 767648288756.dkr.ecr.eu-west-1.amazonaws.com/bln-airflow:1.10.15-offerwall-latest
      command: ["/bin/sh"]
      args: ["-c", "while true; do echo $(date -u); sleep 5; done"]
      volumeMounts:
        - mountPath: /airflowdags
          name: airflow-dags
          readOnly: false
  nodeSelector:
    airflow: airflow
  tolerations:
    - effect: NoSchedule
      key: noschedule
      operator: Equal
      value: airflow
    - effect: NoExecute
      key: noexecute
      operator: Equal
      value: airflow
    - effect: NoExecute
      key: node.kubernetes.io/not-ready
      operator: Exists
      tolerationSeconds: 300
    - effect: NoExecute
      key: node.kubernetes.io/unreachable
      operator: Exists
      tolerationSeconds: 300
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-main
```
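To create the pod, save the manifest and apply it -- a sketch assuming the manifest is saved as `airflow-dag-edit.yaml` (the file name is an example, not from the original):

```shell
# Create the editing pod in the airflow namespace
kubectl -n airflow apply -f airflow-dag-edit.yaml

# Wait until the pod is ready before exec-ing into it
kubectl -n airflow wait --for=condition=Ready pod/airflow-dag-edit --timeout=120s
```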
## 3. Add your new DAG to the mounted dir and reload the GUI page in your browser
```console
~/tmp/ k get pods -n airflow
NAME                                      READY   STATUS    RESTARTS   AGE
airflow-dag-edit                          1/1     Running   0          4h12m
airflow-main-scheduler-7d77f7d5cc-696kp   1/1     Running   0          3d1h
airflow-main-web-6f8dc9cfd7-nf62w         1/1     Running   0          3d1h
airflow-main-worker-0                     1/1     Running   0          3d1h
~/tmp/ k exec -it airflow-dag-edit -- bash
airflow@airflow-dag-edit:/opt/airflow$ ls /airflowdags/
canary_dag.py  druid_mm_autoscaler_v1.json  druid_mm_autoscaler_v1.py  example_bash_operator.py  tolik_playing_with_airflow.py
```
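Instead of editing files inside the pod, a new DAG can also be copied in from your workstation -- a sketch assuming a local file named `my_new_dag.py` (the file name is a placeholder; the pod name and mount path come from the manifest above):

```shell
# Copy a local DAG file into the mounted DAGs volume
kubectl -n airflow cp my_new_dag.py airflow-dag-edit:/airflowdags/my_new_dag.py
```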
## 4. (Bonus) Tasks can also be tested from the scheduler instance via the Airflow CLI
```console
~/tmp/ k exec -it airflow-main-scheduler-7d77f7d5cc-696kp -- bash
airflow@airflow-main-scheduler-7d77f7d5cc-696kp:/opt/airflow$ airflow dags list
[2021-02-22 15:32:11,538] {__init__.py:50} INFO - Using executor KubernetesExecutor
[2021-02-22 15:32:11,539] {dagbag.py:417} INFO - Filling up the DagBag from /opt/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
canary_dag
druid_mm_autoscaler_v1
example_bash_operator
tolik_playing_with_airflow
airflow@airflow-main-scheduler-7d77f7d5cc-696kp:/opt/airflow$ airflow tasks list tolik_playing_with_airflow
[2021-02-22 15:32:36,108] {__init__.py:50} INFO - Using executor KubernetesExecutor
[2021-02-22 15:32:36,110] {dagbag.py:417} INFO - Filling up the DagBag from /opt/airflow/dags
running_cmd
airflow@airflow-main-scheduler-7d77f7d5cc-696kp:/opt/airflow$ airflow tasks test tolik_playing_with_airflow running_cmd 2020-02-01
```
Local test
Note the `airflow tasks test` command above: it runs a task instance locally (no pod is created), writes its log to stdout, does not bother with dependencies, and does not communicate state (running, success, failed, ...) to the database. It simply allows testing a single task instance.

The same applies to `airflow dags test`, but at the DAG level: it performs a single DAG run of the given DAG id. While it does take task dependencies into account, no state is registered in the database. It is convenient for locally testing a full run of your DAG, provided that any data your tasks expect at a given location is actually available.
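Continuing the same scheduler-pod session, a DAG-level dry run can be sketched like this (the execution date mirrors the task-level example above and is arbitrary):

```shell
# Perform a single local run of the whole DAG; no state is written to the database
airflow dags test tolik_playing_with_airflow 2020-02-01
```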