Overview

In a previous post I developed a trivial Scala application that performs linear regression with a single feature. In this post I want to go a bit further and use Spark's MLlib to develop a linear regression model, this time with two features.
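Concretely, the model fitted below assumes the label is a linear function of the two features plus an intercept:

```latex
\hat{y} = w_1 x_1 + w_2 x_2 + b
```

MLlib estimates the weights w_1, w_2 and the intercept b from the training data.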

Linear regression with Spark MLlib

The first thing I need to do in order to use MLlib in my Scala application is to update the dependencies in the build.sbt script. They should now look as follows:

libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.12" % "3.0.1"
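As a side note, sbt's %% operator appends the Scala binary version to the artifact name automatically, so the same dependencies can be written more concisely (this assumes scalaVersion is set to a 2.12.x release in the build):

```scala
// Equivalent build.sbt declarations using %%, which resolves
// e.g. "spark-core" to "spark-core_2.12" for a 2.12 build.
val sparkVersion = "3.0.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion,
  "org.apache.spark" %% "spark-sql"   % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)
```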
With the dependencies in place, the full application looks like this:

package train.spark

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.types.DoubleType


object LinearRegressionApp {

  def main(args: Array[String]): Unit = {

    // A SparkSession wraps a SparkContext, so there is no need
    // to create one explicitly
    val session = SparkSession.builder()
      .appName("Linear regression Spark")
      .master("local[4]")
      .getOrCreate()

    // Should be some file on your system
    val csvFile = "/home/alex/qi3/spark_scala/data/spark_regression.csv"
    val inputTrainingSet = session.read.format("csv").load(csvFile)

    println("Number of Partitions: " + inputTrainingSet.rdd.getNumPartitions)
    println("Action: First element: " + inputTrainingSet.rdd.first())

    // Cast the raw string columns to doubles and give them meaningful names
    val analysisData = inputTrainingSet.withColumn("x1", inputTrainingSet("_c0").cast(DoubleType))
      .withColumn("x2", inputTrainingSet("_c1").cast(DoubleType))
      .withColumn("y", inputTrainingSet("_c2").cast(DoubleType))
      .drop("_c0", "_c1", "_c2")


    // Create the features column
    val assembler = new VectorAssembler()
      .setInputCols(Array("x1", "x2"))
      .setOutputCol("features")

    // create the model
    val lr = new LinearRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)
        .setFeaturesCol("features")
        .setLabelCol("y")
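For reference, with regParam = λ and elasticNetParam = α (here λ = 0.3 and α = 0.8), MLlib's LinearRegression minimises the elastic-net regularised least-squares objective over the n training points:

```latex
\min_{w,\,b} \; \frac{1}{2n} \sum_{i=1}^{n} \left( w^{\top} x_i + b - y_i \right)^2
  + \lambda \left( \alpha \lVert w \rVert_1 + \frac{1 - \alpha}{2} \lVert w \rVert_2^2 \right)
```

α = 0 gives pure L2 (ridge) regularisation, α = 1 pure L1 (lasso); values in between mix the two penalties.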

    val trainingSet = assembler.transform(analysisData)

    // Fit the model
    val lrModel = lr.fit(trainingSet)

    // Print the coefficients and intercept for linear regression
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // Summarize the model over the training set and print out some metrics
    val trainingSummary = lrModel.summary

    println(s"numIterations: ${trainingSummary.totalIterations}")

    // There is something wrong with my Scala/Spark version and the
    // following line throws an exception
    //println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")

    trainingSummary.residuals.show()
    println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
    println(s"r2: ${trainingSummary.r2}")
  }
}
Running the application produces the following output:

21/08/25 12:36:15 WARN Utils: Your hostname, LT-2R0620-101 resolves to a loopback address: 127.0.1.1; using 192.168.0.71 instead (on interface wlp58s0)
21/08/25 12:36:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alex/MySoftware/spark-3.0.1-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/08/25 12:36:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/08/25 12:36:17 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
Number of Partitions: 1
Action: First element: [0.0,4.0,4.357400305044133]
21/08/25 12:36:22 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/08/25 12:36:22 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
Coefficients: [1.2545846367230242,0.7527507820338242] Intercept: 1.305736977601481
numIterations: 3
+--------------------+
|           residuals|
+--------------------+
| 0.04066019930735543|
| -0.6631570819021908|
|  0.8844468485401586|
|-0.27725408848247746|
|   1.523792089069631|
|  0.9081058052618962|
|  0.6154843963633212|
| -1.5426210882366824|
|  -1.116750516169644|
| -0.5438006575317718|
|-0.41191237820348237|
|-0.10423573938951769|
| -0.7720329729420263|
| -0.5175509972153742|
|  0.5066514385552212|
| 0.28386941829179424|
| -1.7266735995448794|
| -0.7963013580643907|
| -0.8306208671329927|
| -0.7913153349720496|
+--------------------+
only showing top 20 rows

RMSE: 1.0241722775198268
r2: 0.8486882566011
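As a sanity check, the first training row printed above was [0.0, 4.0, 4.357400305044133]; plugging it into the reported coefficients and intercept reproduces the first entry of the residuals table. The script below is plain Scala (no Spark needed, runnable with the scala script runner) and uses only the numbers from the run above:

```scala
// Fitted parameters reported by the run above
val w = Array(1.2545846367230242, 0.7527507820338242)
val intercept = 1.305736977601481

// First training row: x1 = 0.0, x2 = 4.0, y = 4.357400305044133
val x = Array(0.0, 4.0)
val y = 4.357400305044133

// Prediction of the fitted model: w1*x1 + w2*x2 + b
val prediction = w.zip(x).map { case (wi, xi) => wi * xi }.sum + intercept

// Residual = observed - predicted
val residual = y - prediction

println(f"prediction = $prediction%.6f residual = $residual%.6f")
// residual comes out to about 0.040660, matching the first
// row of the residuals table above
```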