End-to-End Data Science Using Spark on Azure HDInsight

Introduction

As part of the Microsoft Data Science Process, we have created a comprehensive walkthrough that uses pySpark and MLlib to demonstrate how to conduct end-to-end data science on Azure HDInsight Spark clusters. With detailed examples and pySpark code that are publicly accessible from a GitHub repository, we highlight how to:

  • Easily provision a managed Azure HDInsight Spark cluster, use Azure storage blobs for data import and export, and use the Jupyter notebook server on the cluster.
  • Quickly upload sample Jupyter notebooks from a public GitHub repository to your Spark cluster and start running them immediately in Spark to perform end-to-end data science.
  • Train machine learning models using cross validation and hyper-parameter sweeping.
  • Consume the trained models in production using Spark.

The entire walkthrough can be finished in about 20 minutes (on a cluster with 2 data nodes) once an HDInsight cluster is provisioned.

In the walkthrough, we used a sample of the 2013 NYC taxi trip and fare dataset to build ML models to predict whether a taxi trip was tipped or not (classification), or how much a trip was tipped (regression). The models include logistic and linear regression, random forests and gradient boosted trees, using Spark’s MLlib API. We also show how to store these models in Azure blob storage, and consume them in production to score new data-sets. Advanced modeling and consumption topics cover model training using cross-validation and hyper-parameter sweeping, and how to schedule automated scoring of data-sets using saved models.

We believe this walkthrough will be very useful for data scientists and ML practitioners who want to develop and deploy end-to-end data science processes on Spark clusters.

Description

The walkthrough consists of several topics:

  1. Overview and setup
  2. Data exploration and modeling
  3. Advanced modeling
  4. Scoring and consumption

Overview and Setup

This topic provides instructions on how to provision an HDInsight Spark 1.6 cluster and execute code using the pySpark kernel available in the Jupyter notebooks.


Figure 1: Using Jupyter notebooks on Azure HDInsight Spark clusters through a web browser. You can
upload notebooks from GitHub directly to the cluster notebook server and run code in the pySpark kernel.

One can upload the published pySpark notebooks directly from GitHub to the Jupyter notebook server on the Spark cluster and start running the code. The notebooks are available from:

  1. Exploration and modeling
  2. Advanced modeling
  3. Model consumption

Exploration and Modeling

This topic shows the use of HDInsight Spark to perform data exploration, and to create binary classification and regression models using a sample of the NYC taxi trip and fare 2013 dataset. It walks you through the conventional steps of an end-to-end Data Science Process, including:

  1. Data ingestion
  2. Data pre-processing and exploration (including visualization using Jupyter’s auto-visualization features as well as Python’s matplotlib functions)
  3. Creation of features (using MLlib’s featurizing and encoding functions as well as Spark’s SQL API)
  4. Model creation and performance evaluation using MLlib’s functions
  5. Model deployment and consumption
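
To make the feature-creation step concrete, here is a minimal plain-Python sketch of the two ideas involved: deriving a binary "tipped" label from the tip amount, and one-hot encoding a categorical column. It is only an illustration and does not use the MLlib encoders from the notebooks. The sample records, the column names, and the helper `one_hot` are assumptions made up for this example.

```python
# Plain-Python sketch of label creation and one-hot encoding.
# The records and column names below are made up for illustration.
trips = [
    {"payment_type": "CRD", "fare_amount": 12.5, "tip_amount": 2.5},
    {"payment_type": "CSH", "fare_amount": 8.0, "tip_amount": 0.0},
    {"payment_type": "CRD", "fare_amount": 30.0, "tip_amount": 6.0},
]


def one_hot(value, categories):
    """Return a 0/1 vector with a single 1 at the position of `value`."""
    return [1.0 if value == c else 0.0 for c in categories]


# Fix the category order once so every row is encoded consistently.
payment_categories = sorted({t["payment_type"] for t in trips})

rows = []
for t in trips:
    features = one_hot(t["payment_type"], payment_categories) + [t["fare_amount"]]
    rows.append({
        "features": features,
        "tipped": 1 if t["tip_amount"] > 0 else 0,  # classification target
        "tip_amount": t["tip_amount"],              # regression target
    })
```

The same row then carries both targets: `tipped` for the classification problem and `tip_amount` for the regression problem.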


Figure 2: Data visualization. Taxi-tip amount histogram (left), and fare vs. tip amount scatterplot (right).

We have shown how to build linear and tree-based models (random forests and gradient boosted trees), and how to persist those models in Azure storage blobs so they can be used to score new data. We have also shown how to evaluate the accuracy of the models using metrics such as AUC, R-squared, and RMSE.
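
For readers less familiar with these metrics, the sketch below computes them in plain Python on small lists. It is not the MLlib implementation used in the notebooks; the function names are our own, and AUC is computed with the pairwise-ranking definition (the fraction of positive/negative pairs in which the positive example receives the higher score, with ties counted as one half).

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    n = len(actual)
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / n)


def r_squared(actual, predicted):
    """Coefficient of determination: 1 - SS_residual / SS_total."""
    mean_a = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_a) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot


def roc_auc(labels, scores):
    """Area under the ROC curve via pairwise comparison of scores."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0
               for p in pos for n in neg)
    return wins / (len(pos) * len(neg))
```

The pairwise AUC is quadratic in the number of rows, so it is suitable only for small samples; on a cluster one would rely on MLlib's evaluation classes instead.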


Figure 3: Evaluating accuracy of MLlib models. ROC curve from the binary classification problem of predicting
whether a taxi trip was tipped or not (1/0) (left). Scatter plot and linear fit of actual ($, x-axis) vs. predicted
tip amount ($, y-axis) from the regression problem of predicting the tip amount for taxi trips (right).

Advanced Modeling

In this topic, we demonstrate how to train models using cross-validation and hyper-parameter sweeping. We use custom code as well as MLlib’s CrossValidator function. This approach is conventionally used to create models with optimized hyper-parameters that are likely to produce the best accuracy on test sets. Parameter optimization is shown with both linear and tree-based models. The code is generalizable and can easily be adapted to other data-sets or algorithms.

Code sample below: hyper-parameter sweeping with cross-validation to train the model with the highest average accuracy on held-out data. The examples use regularized logistic regression.

## CROSS-VALIDATION WITH PARAMETER SWEEPING USING MLLIB'S CROSSVALIDATOR FUNCTION ##
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# DEFINE ALGORITHM / MODEL
lr = LogisticRegression()
# DEFINE A GRID OF PARAMETERS
paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
    .addGrid(lr.maxIter, (5, 10))\
    .addGrid(lr.tol, (1e-4, 1e-5))\
    .addGrid(lr.elasticNetParam, (0.25, 0.75))\
    .build()
# DEFINE CV WITH PARAMETER SWEEP
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(), numFolds=3)
# TRAIN WITH CROSS-VALIDATION
# (sqlContext is provided by the notebook kernel; oneHotTRAINbinary is built earlier in the notebook)
trainDataFrame = sqlContext.createDataFrame(oneHotTRAINbinary, ["features", "label"])
cv_model = cv.fit(trainDataFrame)

## GENERIC CODE FOR CROSS-VALIDATION WITH PARAMETER SWEEPING ##
import numpy as np
# ParameterGrid is from scikit-learn (sklearn.grid_search in older releases)
from sklearn.model_selection import ParameterGrid
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# CREATE PARAMETER GRID FOR LOGISTIC REGRESSION PARAMETER SWEEP
grid = [{'regParam': [0.01, 0.1], 'iterations': [5, 10], 'regType': ["l1", "l2"], 'tolerance': [1e-3, 1e-4]}]
paramGrid = list(ParameterGrid(grid))
numModels = len(paramGrid)
# SET NUM FOLDS AND NUM PARAMETER SETS TO SWEEP ON
nFolds = 3
h = 1.0 / nFolds
metricSum = np.zeros(numModels)
# PERFORM CV WITH PARAMETER SWEEP WITH FOR-LOOPS
# (trainData and parseRowOneHotBinary are defined earlier in the notebook)
for i in range(nFolds):
    # Create training and x-validation sets
    validateLB = i * h
    validateUB = (i + 1) * h
    condition = (trainData["rand"] >= validateLB) & (trainData["rand"] < validateUB)
    validation = trainData.filter(condition)
    # Create LabeledPoints from data-frames
    if i > 0:
        trainCVLabPt.unpersist()
        validationLabPt.unpersist()
    trainCV = trainData.filter(~condition)
    # DataFrame.map works on Spark 1.6; on Spark 2.x use trainCV.rdd.map(...)
    trainCVLabPt = trainCV.map(parseRowOneHotBinary)
    trainCVLabPt.cache()
    validationLabPt = validation.map(parseRowOneHotBinary)
    validationLabPt.cache()
    # For parameter sets compute metrics from x-validation
    for j in range(numModels):
        regt = paramGrid[j]['regType']
        regp = paramGrid[j]['regParam']
        iters = paramGrid[j]['iterations']
        tol = paramGrid[j]['tolerance']
        # Train logistic regression model with hyper-parameter set
        model = LogisticRegressionWithLBFGS.train(trainCVLabPt, regType=regt, iterations=iters,
                                                  regParam=regp, tolerance=tol, intercept=True)
        predictionAndLabels = validationLabPt.map(lambda lp: (float(model.predict(lp.features)), lp.label))
        # Use ROC-AUC as accuracy metric
        validMetrics = BinaryClassificationMetrics(predictionAndLabels)
        metric = validMetrics.areaUnderROC
        metricSum[j] += metric

# Get parameters with best accuracy during cross-validation
avgAcc = metricSum / nFolds
bestParam = paramGrid[np.argmax(avgAcc)]
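
The two mechanics that the generic code above relies on, expanding a grid into individual parameter sets and carving folds out of a uniform random column, can be tried without a cluster. The following is a minimal plain-Python sketch under our own assumptions: `expand_grid` and `fold_split` are hypothetical helpers written for this illustration (they stand in for scikit-learn's ParameterGrid and the DataFrame filters), and the rows are synthetic.

```python
import itertools
import random


def expand_grid(grid):
    """Return every combination of the listed values, one dict per combination."""
    names = sorted(grid)
    combos = itertools.product(*(grid[n] for n in names))
    return [dict(zip(names, combo)) for combo in combos]


def fold_split(rows, n_folds, fold):
    """Rows whose 'rand' value lies in [fold/n_folds, (fold+1)/n_folds)
    form the validation set; all other rows form the training set."""
    lower, upper = fold / n_folds, (fold + 1) / n_folds
    validation = [r for r in rows if lower <= r["rand"] < upper]
    training = [r for r in rows if not (lower <= r["rand"] < upper)]
    return training, validation


grid = {"regParam": [0.01, 0.1], "iterations": [5, 10],
        "regType": ["l1", "l2"], "tolerance": [1e-3, 1e-4]}
param_sets = expand_grid(grid)  # 2 * 2 * 2 * 2 = 16 parameter sets

# Synthetic rows with a uniform random column, as in trainData["rand"].
rng = random.Random(0)
rows = [{"id": i, "rand": rng.random()} for i in range(30)]
splits = [fold_split(rows, 3, k) for k in range(3)]
```

Because each row's random value falls into exactly one of the intervals, every row is used for validation once and for training in the remaining folds.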

Scoring and Consumption

Here, we show how to save a trained Spark MLlib model in Azure blob storage and load it to score new data-sets. Spark provides a mechanism to remotely submit jobs or interactive queries through the Livy REST interface. One can use Livy to remotely submit a job that batch scores a file stored in an Azure blob and then writes the results to another blob. To do this, the Python script from GitHub can be uploaded to the blob storage of the Spark cluster. Once uploaded, the script can be run within the Spark cluster, where it loads the model and runs predictions on the input files.
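
As a rough illustration of what a remote submission looks like, the sketch below builds (but does not send) a POST request for Livy's batch endpoint, whose JSON body names the script file and its arguments. The cluster name, the script path, and the input and output paths are hypothetical placeholders, and `build_livy_batch_request` is a helper written for this example. Authentication and any additional headers that a given cluster may require are omitted.

```python
import json
import urllib.request


def build_livy_batch_request(cluster, script_path, args):
    """Build (but do not send) a POST request for Livy's /batches endpoint."""
    url = "https://{0}.azurehdinsight.net/livy/batches".format(cluster)
    payload = {"file": script_path, "args": list(args)}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_livy_batch_request(
    "mycluster",                          # hypothetical cluster name
    "wasb:///example/scoring_script.py",  # hypothetical script location
    ["wasb:///example/input.csv",         # hypothetical input file
     "wasb:///example/output"],           # hypothetical output location
)
```

Sending the request (for example with `urllib.request.urlopen(req)`, after adding credentials) would ask Livy to run the script as a batch job on the cluster.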

We also describe how one can operationalize this process and schedule scoring at regular time intervals. This can be done easily using tools available in the Azure stack, such as Logic Apps.


Figure 4: Operationalizing Spark MLlib model, using a combination of
HDInsight Spark cluster, Azure storage, Livy server, and Azure apps.

Summary

When it comes to scalable data analysis and ML, data scientists are frequently blocked or hindered by issues such as the limitations of available algorithms in handling large data-sets efficiently, access to or knowledge about the appropriate infrastructure, and the ability to produce operationalized models that can be consumed easily in production. This walkthrough addresses these issues using Spark’s ML API, HDInsight, and tools for operationalization. Although this walkthrough uses a sampled data-set so it can be tested and executed quickly, the same code can be adapted to run on larger data-sets using clusters of appropriate size.

Debraj
@d_guhathakurta

 

References

Microsoft Cloud Data Science Process: http://aka.ms/datascienceprocess