[FarFlow] Using AWS Fargate (ECS) to host Apache Airflow

Chaithanya Maisagoni
2020-08-12 16:34:16 -07:00
commit ecde2a83a2
23 changed files with 1750 additions and 0 deletions

airflow/Dockerfile (executable file, 26 lines)

@@ -0,0 +1,26 @@
FROM apache/airflow:1.10.11
ENV AIRFLOW_HOME=/usr/local/airflow
USER root
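# System libraries needed to build the extra Python packages installed below (pycurl, PyGreSQL)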
RUN apt-get update && apt-get install -y python3-pip \
libcurl4-gnutls-dev \
librtmp-dev \
python3-dev \
libpq-dev
RUN python3 -m pip install PyGreSQL argcomplete pycurl
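# The entrypoint scripts from ./config (webserver, scheduler, worker) are copied to /; each ECS task definition is assumed to select the one it needs, since the ENTRYPOINT below is left commented out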
COPY ./config/* /
COPY ./dags ${AIRFLOW_HOME}/dags
RUN chown -R airflow: ${AIRFLOW_HOME}
EXPOSE 8080
USER airflow
WORKDIR ${AIRFLOW_HOME}
# ENTRYPOINT ["/entrypoint.sh"]

Scheduler entrypoint script (file path not captured in this extract)

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
set -Eeuxo pipefail
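# The 30s sleep is assumed to give the webserver container time to finish `airflow initdb` before the scheduler starts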
sleep 30
airflow scheduler

Webserver entrypoint script (file path not captured in this extract)

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -Eeuxo pipefail
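# Create or upgrade the Airflow metadata database, then start the webserver (the Dockerfile exposes port 8080)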
airflow initdb
sleep 5
airflow webserver

airflow/config/worker_entry.sh (executable file, 5 lines)

@@ -0,0 +1,5 @@
#!/usr/bin/env bash
set -Eeuxo pipefail
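# Wait for the metadata DB to be ready, then start a worker; `airflow worker` assumes CeleryExecutor with a message broker is configured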
sleep 30
airflow worker

airflow/dags/dag.py (normal file, 110 lines)

@@ -0,0 +1,110 @@
import os
import sys
from datetime import datetime
from datetime import timedelta
from pprint import pprint

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.ecs_operator import ECSOperator

DAG_NAME = 'Test_Dag'

default_args = {
    'owner': 'CM',
    'start_date': datetime(2019, 6, 8),
    'email': ['xyz@amazon.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}


# CLUSTER, SECURITY_GROUP and SUBNETS are expected as environment variables on the Airflow containers
def get_ecs_operator_args(taskDefinitionName, taskContainerName, entryFile, param):
    return dict(
        launch_type="FARGATE",
        # The name of your task as defined in ECS
        task_definition=taskDefinitionName,
        # The name of your ECS cluster
        cluster=os.environ['CLUSTER'],
        network_configuration={
            'awsvpcConfiguration': {
                'securityGroups': [os.environ['SECURITY_GROUP']],
                'subnets': os.environ['SUBNETS'].split(","),
                'assignPublicIp': "DISABLED"
            }
        },
        overrides={
            'containerOverrides': [
                {
                    'name': taskContainerName,
                    'command': ["python", entryFile, param]
                }
            ]
        }
    )


oddTaskConfig = {
    'taskDefinitionName': 'FarFlowCombinedTask',
    'taskContainerName': 'MultiTaskContainer',
    'entryFile': 'odd_numbers.py',
    'param': '10'
}

evenTaskConfig = {
    'taskDefinitionName': 'FarFlowCombinedTask',
    'taskContainerName': 'MultiTaskContainer',
    'entryFile': 'even_numbers.py',
    'param': '10'
}

numbersTaskConfig = {
    'taskDefinitionName': 'FarFlowNumbersTask',
    'taskContainerName': 'NumbersContainer',
    'entryFile': 'numbers.py',
    'param': '10'
}

oddTask_args = get_ecs_operator_args(**oddTaskConfig)
evenTask_args = get_ecs_operator_args(**evenTaskConfig)
numbersTask_args = get_ecs_operator_args(**numbersTaskConfig)

dag = DAG(
    DAG_NAME,
    schedule_interval=None,
    default_args=default_args
)

start_process = DummyOperator(task_id="start_process", dag=dag)

# The following tasks are triggered from the worker and run as on-demand Fargate tasks
odd_task = ECSOperator(task_id="odd_task", **oddTask_args, dag=dag)
even_task = ECSOperator(task_id="even_task", **evenTask_args, dag=dag)
numbers_task = ECSOperator(task_id="numbers_task", **numbersTask_args, dag=dag)
# [START howto_operator_python]
# Adapted from: https://github.com/apache/airflow/blob/master/airflow/example_dags/example_python_operator.py#L40
def print_context(task_config, **kwargs):
    """Print the configuration dict passed in via op_args, plus any keyword arguments."""
    pprint(kwargs)
    print(task_config)
    return 'Whatever you return gets printed in the logs'


task_config = {
    "key1": "value1",
    "key2": "value2",
    "key3": "value3",
    "key4": "value4"
}

# This step runs locally on the Airflow worker; no Fargate task is launched for it
on_worker_task = PythonOperator(
    task_id='runs_on_worker',
    python_callable=print_context,
    dag=dag,
    op_args=[task_config]
)
# [END howto_operator_python]
start_process >> [odd_task, even_task] >> numbers_task >> on_worker_task
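
The container entry files referenced in the ECS overrides (odd_numbers.py, even_numbers.py, numbers.py) are not included in this commit. As a minimal sketch of the calling convention the DAG relies on, odd_numbers.py could look like the following; the file name and the `python <entryFile> <param>` invocation come from the DAG above, while the behaviour (printing the odd numbers up to the given limit) is an assumption.

# Hypothetical sketch of odd_numbers.py; not part of this commit.
# The ECSOperator above invokes it inside the Fargate container as: python odd_numbers.py 10
import sys


def main():
    # The single CLI argument ('10' in the DAG) is assumed to be an upper bound.
    limit = int(sys.argv[1]) if len(sys.argv) > 1 else 10
    odds = [n for n in range(1, limit + 1) if n % 2 == 1]
    print("Odd numbers up to {}: {}".format(limit, odds))


if __name__ == "__main__":
    main()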