Machine Learning with PySpark - Linear Regression


This is a very simple example of how to use PySpark and Spark ML pipelines for linear regression. We'll be using a real dataset, but these steps can be generalized to similar datasets.

For a generic Spark & Scala linear regression "how to", see my earlier blog post.

Prerequisites:

  1. Create a cluster with the following settings:
  • Databricks Runtime Version: 3.4
  • Spark version: 2.2.0
  • Scala version: 2.11

You can name your cluster whatever you want.

  2. From the Home button:
  • create a new notebook
  • make sure the language is Python
  • and that the cluster is the one created in the previous step

Now that we are all set, let's get right into the coding.

Here is the dataset information as described on the UC Irvine Machine Learning Repository. The dataset contains 9568 data points collected from a Combined Cycle Power Plant over 6 years (2006-2011). Features consist of hourly average ambient variables.

  • Temperature (T) in the range 1.81°C to 37.11°C
  • Ambient Pressure (AP) in the range 992.89 to 1033.30 millibar
  • Relative Humidity (RH) in the range 25.56% to 100.16%
  • Exhaust Vacuum (V) in the range 25.36 to 81.56 cm Hg
  • Net hourly electrical energy output (EP) in the range 420.26 to 495.76 MW

The averages are taken from various sensors located around the plant that record the ambient variables every second. The variables are given without normalization.

The original headers were renamed as below:

Headers    Renamed
T          temperature
V          exhaust_vacuum
AP         ambient_pressure
RH         relative_humidity
EP         energy_output

Our goal is to predict the `energy_output` (label) based on the other four features.
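
In other words, we will be fitting a standard multiple linear regression of the form

energy_output ≈ w0 + w1 * temperature + w2 * exhaust_vacuum + w3 * ambient_pressure + w4 * relative_humidity

where the w's are the coefficients learned from the data (in the pipeline below, the regression is actually fit on standardized versions of these features).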
  3. We will import the necessary libraries:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
  4. Next, we have to upload the dataset file. From the Data button on the left menu bar, choose the + sign next to Tables, then drop (or browse for) the dataset file in the designated box. Remember the path; you will need it in the next step.
  5. Now let's get back to our notebook and set a variable with the path from the previous step. It will look something like this:
file_name = "/FileStore/tables/6zm535q61494044083775/data.csv"
  6. We will load the file into a dataframe.
data = spark.read.options(header='true', inferSchema='true', delimiter=',').csv(file_name)
data.cache()
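
A note on the headers: if your copy of the CSV still carries the original short names (T, V, AP, RH, EP) rather than the long ones from the table above, one way to apply the renaming in Spark is withColumnRenamed. A minimal sketch, assuming those exact header strings:

# Only needed if the file was uploaded with the original short headers
# (an assumption; adjust the keys to match your file).
renames = {
    "T": "temperature",
    "V": "exhaust_vacuum",
    "AP": "ambient_pressure",
    "RH": "relative_humidity",
    "EP": "energy_output",
}
for old, new in renames.items():
    data = data.withColumnRenamed(old, new)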
  7. Let's do some simple exploratory data analysis (EDA). This is very shallow; normally you would also check for missing data, decide how to fill it in, plot the data to see what it looks like, and so on.
display(data.describe())

As a result, you will see a table of summary statistics (count, mean, stddev, min, max) for each column.
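
The describe() output will not flag missing values directly; a quick extra check one could add here is counting the nulls per column:

from pyspark.sql.functions import col, count, when

# Count the null entries in each column of the dataframe.
missing_counts = data.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in data.columns]
)
display(missing_counts)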
  8. The energy_output column needs to be renamed to label. If you skip this step, model fitting will later fail with IllegalArgumentException: u'Field "label" does not exist.' So let's do that now.
features = ["temperature", "exhaust_vacuum", "ambient_pressure", "relative_humidity"]
lr_data = data.select(col("energy_output").alias("label"), *features)
lr_data.printSchema()

You will be able to see the schema of our current dataframe.

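Assuming inferSchema typed every column as a double (the dataset is entirely numeric), it should look roughly like this:

root
 |-- label: double (nullable = true)
 |-- temperature: double (nullable = true)
 |-- exhaust_vacuum: double (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- relative_humidity: double (nullable = true)
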
  9. Now that the data is all set, let's split it into training and test sets. The proportions are up for debate; I'll be using a 70%/30% split.
(training, test) = lr_data.randomSplit([.7, .3])
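
One optional tweak: randomSplit accepts a seed, so if you want the exact same split on every run you can pass one:

# The seed value below is arbitrary; any fixed integer makes the split reproducible.
(training, test) = lr_data.randomSplit([.7, .3], seed=42)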
  10. This step is the beautiful part about Spark. You can build your ML pipeline and Spark will do all the heavy lifting for you.
    This is how things work in our case:
  • we put all the features into a single vector
  • since we are dealing with numerical data, we scale those features
  • we choose the algorithm (in our case, linear regression)
  • and we create the pipeline with the steps above, in that order
vectorAssembler = VectorAssembler(inputCols=features, outputCol="unscaled_features")
standardScaler = StandardScaler(inputCol="unscaled_features", outputCol="features")
lr = LinearRegression(maxIter=10, regParam=.01)

stages = [vectorAssembler, standardScaler, lr]
pipeline = Pipeline(stages=stages)

One thing to keep in mind: the output column of one stage should be the input column of the next.

  11. Now let's fit the pipeline on the training data and transform (make predictions on) the test data.
model = pipeline.fit(training)
prediction = model.transform(test)
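
If you are curious about the fitted coefficients, the last stage of the fitted pipeline is the linear regression model itself, so you can peek at it like this:

# The fitted stages live in model.stages; the LinearRegressionModel is the last one.
lr_model = model.stages[-1]
print("Coefficients: %s" % str(lr_model.coefficients))
print("Intercept: %.3f" % lr_model.intercept)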
  12. And we are done (sort of). Let's see what the prediction dataframe looks like:
prediction.show()
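
The show() output includes the label, the original feature columns, the assembled and scaled feature vectors, and the prediction. To compare actuals against predictions side by side, you can select just those two columns:

prediction.select("label", "prediction").show(5)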

  13. Finally, let's evaluate how well our model is doing. Our metrics will be RMSE, MSE, MAE and R^2.
from pyspark.ml.evaluation import RegressionEvaluator

# RegressionEvaluator computes one metric at a time; the metric can be overridden per call
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = evaluator.evaluate(prediction)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = evaluator.evaluate(prediction, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(prediction, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# R^2 - coefficient of determination
r2 = evaluator.evaluate(prediction, {evaluator.metricName: "r2"})
print("R^2: %.3f" % r2)

The result is something like this:

Metric    Value
RMSE      4.501
MSE       20.262
MAE       3.627
R^2       0.929
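
As a quick sanity check, the RMSE is just the square root of the MSE (sqrt(20.262) ≈ 4.501), and an R^2 of about 0.93 means the model explains roughly 93% of the variance in the energy output on the test set.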

That's it!


> ==Disclaimer==: This is by no means an original work; it is merely meant to serve as a compilation of thoughts, code snippets and teachings from different sources.