Machine Learning with Spark (1/5) - linear regression


Prerequisites:

  • Spark 2.0.0 or higher, preferably pre-built for Hadoop. Download link
  • Scala 2.11.8 or higher. Download link

This is a generic How To on Linear Regression with Spark.

The following tutorial will be performed entirely in the spark-shell, although it is absolutely possible to wrap up everything in a function and run it as a compiled object (See this Scala tutorial).

Open the Spark shell with the spark-shell command and let's make the logging output a little bit less verbose by running the command sc.setLogLevel("ERROR"). As always, the Spark context is available in the shell as sc, while the Spark session is called spark.
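
In practice, that looks like this:

$ spark-shell
scala> sc.setLogLevel("ERROR")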

Let's assume that the file we want to analyze is in CSV format and has a header. When we load the file, if we use the option("inferSchema", "true"), Spark will do its best to infer the schema based on each column's data.

Since we will be using the newly created dataframe (let's call it df) frequently, we should also cache it for faster access.

scala> val file_name = "/path/to/local/file.csv"

scala> val df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_name)

scala> df.cache
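
To verify what Spark inferred, you can print the schema of the newly created dataframe:

scala> df.printSchema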

We will need to import a couple of classes in order to perform our analysis. LinearRegression will be used for this particular example, while VectorAssembler will be used to transform our data set into a format suitable for the machine learning algorithm.

For convenience, we've aliased each import so that we don't have to type the full class name every time we use it.

import org.apache.spark.ml.regression.{LinearRegression => LR}
import org.apache.spark.ml.feature.{VectorAssembler => VA}

Another assumption is that all columns are numerical and there are no categorical values.
Let's say that we have three columns of numerical data representing the features (e.g. featureA, featureB, featureC) and one column with the labels (e.g. result).

We now need to create a new dataframe, with two columns named label and features, which is a transformation of our initial dataframe. The features column will be of type vector, while the label column can be any numeric type.

scala> val features = Array("featureA","featureB","featureC")

scala> val new_df = new VA().setInputCols(features).setOutputCol("features").transform(df).select($"result".as("label"), $"features")
new_df: org.apache.spark.sql.DataFrame = [label: double, features: vector]

As you can see, our new dataframe (new_df) has exactly two columns, one representing the labels and one representing the features.
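
A quick look at the first few rows confirms the new structure (the values will of course depend on your data):

scala> new_df.show(5, false)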

Now it's time to fit the model to our data.

val lr = new LR()
val model = lr.fit(new_df)

Notice that we have not set any parameters for our model; everything is left at its default value. To see an explanation of the parameters, run print(model.explainParams) in the shell. To check their current values, run print(model.extractParamMap).
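
If you do want to set parameters explicitly, the setters can be chained. Here is a small illustrative sketch (the variable names and values are arbitrary choices, not recommendations):

scala> val lr_tuned = new LR().setMaxIter(50).setRegParam(0.1).setElasticNetParam(0.5)
scala> val model_tuned = lr_tuned.fit(new_df)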

Let's see a model summary. There are lots of methods that can be applied to the model summary, as shown below.

scala> val summary = model.summary
scala> summary.
coefficientStandardErrors   featuresCol         meanSquaredError   objectiveHistory   predictions   rootMeanSquaredError   
devianceResiduals           labelCol            model              pValues            r2            tValues                
explainedVariance           meanAbsoluteError   numInstances       predictionCol      residuals     totalIterations      
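
For instance, to print a few of them:

scala> println(s"R2: ${summary.r2}")
scala> println(s"RMSE: ${summary.rootMeanSquaredError}")
scala> println(s"Total iterations: ${summary.totalIterations}")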

Normally, before fitting the model, we should split the data set into training and testing chunks. The exact proportion is up to you, but a common choice is around 70% for training and 30% for testing.

val Array(training, test) = df.randomSplit(Array(.7, .3), seed = 196)

The training and test data sets need to be transformed with the VectorAssembler, as shown earlier. After that, we fit the model using the training data and then apply the model to the test data, as sketched below.
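
A minimal sketch of the assembling and fitting steps, reusing the features array and column names from this example (the names training_df and test_df are purely illustrative):

// assemble the feature vector for each split (same assembler settings as before)
val va = new VA().setInputCols(features).setOutputCol("features")
val training_df = va.transform(training).select($"result".as("label"), $"features")
val test_df = va.transform(test).select($"result".as("label"), $"features")

// fit the model on the training data only
val model = lr.fit(training_df)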

val results = model.transform(test_df)

Once this step is done, we need to evaluate how well our model performs on the test data. For that, we use the RegressionEvaluator class.

import org.apache.spark.ml.evaluation.{RegressionEvaluator => RE}

val eval = new RE().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
println(s"RMSE: ${eval.evaluate(results)}")

eval.setMetricName("mse")
println(s"MSE: ${eval.evaluate(results)}")

eval.setMetricName("mae")
println(s"MAE: ${eval.evaluate(results)}")

eval.setMetricName("r2")
println(s"R2: ${eval.evaluate(results)}")

Let's put everything in an Object

As we mentioned at the beginning of the blog, it is possible to wrap everything up in an object and load that object in the spark-shell. Let's see how that works, based on the above example. Below is the code for the LinearRegressionObject.scala file.

import org.apache.log4j.{Level, Logger => L}
import org.apache.spark.ml.feature.{VectorAssembler => VA}
import org.apache.spark.ml.regression.{LinearRegression => LR}
import org.apache.spark.sql.{SparkSession => SS}


object LinearRegressionObject {

	L.getLogger("org").setLevel(Level.ERROR)

	def main(): Unit = {

		val spark = SS.builder().getOrCreate()
		import spark.implicits._ // needed for the $"column" syntax below

		val file_name = "/path/to/local/file.csv"
		val data = spark.read.option("header", "true").option("inferSchema", "true").format("csv").load(file_name)

		val features = Array("featureA","featureB","featureC")

		// creating a feature vector
		val new_df = new VA().setInputCols(features).setOutputCol("features").transform(data).select($"result".as("label"), $"features")

		// the LR model
		val lr = new LR().setMaxIter(20)
		val model = lr.fit(new_df)

		// some results
		val summary = model.summary

		println(s"RMSE: ${summary.rootMeanSquaredError}")
		println(s"MSE: ${summary.meanSquaredError}")
		println(s"MAE: ${summary.meanAbsoluteError}")
		println(s"R2: ${summary.r2}")

		spark.stop()
	}
}

Next, fire up your spark-shell and load the object:

scala> :load /local/path/to/LinearRegressionObject.scala

If there are no errors, the output will be something like: defined object LinearRegressionObject. While still in the shell, run LinearRegressionObject.main to execute the object's main method. You should now see the values of RMSE, MSE, MAE and R2.

And we are done!


Useful links
  • [Download Spark](https://spark.apache.org/downloads.html)
  • [Machine Learning Guide](http://spark.apache.org/docs/latest/ml-guide.html)
  • [Linear Regression with Spark](https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression)

  • Disclaimer: This is by no means an original work; it is merely meant to serve as a compilation of thoughts, code snippets and teachings from different sources.