Distributed Data Pipelines with AWS ECS Fargate and Prefect Cloud

Intro

This is the step-by-step guide I wish I had when I was learning how to deploy Prefect flows with Prefect Cloud and AWS's container services.

Previously, I ran all of my flows on my local machine.  That was incredibly convenient, but whenever Windows decided to update, the power went out, or the internet dropped, all of my flows went down too.

As I continued on this Data Science journey, I knew this setup would eventually become unsustainable.

I wanted a solution that was secure, highly available, and scalable.  In the end, I chose AWS since they are the leading cloud infrastructure service provider.  It didn't hurt that I also had $5,000 in AWS credits from Y Combinator's Startup School.

This is really my first step into cloud computing and it certainly has stretched my brain in all sorts of ways.

By the end of this tutorial we'll:

  • have an ECSAgent running and picking up scheduled flows
  • package our flows and store them on AWS ECR
  • define an ECSRun configuration to run our flows in an ECS cluster with serverless AWS Fargate compute

If this is also your first time learning about containers and AWS, I highly recommend watching this video from Techworld with Nana.

Prerequisites

I tried to make this guide as approachable as possible; you'll only need the following items:

  • AWS Account
  • Prefect Cloud Account
  • Basic Docker concepts
  • Basic Python knowledge

Setting up our AWS Account

Create IAM permissions

For this tutorial, we'll need access to AWS via a terminal, as well as permissions to run new ECS tasks.

First, head over to your AWS console and navigate to the Identity and Access Management (IAM) service.

Create an admin user with programmatic access.

Attach the existing AdministratorAccess policy to the user.

Create the user and save the Access key ID and Secret access key.  We'll need these later.

Next, head over to the IAM Roles section to create a new role.  Select the Elastic Container Service and then select the Elastic Container Service Task use case.

Search for ecs and select the AmazonECS_FullAccess policy.

Give the role a name and create it.  Set aside the role's Amazon Resource Name (ARN); we'll need it later.
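
If you'd rather script this step, here's a minimal boto3 sketch of the same role setup.  The role name prefect-ecs-task-role is just a placeholder, and the trust policy is what lets ECS tasks assume the role:

import json
import boto3

iam = boto3.client('iam')

# trust policy that allows ECS tasks to assume this role
trust_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'ecs-tasks.amazonaws.com'},
        'Action': 'sts:AssumeRole',
    }],
}

# 'prefect-ecs-task-role' is a placeholder name
role = iam.create_role(RoleName='prefect-ecs-task-role',
                       AssumeRolePolicyDocument=json.dumps(trust_policy))

iam.attach_role_policy(RoleName='prefect-ecs-task-role',
                       PolicyArn='arn:aws:iam::aws:policy/AmazonECS_FullAccess')

print(role['Role']['Arn'])  # this is the ARN to set aside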

Now let's use these permissions to connect to our workspace via the terminal.

Accessing AWS via the Command Line

The AWS Command Line Interface (CLI) is a unified tool that allows you to manage your AWS services from the terminal.

To install the CLI, visit the AWS CLI homepage.

Download and run the installer for your operating system.

Verify the installation by running:

aws --version

Configure the CLI to interact with AWS:

aws configure

Enter your user Access Key ID and Secret Access Key from earlier.

For the default region name, I entered us-east-1, and for the default output format, I entered None.

Create storage for flows with ECR

Now that we have access to AWS via the CLI, our next step is to create a location where our Prefect flows will be stored.

Prefect supports a number of storage options; in this guide, we'll be using Amazon's image registry service, AWS Elastic Container Registry (ECR).

To create a new repository, we'll run the following command:

aws ecr create-repository --repository-name basic-etl-prefect-flow

To keep things organized, we'll name the repository the same as our Prefect flow.

We should get a JSON response with a repositoryUri that looks like this:

 <ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com/basic-etl-prefect-flow

Save this URI; we'll need it again later when writing our flows.

Next, run the following command to authorize Docker to push images to ECR:

aws ecr get-login-password --region <REGION> | docker login --username AWS --password-stdin <ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com

Now that we have a place to store our flows, we'll set up the container orchestration layer followed by the compute resources where the flows will actually run.

Set up our container orchestration layer with ECS

Similar to Kubernetes and Docker Swarm, Amazon has its own fully managed orchestration service called AWS Elastic Container Service (ECS).

To create our ECS control plane, we'll create a cluster by running the following command:

aws ecs create-cluster --cluster-name my-fargate-cluster --capacity-providers FARGATE

You can name your cluster whatever you'd like.

On success, we should get a JSON response.  Set aside the cluster name; we'll need it again later.
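
To double-check that the cluster came up, you can describe it and confirm its status is ACTIVE:

aws ecs describe-clusters --clusters my-fargate-cluster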

For our capacity provider, we'll be using AWS Fargate as our serverless compute engine for containers.

We'll focus on writing our flows and let Fargate provision the compute capacity they need.  Once a flow finishes running, Fargate tears down the resources, and we're charged only for the compute time we used.

You can learn more about the Fargate pricing model on the AWS Fargate pricing page.

Now that our cluster is set up, let's move on to setting up Prefect Cloud.

Prefect Cloud

If you're reading this, I'm assuming you're already familiar with the Prefect ecosystem.  But if you're not, check out my previous post, where I go step by step through deploying a basic local Python workflow with Prefect Cloud.

For this tutorial, we'll just need to install Prefect with the optional AWS dependencies, and a couple of tokens to connect Prefect to our workspace and AWS.

Create a local folder and Python file named flow.py.  Then set up a virtual environment:

python -m venv venv
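
Activate the environment before installing anything; the command depends on your operating system:

# macOS / Linux
source venv/bin/activate

# Windows
venv\Scripts\activate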

Install Prefect with the optional AWS dependencies:

pip install "prefect[aws]"

Go to your account settings, personal access tokens, and create a new USER token.  Set this user token aside.

Go to your team API Tokens and create a RUNNER token.  Set this runner token aside.

Authenticate your workspace with Prefect Cloud:

prefect backend cloud

prefect auth login -t <USER TOKEN>

Create a project to organize your flows:

prefect create project Prefect_Tutorials

Now that we have Prefect and AWS set up, we can start writing our flow!

Prefect Flow with ECSRun

First we'll freeze our dependencies:

pip freeze > requirements.txt

Then we'll create a very basic Dockerfile:

# specify a base image
FROM python:3-slim

# copy all folder contents to the image
COPY . .

# install all dependencies
RUN pip install -r requirements.txt

Now, let's break down our script inside the flow.py file we created earlier:

import prefect
import pandas as pd

from prefect.storage import Docker
from prefect.run_configs import ECSRun
from prefect import task, Flow, Parameter

As mentioned, we're using ECR to store our code.  We'll use Docker Storage to point to the ECR repository we made earlier:

STORAGE = Docker(registry_url='<ACCOUNT>.dkr.ecr.<REGION>.amazonaws.com/',  # ECR registry root
                 image_name='basic-etl-prefect-flow',  # same name as our ECR repository
                 image_tag='latest',
                 dockerfile='path/to/Dockerfile')  # path to the Dockerfile we wrote above

Next, we'll configure our flow run to be deployed as a task within our ECS cluster:

RUN_CONFIG = ECSRun(run_task_kwargs={'cluster': 'my-fargate-cluster'},
                    execution_role_arn='arn:aws:iam::<ACCOUNT>:role/<ROLE>',
                    labels=['ecs', 'dev'])

The run_task_kwargs keys coincide with the run_task request syntax from boto3; the boto3 documentation details everything you can specify in this argument.

To keep things simple, we'll only specify the cluster name we made earlier.
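
If your networking setup needs more than that (a common example is a Fargate task in a subnet that needs a public IP to pull from ECR), you can pass additional run_task fields.  This is only a sketch; the subnet and security group IDs are placeholders you'd swap for your own:

RUN_CONFIG = ECSRun(run_task_kwargs={
                        'cluster': 'my-fargate-cluster',
                        'networkConfiguration': {
                            'awsvpcConfiguration': {
                                'subnets': ['subnet-<ID>'],      # placeholder
                                'securityGroups': ['sg-<ID>'],   # placeholder
                                'assignPublicIp': 'ENABLED',
                            }
                        },
                    },
                    execution_role_arn='arn:aws:iam::<ACCOUNT>:role/<ROLE>',
                    labels=['ecs', 'dev'])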

Earlier, we created an ECS admin role and set aside its ARN.  We'll copy and paste that value into execution_role_arn.  This ARN allows our flow run to pull our image from ECR.

Later on, if our flow pulls from other AWS resources within the flow (S3, RDS, etc.), we'll also have to specify an ARN in the task_role_arn argument.

You can create whatever labels you want; essentially, they control which agents can run which flows.  You can read more about labels in the Prefect docs.

Next, we'll define and register our flow:

with Flow('basic-etl-prefect-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
     ...
     ...
     ...

flow.register('Prefect_Tutorials')
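
The ellipses are where your actual ETL tasks go.  As a purely illustrative, self-contained body (not the original flow's logic), a single logging task is enough to verify the whole pipeline:

@task
def say_hello():
    # Prefect injects a logger into the task context at runtime
    logger = prefect.context.get('logger')
    logger.info('Hello from Fargate!')

with Flow('basic-etl-prefect-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
    say_hello()

flow.register(project_name='Prefect_Tutorials')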

Now for the magic.

Run the flow in our local environment:

python3 flow.py

Prefect will automatically do the following:

  • Build the Docker image
  • Perform a health check
  • Push the image to AWS ECR
  • Register the flow with Prefect Cloud

Boom!  How awesome is that?!

Once our flow is successfully registered with Prefect Cloud, all we have left to do now is to run an ECSAgent and schedule a flow run.

Set up an ECSAgent

We'll run our ECSAgent locally to test our flow.

Ultimately, we'll probably want to run our ECSAgents on a small Elastic Compute Cloud (EC2) instance with supervisord monitoring their health.

First, we'll authenticate our agent with Prefect Cloud.  Head over to ~/.prefect and create the following config.toml file:

[cloud.agent]
auth_token = "<RUNNER TOKEN>"
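
Alternatively, Prefect maps its config keys to environment variables, so the same token can typically be set without a file:

export PREFECT__CLOUD__AGENT__AUTH_TOKEN=<RUNNER TOKEN>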

To start our ECSAgent, we can run the following lines directly in a Python shell or save them as a new script:

from prefect.agent.ecs.agent import ECSAgent

AGENT = ECSAgent(cluster="my-fargate-cluster", labels=['ecs', 'dev'])

AGENT.start()
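
If you'd rather not write a script, recent Prefect versions can typically start the same agent from the CLI (check prefect agent ecs start --help for the exact flags in your version):

prefect agent ecs start --cluster my-fargate-cluster --label ecs --label dev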

Once our ECSAgent is up, running, and waiting for flows, we'll head over to Prefect Cloud and schedule a quick run of our flow.

In my experience, it took roughly 1.5 to 2 minutes for AWS to provision a Fargate instance and begin a flow run.

Alternatively, you can use repl.it to host an "Always-On" ECSAgent; you can fork my Repl and run your own.


Final thoughts

Whew!  We've come a long way.  We now know how to write and build Prefect flows that are meant to be dockerized and pushed to AWS ECR.

When a flow run is scheduled, our agent kicks off an ECS task that pulls the image from ECR and runs it on AWS Fargate!  We now have all the infrastructure and workflows in place to scale out additional flows with on-demand compute on AWS.

Hopefully this tutorial has helped you connect some of the dots!  I spent many sleepless nights trying to wrap my head around how all this works.

If you found this tutorial valuable, consider contributing to my AWS bill via PayPal. :P

Otherwise, a follow on my Twitter would be just as valuable.
