How to run distributed machine learning jobs using Apache Spark and EC2 (and Python)

This is the age of big data.

Sometimes scikit-learn doesn't cut it.

To make your operations and data-driven decisions scalable, you need to distribute the processing of your data.

Two popular libraries for distributed machine learning are Mahout (which uses MapReduce) and MLlib (which runs on Spark, itself sometimes considered a successor to MapReduce).

What I want to do with this tutorial is to show you how easy it is to do distributed machine learning using Spark and EC2.

When I started a recent project of mine, I was distraught at how complicated a Mahout setup could be.

I am not an ops person. I hate installing and configuring things. For something like running distributed k-means clustering, 90% of the work could go into just setting up a Hadoop cluster, installing all the libraries your code needs to run, making sure they are the right versions, etc…

The Hadoop ecosystem is very sensitive to these things, and sometimes MapReduce jobs can be very hard to debug.

With Spark, everything is super easy. Installing Spark and Hadoop is tedious but doable. Spinning up a cluster is very easy. Running a job is very easy. We will use Python, but you can also use Scala or Java.

Outline of this tutorial:

  1. Install Spark on a driver machine.
  2. Create a cluster.
  3. Run a job.

 

1. Install Spark

I used an Ubuntu instance on EC2. I’m assuming you already know how to set up a security group, get your PEM, and SSH into the machine.


Once you've spun up your instance, we can begin installing all the stuff we'll need.

To make this even easier, you could probably do all of this on your local machine; but if you're using Windows, or you don't want to mess up your local machine, then you'll want to use an EC2 instance as your driver.

First, set your AWS ID and secret environment variables.

export AWS_ACCESS_KEY_ID=…
export AWS_SECRET_ACCESS_KEY=…

Now install Java and Maven:

sudo apt-get update
sudo apt-get install default-jdk maven
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"

The last line matters because Maven will need that much memory available to build Spark, if I remember correctly.

Now download and build Spark:

wget http://mirror.cc.columbia.edu/pub/software/apache/spark/spark-1.3.0/spark-1.3.0.tgz
tar -xf spark-1.3.0.tgz
cd spark-1.3.0
mvn -DskipTests clean package

By the time you read this a new version of Spark may be available. You should check.

 

2. Create a Cluster

Assuming you are in the Spark folder now, it is very easy to create a cluster to run your jobs:

./ec2/spark-ec2 -k "Your Key Pair Name" -i /path/to/key.pem -s <number of slaves> launch <cluster name> --copy-aws-credentials -z us-east-1b

I set my zone as “us-east-1b” but you can set it to a zone of your choice.

When you’re finished, don’t forget to tear down your cluster! On-demand machines are expensive.

./ec2/spark-ec2 destroy <cluster name>

For some reason, numpy isn't installed when you create a cluster, and the default Python on the m1.large machines is 2.6, while Spark installs its own 2.7. So even if you easy_install numpy on each machine in the cluster, it goes to Python 2.6 and won't work for Spark.

You can instead copy the library over to each cluster machine from your driver machine:

scp -i /path/to/key.pem /usr/lib/python2.7/dist-packages/numpy* root@<cluster-machine>:/usr/lib/python2.7/dist-packages/
scp -r -i /path/to/key.pem /usr/lib/python2.7/dist-packages/numpy root@<cluster-machine>:/usr/lib/python2.7/dist-packages/

You can easily write a script to automatically copy all this stuff over (get the machine URLs from the EC2 console).
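
Here is a rough sketch of such a script in Python, calling scp via subprocess. The key path and slave hostnames are placeholders you would fill in from your own EC2 console:

# copy_numpy.py -- rough sketch; fill in your own key path and slave hostnames
import glob
import subprocess

KEY = "/path/to/key.pem"
SLAVES = [
    "ec2-xx-xx-xx-xx.compute-1.amazonaws.com",  # placeholder hostnames
    "ec2-yy-yy-yy-yy.compute-1.amazonaws.com",
]

# everything numpy-related in the driver's dist-packages
# (the numpy package directory plus the egg-info files)
payload = glob.glob("/usr/lib/python2.7/dist-packages/numpy*")

for host in SLAVES:
    dest = "root@%s:/usr/lib/python2.7/dist-packages/" % host
    subprocess.check_call(["scp", "-r", "-i", KEY] + payload + [dest])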

 

3. Run a Job

Spark gives you a Python shell.

First, go to your EC2 console and find the URL for your cluster master. SSH into that machine (username is root).

cd spark
MASTER=spark://<cluster-master-ip>:7077 ./bin/pyspark

Import libraries:

from pyspark.mllib.clustering import KMeans
from numpy import array

Load your data:

data = sc.textFile("s3://<my-bucket>/<path>/*.csv")

Note 1: You can use a wildcard to grab multiple files into one variable, called an RDD (resilient distributed dataset).

Note 2: Spark gives you a variable called ‘sc’, which is an object of type SparkContext. It specifies the master node, among other things.
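
You don't create sc yourself in the pyspark shell; it's already there when the shell starts. As a quick sanity check (assuming I remember the attribute name right, sc.master holds the master URL):

print sc.master   # should print spark://<cluster-master-ip>:7077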

Maybe filter out some bad lines:

data = data.filter(lambda line: 'ERROR' not in line)

Turn each row into an array / vector observation:

data = data.map(lambda line: array([float(x) for x in line.split()]))

Train your model:

clusters = KMeans.train(data, 2, maxIterations=20,
                        runs=1, initializationMode="k-means||")

Save some output (note that saveAsTextFile writes a directory of part files at the given path):

sc.parallelize(clusters.centers).saveAsTextFile("s3://…./output.csv")

You can also run a standalone Python script using spark-submit instead of the shell.

./bin/spark-submit --master spark://<master-ip>:7077 myscript.py

Remember you’ll have to instantiate your own SparkContext in this case.
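
As a minimal sketch, here is what that standalone script might look like for the same k-means job as above. The S3 paths are placeholders; substitute your own bucket and prefix:

# myscript.py -- a minimal sketch of the k-means job as a standalone script
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

# no shell here, so create the SparkContext yourself;
# the master comes from the --master flag passed to spark-submit
sc = SparkContext(appName="kmeans-example")

data = sc.textFile("s3://<my-bucket>/<path>/*.csv")
data = data.filter(lambda line: 'ERROR' not in line)
data = data.map(lambda line: array([float(x) for x in line.split()]))

clusters = KMeans.train(data, 2, maxIterations=20,
                        runs=1, initializationMode="k-means||")

sc.parallelize(clusters.centers).saveAsTextFile("s3://<my-bucket>/<output-path>")
sc.stop()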

 

Future Improvements

The goal of this tutorial is to make things easy.

There are many areas for improvement. For instance, on-demand machines on Amazon are the most expensive way to run a cluster.

The spark-ec2 script still spins up "m1.large" instances by default, even though EC2's current documentation recommends the better, faster, and cheaper "m3.large" instead.

At the same time, a custom configuration like that might mean we can't use the spark-ec2 script to spin up the cluster automatically. There might be an option in the script to choose the instance type; I didn't really look, but see the note below.
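
If I recall correctly, spark-ec2 does accept an instance type option; treat the exact flag name as an assumption and run ./ec2/spark-ec2 --help to confirm it on your version. Something like:

./ec2/spark-ec2 --instance-type=m3.large -k "Your Key Pair Name" -i /path/to/key.pem -s <number of slaves> launch <cluster name>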

One major reason I wrote this tutorial is that all the information in it is out there in some form, but it is scattered, and some of it can be hard to find without knowing what to search for.

So that’s it. The easiest possible way to run distributed machine learning.

How do you do distributed machine learning?