Apache Spark. Create a DataFrame with Scala
In the post Apache Spark. Create an RDD with Scala we saw how to create an RDD within a Scala Spark application. RDDs, however, are considered low level. In this post, we will review how to create a DataFrame in Spark using the Scala API.
You can find the example snippets at Computational Statistics with Scala.
The Spark DataFrame is inspired by the equivalent pandas DataFrame. A DataFrame in Spark is like a distributed in-memory table with named columns and schemas [1]. Similar to an RDD, a DataFrame is also immutable, and Spark keeps a lineage of all the transformations. Thus, when we add or change the names and types of the columns, a new DataFrame is actually created.
A DataFrame is a structured data container. This brings a number of advantages, such as better performance and space efficiency. The DataFrame is one of the high-level structured APIs, and it is built on top of the Spark SQL engine [1].
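As a small illustration of the immutability point: operations that change columns return a new DataFrame and leave the original untouched. The snippet below is only a sketch; it assumes a DataFrame df already exists and uses the column name _c0 that appears in the example further down.
// withColumnRenamed returns a new DataFrame with the renamed column;
// `df` itself is left unchanged, and Spark records the transformation
// in the lineage of `renamed`
val renamed = df.withColumnRenamed("_c0", "mu-1")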
The following code snippet shows how to create a DataFrame in Spark:
package train.spark

import org.apache.spark.sql.SparkSession

object CreateDataFrame {

  def main(args: Array[String]) {

    // the SparkSession is the entry point to the DataFrame API
    val spark = SparkSession
      .builder()
      .appName("Spark DataFrame Demo")
      .getOrCreate()

    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv"

    // without a schema (or schema inference) every column is read as string
    val df = spark.read.csv(csvFile)

    // print the schema
    df.printSchema()

    spark.stop()
  }
}
The output is
21/10/15 15:22:10 WARN Utils: Your hostname, LT-2R0620-101 resolves to a loopback address: 127.0.1.1; using 192.168.0.71 instead (on interface wlp58s0)
21/10/15 15:22:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alex/MySoftware/spark-3.0.1-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/10/15 15:22:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
In the previous example, we saw how to create a DataFrame from a CSV file. However, in the returned DataFrame all columns are of type string, which is not actually the case. One neat way to fix this is by declaring the underlying schema to be used. A schema in Spark defines the column names and the associated data types for a DataFrame [1]. There are certain advantages that we get when we define the schema up front (see the sketch after this list for the inference alternative) [1]:
- Spark does not have to infer the data types
- We can detect errors early if the data doesn't match the specified schema
- Spark does not have to create an extra job in order to read a portion of the file to ascertain the schema
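For contrast, the snippet below is a minimal sketch of letting Spark infer the column types instead of declaring them; the inferSchema option of the CSV data source triggers the extra read over the data mentioned above. It assumes the spark session and csvFile path from the first example.
// assumes the SparkSession `spark` and the `csvFile` path from the first example;
// inferSchema makes Spark sample the file to guess the column types,
// which costs an extra job compared to declaring the schema ourselves
val dfInferred = spark.read
  .option("inferSchema", "true")
  .csv(csvFile)

dfInferred.printSchema()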
Let's see how we can specify a schema. There are two ways of doing so [1]:
- Define it programmatically
- Use a data definition language (DDL) string (a sketch of this approach is given after the first example's output)
The code snippet below shows the first approach.
package train.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object CreateDataFrame {

  def main(args: Array[String]) {

    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv"
    val appName: String = "Spark DataFrame Demo"

    val conf = new SparkConf().setAppName(appName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // define the schema programmatically: name, type and nullability
    // for each column
    val customSchema = StructType(Array(
      StructField("mu-1", DoubleType, false),
      StructField("mu-2", DoubleType, false),
      StructField("label", IntegerType, false)))

    // read the CSV file against the schema; note that the CSV data
    // source may still report the columns as nullable
    val df_schema = sqlContext.read.format("csv")
      .option("delimiter", ",")
      .schema(customSchema)
      .load(csvFile)

    df_schema.printSchema()
    df_schema.groupBy("label").count().show()
    df_schema.show(5)
  }
}
The output is
root
|-- mu-1: double (nullable = true)
|-- mu-2: double (nullable = true)
|-- label: integer (nullable = true)
+-----+-----+
|label|count|
+-----+-----+
| null| 1|
| 1| 185|
| 3| 185|
| 4| 185|
| 2| 185|
| 0| 185|
+-----+-----+
+-----+-----+-----+
| mu-1| mu-2|label|
+-----+-----+-----+
| null| null| null|
|22.91|28.54| 0|
|17.26|30.72| 0|
|17.05|31.08| 0|
|24.05|26.27| 0|
+-----+-----+-----+
only showing top 5 rows
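Note that the single row of null values most likely corresponds to the header line of the CSV file, which cannot be parsed against the numeric schema; assuming the file does have a header, passing option("header", "true") to the reader would skip it. Finally, the snippet below is a minimal sketch of the second way of specifying a schema, via a DDL string; it assumes the spark session and csvFile path from the first example, and the hyphenated column names need backquoting in DDL.
// the DDL string declares the same columns as the programmatic schema above
val ddlSchema = "`mu-1` DOUBLE, `mu-2` DOUBLE, label INT"

val dfDDL = spark.read.format("csv")
  .option("delimiter", ",")
  .option("header", "true") // assumption: train.csv has a header row
  .schema(ddlSchema)        // DataFrameReader.schema also accepts a DDL string
  .load(csvFile)

dfDDL.printSchema()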
- Jules S. Damji, Brooke Wenig, Tathagata Das, Denny Lee, Learning Spark: Lightning-Fast Data Analytics, O'Reilly, 2nd Edition.