Overview

In the post Apache Spark. Create an RDD with Scala we saw how to create an RDD within a Scala Spark application. RDDs however are considered as low level. In this post, we will review how to create a DataFrame in Spark using the Scala API.

You can find the example snippets at Computational Statistics with Scala.

Create a DataFrame with Scala

The Spark DataFrame is inspired by the equivalent pandas DataFrame. A DataFrame in Spark is like a distributed in-memory table with named columns and schemas [1]. Similar to an RDD, a DataFrame is also immutable and Spark keeps a lineage of all the transformations. Thus, when we add or change the names and types of the columns, a new DataFrame is actually created.

A DataFrame is a structured data container. Given this, a number of advantages exist such as better performance and space efficiencies. The DataFrame is one of the high-level structured APIs and it is built on top of Spark SQL engine [1].

The following code snippet shows how to create a DataFrame in Spark

package train.spark

import org.apache.spark.sql.SparkSession

object CreateDataFrame {


  def main(args: Array[String]) {

    val spark = SparkSession
    .builder()
    .appName("Spark DataFrame Demo")
    .getOrCreate()

    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv" 
    val df = spark.read.csv(csvFile)

    // print the schema
    df.printSchema()

  }
}

The output is

21/10/15 15:22:10 WARN Utils: Your hostname, LT-2R0620-101 resolves to a loopback address: 127.0.1.1; using 192.168.0.71 instead (on interface wlp58s0)
21/10/15 15:22:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alex/MySoftware/spark-3.0.1-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/10/15 15:22:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)

Specify a schema

In the previous example, we saw how to create a DataFrame from a CSV file. However, in the returned DataFrame all columns are of type string which is not actual the case. One neat way to fix this is by declaring the underlying schema to be used. A schema in Spark defines the column names and the associated data types for a DataFrame [1]. There are certain advantages that we get when we define the schema up front [1]:

  • Spark does not have to infer the data types
  • We can detect errors early if the data doesn't match the specified schema
  • Spark does not have to create an extra job in order to read a portion of the file to ascertain the schema

Let's see how we can specify a schema. We have two way for doing so [1]:

  • Define it programmatically
  • Use a data definition language or DDL

The code snippet below shows the first approach.

package train.spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf


object CreateDataFrame {


  def main(args: Array[String]) {

    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv" 
    val appName: String = "Spark DataFrame Demo"

    val conf = new SparkConf().setAppName(appName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val customSchema = StructType(Array(
            StructField("mu-1", DoubleType, false),
            StructField("mu-2", DoubleType, false),
            StructField("label", IntegerType, false)))

    // specify a schema
    val df_schema = sqlContext.read.format("csv")
                  .option("delimiter",",")
                  .schema(customSchema)
                  .load(csvFile)

    df_schema.printSchema() 

    df_schema.groupBy("label").count().show()
    df_schema.show(5)

  }
}
root
 |-- mu-1: double (nullable = true)
 |-- mu-2: double (nullable = true)
 |-- label: integer (nullable = true)

+-----+-----+
|label|count|
+-----+-----+
| null|    1|
|    1|  185|
|    3|  185|
|    4|  185|
|    2|  185|
|    0|  185|
+-----+-----+

+-----+-----+-----+
| mu-1| mu-2|label|
+-----+-----+-----+
| null| null| null|
|22.91|28.54|    0|
|17.26|30.72|    0|
|17.05|31.08|    0|
|24.05|26.27|    0|
+-----+-----+-----+
only showing top 5 rows

Summary

References

  1. Jules S. Damji, Brooke Wenig, Tathagata Das, Denny Lee, Learning Spark. Lighting-fast data analytics, O'Reilly, 2nd Edition.