Machine Learning with Spark (2/5) - logistic regression

Prerequisites:

  • Spark 2.0.0 or higher, preferably pre-built for Hadoop (see Download Spark under Useful links)
  • Scala 2.11.8 or higher

This is a generic how-to on logistic regression with Spark.

The following tutorial is performed entirely in the spark-shell, although it is entirely possible to wrap everything up in a function and run it as a compiled object (see this Scala tutorial).

The first part of this tutorial is similar to the linear regression tutorial and, as always, we will be working directly in the spark-shell environment.

Make sure you reduce the verbosity of the log with sc.setLogLevel("ERROR").

We will start by importing all the libraries needed throughout the tutorial.

import org.apache.spark.ml.classification.{LogisticRegression => LogR}
import org.apache.spark.ml.feature.{VectorAssembler => VA, StringIndexer => SI, OneHotEncoder => OHE, VectorIndexer}
import org.apache.spark.ml.{Pipeline => PL}
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator => MCE}
import org.apache.spark.ml.linalg.{Matrix => MX, Matrices => MS, Vectors}

Next, we need to load the data and cache it for faster access.

val file_name = "/path/to/local/file.csv"
val df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_name)
df.cache()

This time we'll assume that our data contains both numerical and categorical columns. Let's say that one of the feature columns, featureA, is categorical and represents the gender of an observation. The other feature columns, featureB and featureC, are numerical, as is the label column.

val new_df = df.select(df("label"), $"featureA", $"featureB", $"featureC")

When dealing with categorical data, we need to encode it in a form that the machine can understand. In this particular case, we add two more components to our feature vector to represent the genders.

If an observation's gender is male, we set the first component to 1 and the second component to 0. If the observation's gender is female, we set the first component to 0 and the second component to 1.

This can be achieved in Spark with the StringIndexer and OneHotEncoder feature transformers. Note that OneHotEncoder drops the last category by default, so a two-valued column is actually encoded as a single component unless setDropLast(false) is called.

val genIndexer = new SI().setInputCol("featureA").setOutputCol("featureAindex")
val genEncoder = new OHE().setInputCol("featureAindex").setOutputCol("featureAencoded")
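To make the encoding concrete, here is a minimal plain-Scala sketch (no Spark required) of what indexing followed by one-hot encoding does to a gender column. The sample values and the helper name `oneHot` are assumptions made up for illustration. The index order mimics StringIndexer's default of giving index 0 to the most frequent value, and all categories are kept, as in the description above.

```scala
// Hypothetical sample column; the values are made up for illustration.
val genders = Seq("male", "female", "male", "male", "female")

// Index the categories by descending frequency (most frequent value gets 0).
val index: Map[String, Int] = genders
  .groupBy(identity)
  .toSeq
  .sortBy { case (_, occurrences) => -occurrences.size }
  .map(_._1)
  .zipWithIndex
  .toMap

// Turn a category index into a vector with one slot per category.
def oneHot(i: Int, size: Int): Vector[Double] =
  Vector.tabulate(size)(j => if (j == i) 1.0 else 0.0)

val encoded = genders.map(g => oneHot(index(g), index.size))
encoded.foreach(println)
```

In this sample "male" is the more frequent value, so it is indexed first and encoded as (1, 0), while "female" becomes (0, 1).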

The next step is to create a vector assembler that combines all the features into a single vector column.

val df_vector = new VA().setInputCols(Array("featureAencoded", "featureB", "featureC")).setOutputCol("features")

Once the vector assembler is created, we build the pipeline for our logistic regression model.

val logReg = new LogR()
val pipeline = new PL().setStages(Array(genIndexer, genEncoder, df_vector, logReg))

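We will split the new data set into training and test sets and fit the model on the training set.

// The 80/20 split ratio and the seed are illustrative values; adjust as needed.
val Array(training, test) = new_df.randomSplit(Array(0.8, 0.2), seed = 42L)
val model = pipeline.fit(training)
val results = model.transform(test)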

That's all there is to the training and testing of the model. Now it's time to evaluate the model.
Unfortunately, at the time of this writing, the model evaluation classes are still marked as experimental, so the results should be taken with a grain of salt.

val eval = new MCE().setLabelCol("label").setPredictionCol("prediction")

println(s"Accuracy: ${eval.setMetricName("accuracy").evaluate(results)}")
println(s"Precision: ${eval.setMetricName("weightedPrecision").evaluate(results)}")
println(s"Recall: ${eval.setMetricName("weightedRecall").evaluate(results)}")
println(s"F1: ${eval.setMetricName("f1").evaluate(results)}")
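Note that the precision and recall reported above are the weighted variants. As a rough plain-Scala sketch of what "weighted" means here, the snippet below computes a per-class precision and then averages it using the share of true labels in each class as the weight. The `pairs` data and the helper name `precision` are made up for illustration; this is not Spark code.

```scala
// Hypothetical (prediction, label) pairs; the values are made up for illustration.
val pairs = Seq((1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0))

val labels = pairs.map(_._2).distinct.sorted

// Precision for one class: correct predictions of that class / all predictions of that class.
def precision(c: Double): Double = {
  val predicted = pairs.count(_._1 == c)
  if (predicted == 0) 0.0
  else pairs.count(p => p._1 == c && p._2 == c).toDouble / predicted
}

// Weight each class's precision by the fraction of true labels belonging to that class.
val weightedPrecision = labels.map { c =>
  precision(c) * pairs.count(_._2 == c) / pairs.size
}.sum

println(s"Weighted precision: $weightedPrecision")
```

Weighted recall follows the same pattern, with the per-class recall in place of the per-class precision.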

To my knowledge, at the time of this writing, there is no simple way of building the confusion matrix using only the ml libraries, so we will build it manually, treating 1 as the positive class.

val TP = results.select("label", "prediction").filter("label = 1 and prediction = 1").count
val TN = results.select("label", "prediction").filter("label = 0 and prediction = 0").count
val FP = results.select("label", "prediction").filter("label = 0 and prediction = 1").count
val FN = results.select("label", "prediction").filter("label = 1 and prediction = 0").count
val total = results.select("label").count.toDouble

// Matrices.dense fills column by column: the first column holds the actual
// positives (TP, FN), the second the actual negatives (FP, TN).
val confusion: MX = MS.dense(2, 2, Array(TP, FN, FP, TN).map(_.toDouble))

val accuracy  = (TP + TN) / total
val precision = TP.toDouble / (TP + FP)
val recall    = TP.toDouble / (TP + FN)
val F1        = 2 / (1 / precision + 1 / recall)
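As a quick sanity check of the standard binary definitions (precision = TP / (TP + FP), recall = TP / (TP + FN), F1 as their harmonic mean), here is a small worked example in plain Scala. The four counts are hypothetical numbers chosen for illustration, with 1 assumed to be the positive class.

```scala
// Hypothetical counts for a binary classifier, with 1 as the positive class.
val (tp, tn, fp, fn) = (40L, 45L, 5L, 10L)
val n = (tp + tn + fp + fn).toDouble

val acc  = (tp + tn) / n            // share of all observations classified correctly
val prec = tp.toDouble / (tp + fp)  // share of predicted positives that are correct
val rec  = tp.toDouble / (tp + fn)  // share of actual positives that were found
val f1   = 2 * prec * rec / (prec + rec)

println(f"accuracy=$acc%.3f precision=$prec%.3f recall=$rec%.3f f1=$f1%.3f")
```

The F1 value computed this way equals 2·TP / (2·TP + FP + FN), which is a handy cross-check when the counts are small enough to verify by hand.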

It is also possible to use the older mllib class org.apache.spark.mllib.evaluation.MulticlassMetrics for evaluation (the RDD-based mllib API has been in maintenance mode since Spark 2.0 and will probably be deprecated at some point). To use it, the DataFrame needs to be converted into an RDD.

import org.apache.spark.mllib.evaluation.{MulticlassMetrics => MM }

// MulticlassMetrics expects an RDD of (prediction, label) pairs.
val eval_rdd = results.select($"prediction", $"label").as[(Double, Double)].rdd
val metrics = new MM(eval_rdd)

println(metrics.confusionMatrix)
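As a possible alternative that avoids the RDD conversion, the same counts can be tabulated with ordinary DataFrame operations (groupBy, pivot, count). The sketch below is only an illustration under stated assumptions: the `sample` DataFrame and its values are made up and stand in for the `results` DataFrame, and the session is created explicitly so the snippet is self-contained (in the spark-shell, getOrCreate simply returns the existing session).

```scala
import org.apache.spark.sql.SparkSession

// In the spark-shell this returns the session that already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("confusion-sketch")
  .getOrCreate()

// Hypothetical (label, prediction) pairs standing in for the `results` DataFrame.
val sample = spark
  .createDataFrame(Seq((1.0, 1.0), (1.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0)))
  .toDF("label", "prediction")

// One row per true label, one column per predicted value; empty cells become 0.
val confusionDF = sample
  .groupBy("label")
  .pivot("prediction")
  .count()
  .na.fill(0)
  .orderBy("label")

confusionDF.show()
```

Here the rows are the true labels and the columns are the predicted values, so the orientation should be checked before comparing it with a matrix built another way.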

That's all about logistic regression with Spark.


Useful links
  • [Download Spark](https://spark.apache.org/downloads.html)
  • [Machine Learning Guide](http://spark.apache.org/docs/latest/ml-guide.html)
  • [Logistic Regression with Spark](https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression)
  • [Part 1/5 Machine Learning with Spark](https://blog.epigno.systems/2017/01/01/machine-learning-with-spark-1/)

> ==Disclaimer==: This is by no means an original work; it is merely meant to serve as a compilation of thoughts, code snippets and teachings from different sources.