Machine Learning with Spark (5/5) - deploying a model

Deploying a model in production

Prerequisites:

  • Spark 2.0.0 or higher, preferably with pre-built Hadoop. Download link
  • Scala 2.11.8 or higher. Download link

In the last post of the ML with Spark series, we will focus on how to deploy a model into production. We will be using open data, publicly available from the UC Irvine Machine Learning Repository.

In this particular example, we will employ a simple linear regression model to predict the energy output of a power plant. The dataset used for this analysis can be downloaded from here. It contains 9568 data points collected from a Combined Cycle Power Plant over 6 years (2006-2011).

Data information as described on the site above:
Features consist of hourly average ambient variables:

  • Temperature (T) in the range 1.81°C to 37.11°C,
  • Ambient Pressure (AP) in the range 992.89 to 1033.30 millibar,
  • Relative Humidity (RH) in the range 25.56% to 100.16%,
  • Exhaust Vacuum (V) in the range 25.36 to 81.56 cm Hg,
  • Energy output (EP) in the range 420.26 to 495.76 MW

The averages are taken from various sensors located around the plant that record the ambient variables every second. The variables are given without normalization. This means that in our data pipeline we may need to add a normalization step.
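
For reference, the z-score scaling that a scaler such as Spark's StandardScaler can apply rescales each feature value as

x_scaled = (x - mean(x)) / stddev(x)

(note that StandardScaler divides by the standard deviation by default, and only subtracts the mean if setWithMean(true) is set).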

The original headers were renamed as below:

  • T -> temperature
  • V -> exhaust_vacuum
  • AP -> ambient_pressure
  • RH -> relative_humidity
  • EP -> energy_output

Our goal is to predict the energy_output (or the label) based on the other four features.
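
Under the hood, the linear regression we fit below models the label as a weighted sum of the (scaled) features plus an intercept, roughly:

energy_output ≈ w1*temperature + w2*exhaust_vacuum + w3*ambient_pressure + w4*relative_humidity + b

where the weights w1..w4 and the intercept b are learned from the data.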

First, let's import the necessary libraries (quite a few of them). We'll be using MLeap as our production deployment solution.

import ml.combust.bundle.{BundleFile => BF}
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.mleap.spark.SparkSupport._
import resource._   // scala-arm, provides the `managed` helper used when saving the bundle

import org.apache.spark.sql.{DataFrame, SparkSession => SS}
import org.apache.spark.ml.{PipelineStage, Pipeline => P, PipelineModel => PM}
import org.apache.spark.ml.bundle.{SparkBundleContext => SBC}
import org.apache.spark.ml.feature.{VectorAssembler => VA, StandardScaler => SSc}
import org.apache.spark.ml.regression.{LinearRegression => LR}
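
If you build the project with sbt, the MLeap Spark bindings (and scala-arm, which provides the managed helper used further down) need to be on the classpath. Here is a rough sketch of the dependencies, assuming sbt; the versions are placeholders that you should replace with the ones matching your Spark and MLeap setup:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-mllib" % "2.0.0" % "provided",  // match your Spark version
  "ml.combust.mleap" %% "mleap-spark" % "<mleap-version>",     // MLeap Spark bindings
  "com.jsuereth"     %% "scala-arm"   % "<scala-arm-version>"  // provides `managed` (may also come in transitively)
)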

Next, put the path to the CSV file into a lazy variable. Make sure you substitute /path/... with the correct path; this is the CSV file that you downloaded from the UC Irvine repository.

lazy val file_name = "/path/to/the/data.csv"

Now it is time to create a Spark session and load the data.

val spark = SS.builder().getOrCreate()
val data = spark
     .read
     .option("header", "true")        // our csv has a header
     .option("inferSchema", "true")   // we want spark to guess the type of data
     .format("csv")                   // the format of our file
     .load(file_name)                 // the file itself

data.cache() // the data is not that big, so let's cache it

Let's see what the schema looks like and do some exploratory analysis.

data.printSchema()

root
 |-- temperature: double (nullable = true)
 |-- exhaust_vacuum: double (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
 |-- energy_output: double (nullable = true)
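
Optionally, you can take a quick peek at the first few rows before looking at the summary statistics:

data.show(5)   // prints the first 5 rows
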
display(data.describe())

summary   temperature        exhaust_vacuum     ambient_pressure   relative_humidity   energy_output
count     9568               9568               9568               9568                9568
mean      19.651231187291    54.305803720736    1013.25907817725   73.3089778428093    454.365009406355
stddev    7.45247322961108   12.7078929983268   5.93878370581164   14.600268756729     17.0669949998034
min       1.81               25.36              992.89             25.56               420.26
max       37.11              81.56              1033.3             100.16              495.76

Next we create an Array with the names of the features that we want to use in the model.

val features = Array("temperature", "exhaust_vacuum", "ambient_pressure", "relative_humidity")

We need to clean up our data a bit in case it contains null values. To do that across the whole dataset, we create an Array of Columns from our feature and label headers.

val allCols = features.union(Seq("energy_output")).map(data.col)

Now we can create a null filter and apply it to the whole dataset to remove any rows with missing values.

val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)

val dataFiltered = data.select(allCols: _*).filter(nullFilter).persist()

If you check the data again now, you will notice that it is the same as the table above. This means we didn't have any null values, but it is always good practice to think about how you want to handle missing values: remove them altogether, or replace them with an average or some other heuristic.

dataFiltered.describe().show()
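
As an alternative to dropping rows, you could impute missing values instead. The following is only an illustration and is not used in the rest of the post; featureMeans and dataImputed are just local names for this sketch, which fills nulls in the feature columns with the column means:

import org.apache.spark.sql.functions.avg

// compute the mean of each feature column, then fill any nulls with it
val featureMeans = features.map(c => c -> data.select(avg(c)).first().getDouble(0)).toMap
val dataImputed  = data.na.fill(featureMeans)

A heuristic like this keeps all the rows, but you would still need to decide what to do with rows where the label itself is missing.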

Now we have reached the point where we need to build the machine learning pipeline. In this case we will use the whole dataset for our regression model. As shown in previous posts, you would normally split the data into training and test sets and validate several models (Random Forests, Linear Regression, etc.), but for this example we will stick to Linear Regression.

We will need a transformer to combine all the features into a single vector. That can be achieved in Spark using the VectorAssembler (VectorAssembler APIs).

There are also examples for Scala, Java & Python (Details).

Also, as mentioned at the beginning of the post, we need to scale (normalize) our data. For that we will be using the StandardScaler.

val featureAssembler = new VA(uid = "feature_assembler").setInputCols(features).setOutputCol("unscaled_features")
val featureScaler = new SSc(uid = "feature_scaler").setInputCol("unscaled_features").setOutputCol("scaled_features")

This pipeline is very simple, and that's all we need for our model. Next, we assemble the pipeline and fit it to the data.

val estimators: Array[PipelineStage] = Array(featureAssembler, featureScaler)
val featurePipeline = new P(uid = "feature_pipeline").setStages(estimators)
val sparkFeaturePipelineModel = featurePipeline.fit(dataFiltered)
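
Optionally, as a quick sanity check (not needed for the deployment), you can peek at the statistics the fitted scaler learned; scalerModel is just a local name used in this sketch:

// stage 0 is the VectorAssembler, stage 1 is the fitted StandardScalerModel
val scalerModel = sparkFeaturePipelineModel
  .stages(1)
  .asInstanceOf[org.apache.spark.ml.feature.StandardScalerModel]

println(s"feature means:    ${scalerModel.mean}")   // should roughly match the describe() output above
println(s"feature std devs: ${scalerModel.std}")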

Here comes the model.

val linearRegression = new LR(uid = "linear_regression")
  .setFeaturesCol("scaled_features")
  .setLabelCol("energy_output")
  .setPredictionCol("energy_prediction")

val linearRegressionModel = new P().setStages(Array(sparkFeaturePipelineModel, linearRegression)).fit(dataFiltered)
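
Before deploying, it can be useful to take a quick, optional look at what the model learned. The last stage of the fitted pipeline is the LinearRegressionModel, which exposes the coefficients and a training summary (lrModel is just a local name for this sketch):

import org.apache.spark.ml.regression.LinearRegressionModel

val lrModel = linearRegressionModel.stages.last.asInstanceOf[LinearRegressionModel]

println(s"coefficients: ${lrModel.coefficients}")
println(s"intercept:    ${lrModel.intercept}")
println(s"RMSE:         ${lrModel.summary.rootMeanSquaredError}")
println(s"R^2:          ${lrModel.summary.r2}")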

That's all for the model creation. It's deployment time with MLeap.

The following function takes as a parameter the pipeline model we created above, linearRegressionModel, and, as an optional parameter, the path where the production model will be saved.

def export_pipeline(pipeline: PM, path: String = "/tmp/model.zip"): Unit = {
  val sbc = SBC()                                // Spark bundle context used for serialization
  for (bf <- managed(BF(s"jar:file:$path"))) {   // open the target zip as a managed resource
    pipeline.writeBundle.format(SerializationFormat.Json).save(bf)(sbc).get
  }
}

The last line in our program will be:

export_pipeline(linearRegressionModel)

In the path where you chose to save the model, you will find a zip file with the exported bundle.

This is basically your pipeline model with all its steps, serialized by MLeap into a bunch of JSON files.

In order to use this model in production and run predictions against it, you need to use the MLeap REST server.

Follow these instructions and voilà, you have a model rolled out to production. The nice thing is that you can also write your model in Python (scikit-learn) and deploy it to production using the same REST APIs.

The full code, along with Scala and PySpark notebooks, can be found on GitHub.

Finally, some useful links:


  • [Download Spark](https://spark.apache.org/downloads.html)
  • [Machine Learning Guide](http://spark.apache.org/docs/latest/ml-guide.html)
  • [15 hours of machine learning videos](https://www.r-bloggers.com/in-depth-introduction-to-machine-learning-in-15-hours-of-expert-videos/)
  • [Part 1/5 Machine Learning with Spark](https://blog.epigno.systems/2017/01/01/machine-learning-with-spark-1/)
  • [Part 2/5 Machine Learning with Spark](https://blog.epigno.systems/2017/01/03/machine-learning-with-spark-2/)
  • [Part 3/5 Machine Learning with Spark](https://blog.epigno.systems/2017/01/03/machine-learning-with-spark-3/)
  • [Part 4/5 Machine Learning with Spark](https://blog.epigno.systems/2017/01/04/machine-learning-with-spark-4/)
  • [Clustering with Spark](https://spark.apache.org/docs/latest/ml-clustering.html)

> ==Disclaimer==: This is by no means an original work; it is merely meant to serve as a compilation of thoughts, code snippets and teachings from different sources.