Adding support for EFS mounts.

Changelog:
* Upgraded CDK version to support EFS usage
* Upgraded Fargate PlatformVersion to support EFS mounts
* Refactored RDS construct as per the new CDK API
* Created a new LogGroup for OnDemand DagTasks
* Added TAG for stack, to track resources belonging to this setup
* Updated sample DAG to utilize EFS. Tasks Odd and Even will publish to EFS and Numbers will read from EFS
* Now you can see logs from OnDemand tasks in the Airflow UI once the task run finishes
This commit is contained in:
Chaithanya Maisagoni
2020-12-08 11:58:26 -08:00
parent 91438606d9
commit 7814377342
12 changed files with 2568 additions and 576 deletions

View File

@@ -28,6 +28,7 @@ def get_ecs_operator_args(taskDefinitionName, taskContainerName, entryFile, para
launch_type="FARGATE",
# The name of your task as defined in ECS
task_definition=taskDefinitionName,
platform_version="1.4.0",
# The name of your ECS cluster
cluster=os.environ['CLUSTER'],
network_configuration={
@@ -44,7 +45,9 @@ def get_ecs_operator_args(taskDefinitionName, taskContainerName, entryFile, para
'command': ["python", entryFile, param]
}
]
}
},
awslogs_group="FarFlowDagTaskLogs",
awslogs_stream_prefix="FarFlowDagTaskLogging/"+taskContainerName
)
oddTaskConfig = {

View File

@@ -1,22 +1,46 @@
import { Construct } from "@aws-cdk/core";
import { AwsLogDriver } from "@aws-cdk/aws-ecs";
import {AwsLogDriver, } from "@aws-cdk/aws-ecs";
import { RetentionDays } from "@aws-cdk/aws-logs";
import {IVpc, ISecurityGroup, Port} from "@aws-cdk/aws-ec2";
import efs = require('@aws-cdk/aws-efs');
import { LogGroup } from '@aws-cdk/aws-logs';
import { AirflowDagTaskDefinition } from "./task-construct"
import { AirflowDagTaskDefinition, EfsVolumeInfo } from "./task-construct"
/**
 * Inputs required to build the on-demand DAG task definitions:
 * the VPC the tasks run in and the security group shared across the stack.
 */
export interface DagTasksProps {
  /** Security group attached to the shared EFS file system. */
  readonly defaultVpcSecurityGroup: ISecurityGroup;
  /** VPC in which the EFS file system and the Fargate tasks are placed. */
  readonly vpc: IVpc;
}
export class DagTasks extends Construct {
constructor(
scope: Construct,
taskName: string,
props: DagTasksProps
) {
super(scope, taskName + "-TaskConstruct");
const logging = new AwsLogDriver({
streamPrefix: 'FarFlowDagTaskLogging',
logRetention: RetentionDays.ONE_MONTH
logGroup: new LogGroup(scope, "FarFlowDagTaskLogs", {
logGroupName: "FarFlowDagTaskLogs",
retention: RetentionDays.ONE_MONTH
})
});
let sharedFS = new efs.FileSystem(this, 'EFSVolume', {
vpc: props.vpc,
securityGroup: props.defaultVpcSecurityGroup
});
sharedFS.connections.allowInternally(Port.tcp(2049));
let efsVolumeInfo: EfsVolumeInfo = {
containerPath: "/shared-volume",
volumeName: "SharedVolume",
efsFileSystemId: sharedFS.fileSystemId
}
// Task Container with multiple python executables
new AirflowDagTaskDefinition(this, 'FarFlowCombinedTask', {
containerInfo: {
@@ -26,7 +50,8 @@ export class DagTasks extends Construct {
cpu: 512,
memoryLimitMiB: 1024,
taskFamilyName: "FarFlowCombinedTask",
logging: logging
logging: logging,
efsVolumeInfo: efsVolumeInfo
});
// Task Container with single python executable
@@ -38,7 +63,8 @@ export class DagTasks extends Construct {
cpu: 256,
memoryLimitMiB: 512,
taskFamilyName: "FarFlowNumbersTask",
logging: logging
logging: logging,
efsVolumeInfo: efsVolumeInfo
});
}
}

View File

@@ -1,7 +1,7 @@
import { Duration, Construct } from "@aws-cdk/core";
import {
DatabaseInstance,
DatabaseInstanceEngine,
DatabaseInstanceEngine, PostgresEngineVersion,
StorageType
} from "@aws-cdk/aws-rds";
import { ISecret, Secret } from "@aws-cdk/aws-secretsmanager";
@@ -59,7 +59,9 @@ export class RDSConstruct extends Construct {
);
this.rdsInstance = new DatabaseInstance(this, "RDSInstance", {
engine: DatabaseInstanceEngine.POSTGRES,
engine: DatabaseInstanceEngine.postgres({
version: PostgresEngineVersion.VER_12_4
}),
instanceType: defaultDBConfig.instanceType,
instanceIdentifier: defaultDBConfig.dbName,
vpc: props.vpc,
@@ -72,9 +74,11 @@ export class RDSConstruct extends Construct {
storageType: StorageType.GP2,
backupRetention: Duration.days(defaultDBConfig.backupRetentionInDays),
deletionProtection: false,
masterUsername: defaultDBConfig.masterUsername,
credentials: {
username: defaultDBConfig.masterUsername,
password: databasePasswordSecret
},
databaseName: defaultDBConfig.dbName,
masterUserPassword: databasePasswordSecret,
port: defaultDBConfig.port
});

View File

@@ -1,13 +1,12 @@
import { Construct, CfnOutput, Duration } from "@aws-cdk/core";
import { IVpc } from "@aws-cdk/aws-ec2";
import {CfnOutput, Construct, Duration} from "@aws-cdk/core";
import {IVpc} from "@aws-cdk/aws-ec2";
import {FargatePlatformVersion, FargateTaskDefinition} from '@aws-cdk/aws-ecs';
import {PolicyConstruct} from "../policies";
import {workerAutoScalingConfig} from "../config";
import ecs = require('@aws-cdk/aws-ecs');
import ec2 = require("@aws-cdk/aws-ec2");
import elbv2 = require("@aws-cdk/aws-elasticloadbalancingv2");
import { FargateTaskDefinition } from '@aws-cdk/aws-ecs';
import { PolicyConstruct } from "../policies";
import { workerAutoScalingConfig } from "../config";
export interface ServiceConstructProps {
readonly vpc: IVpc;
@@ -37,7 +36,8 @@ export class ServiceConstruct extends Construct {
this.fargateService = new ecs.FargateService(this, name, {
cluster: props.cluster,
taskDefinition: props.taskDefinition,
securityGroup: props.defaultVpcSecurityGroup
securityGroup: props.defaultVpcSecurityGroup,
platformVersion: FargatePlatformVersion.VERSION1_4
});
const allowedPorts = new ec2.Port({
protocol: ec2.Protocol.TCP,

View File

@@ -3,6 +3,7 @@ import { Construct } from "@aws-cdk/core";
import ecs = require('@aws-cdk/aws-ecs');
import { DockerImageAsset } from '@aws-cdk/aws-ecr-assets';
import { FargateTaskDefinition } from '@aws-cdk/aws-ecs';
import {ManagedPolicy} from "@aws-cdk/aws-iam";
export interface AirflowDagTaskDefinitionProps {
readonly taskFamilyName: string;
@@ -10,6 +11,7 @@ export interface AirflowDagTaskDefinitionProps {
readonly cpu: number;
readonly memoryLimitMiB: number;
readonly logging: ecs.LogDriver;
readonly efsVolumeInfo?: EfsVolumeInfo;
}
export interface ContainerInfo {
@@ -17,6 +19,12 @@ export interface ContainerInfo {
readonly assetDir: string;
}
/**
 * Describes an EFS volume to attach to a Fargate task definition:
 * the logical ECS volume name, the backing file system id, and the
 * path where the volume is mounted inside the container.
 */
export interface EfsVolumeInfo {
  /** Absolute path inside the container where the volume is mounted. */
  readonly containerPath: string;
  /** Id of the backing EFS file system (e.g. "fs-12345678"). */
  readonly efsFileSystemId: string;
  /** Logical volume name referenced by the task definition and mount point. */
  readonly volumeName: string;
}
export class AirflowDagTaskDefinition extends Construct {
constructor(
@@ -33,14 +41,32 @@ export class AirflowDagTaskDefinition extends Construct {
family: props.taskFamilyName
});
if (props.efsVolumeInfo) {
workerTask.addVolume({
name: props.efsVolumeInfo.volumeName,
efsVolumeConfiguration: {
fileSystemId: props.efsVolumeInfo.efsFileSystemId
}
});
workerTask.taskRole.addManagedPolicy(ManagedPolicy.fromAwsManagedPolicyName("AmazonElasticFileSystemClientReadWriteAccess"));
}
const workerImageAsset = new DockerImageAsset(this, props.containerInfo.name + '-BuildImage', {
directory: props.containerInfo.assetDir,
});
workerTask.addContainer(props.containerInfo.name, {
let container = workerTask.addContainer(props.containerInfo.name, {
image: ecs.ContainerImage.fromDockerImageAsset(workerImageAsset),
logging: props.logging
});
if (props.efsVolumeInfo) {
container.addMountPoints({
containerPath: props.efsVolumeInfo.containerPath,
sourceVolume: props.efsVolumeInfo.volumeName,
readOnly: false
});
}
}
}

View File

@@ -13,6 +13,8 @@ class FarFlow extends cdk.Stack {
// Create VPC and Fargate Cluster
// NOTE: Limit AZs to avoid reaching resource quotas
let vpc = new ec2.Vpc(this, 'Vpc', { maxAzs: 2 });
cdk.Tags.of(scope).add("Stack", "FarFlow");
let cluster = new ecs.Cluster(this, 'ECSCluster', { vpc: vpc });
// Setting default SecurityGroup to use across all the resources
@@ -34,7 +36,10 @@ class FarFlow extends cdk.Stack {
});
// Create TaskDefinitions for on-demand Fargate tasks, invoked from DAG
new DagTasks(this, "DagTasks");
new DagTasks(this, "DagTasks", {
vpc: vpc,
defaultVpcSecurityGroup: defaultVpcSecurityGroup
});
}
}

View File

@@ -12,6 +12,8 @@ export class PolicyConstruct extends Construct {
this.managedPolicies = [
ManagedPolicy.fromAwsManagedPolicyName("AmazonSQSFullAccess"),
ManagedPolicy.fromAwsManagedPolicyName("AmazonECS_FullAccess"),
ManagedPolicy.fromAwsManagedPolicyName("AmazonElasticFileSystemClientReadWriteAccess"),
ManagedPolicy.fromAwsManagedPolicyName("CloudWatchLogsReadOnlyAccess")
];
/*

2980
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -19,12 +19,12 @@
"typescript": "~3.7.2"
},
"dependencies": {
"@aws-cdk/aws-ec2": "*",
"@aws-cdk/aws-ecs": "*",
"@aws-cdk/aws-ecs-patterns": "*",
"@aws-cdk/aws-ecr-assets": "*",
"@aws-cdk/aws-rds": "*",
"@aws-cdk/core": "*",
"@aws-cdk/aws-ec2": "1.76.0",
"@aws-cdk/aws-ecs": "1.76.0",
"@aws-cdk/aws-ecs-patterns": "1.76.0",
"@aws-cdk/aws-ecr-assets": "1.76.0",
"@aws-cdk/aws-rds": "1.76.0",
"@aws-cdk/core": "1.76.0",
"@types/uuid": "8.3.0",
"uuid": "^8.3.0"
}

View File

@@ -8,6 +8,9 @@ if __name__ == '__main__':
number = args.number
print("Printing Even numbers in given range")
f = open("/shared-volume/even.txt", "a")
for i in range(int(number)):
if(i % 2 == 0):
f.write(str(i))
print(i)
f.close()

View File

@@ -8,6 +8,9 @@ if __name__ == '__main__':
number = args.number
print("Printing Odd numbers in given range")
f = open("/shared-volume/odd.txt", "a")
for i in range(int(number)):
if(i % 2 != 0):
f.write(str(i))
print(i)
f.close()

View File

@@ -1,11 +1,47 @@
from argparse import ArgumentParser
import os
parser = ArgumentParser(description='Airflow Fargate Example')
parser.add_argument('number', help='number', type=int)
def delete_file(file_path):
    """Best-effort delete of file_path; logs the outcome and never raises.

    The original code reported "File not found" for *any* OSError (including
    e.g. PermissionError), which was misleading; we now distinguish the
    missing-file case from other OS-level failures. Either way the error is
    swallowed so cleanup of the shared EFS volume cannot fail the task.
    """
    try:
        os.remove(file_path)
        print("Successfully deleted file: " + file_path)
    except FileNotFoundError:
        # Expected when a prior run already cleaned up (or never wrote the file).
        print("File not found: " + file_path)
    except OSError as e:
        # Any other OS failure (permissions, path is a directory, ...):
        # report accurately instead of claiming the file was missing.
        print("Could not delete file: " + file_path + " (" + str(e) + ")")
if __name__ == '__main__':
args = parser.parse_args()
number = args.number
print("Printing all numbers in given range")
for i in range(int(number)):
print(i)
f_numbers = open("/shared-volume/numbers.txt", "a")
# Copy from even.txt to numbers.txt
f_even = open("/shared-volume/even.txt", "r")
for line in f_even:
f_numbers.write(line)
f_even.close()
# Copy from odd.txt to numbers.txt
f_odd = open("/shared-volume/odd.txt", "r")
for line in f_odd:
f_numbers.write(line)
f_odd.close()
f_numbers.close()
# Print contents of numbers.txt
f_numbers = open("/shared-volume/numbers.txt", "r")
for line in f_numbers:
print(line)
print("\n")
f_numbers.close()
# Deleting all files, to avoid EFS cost
delete_file("/shared-volume/even.txt")
delete_file("/shared-volume/odd.txt")
delete_file("/shared-volume/numbers.txt")
delete_file("/shared-volume/numbers.txt") # Will result in File not found message