In this post, we will review how to create a DataFrame in Spark using the Scala API. The Spark DataFrame is inspired by the equivalent pandas DataFrame. A DataFrame in Spark is like a distributed in-memory table with named columns and a schema [1]. Similar to an RDD, a DataFrame is immutable, and Spark keeps a lineage of all the transformations applied to it. Thus, when we add or change the names and types of the columns, a new DataFrame is actually created.
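As a quick, hypothetical illustration of this immutability (assuming df is a DataFrame such as the one created below), renaming a column returns a new DataFrame and leaves the original untouched:

// Hypothetical sketch: withColumnRenamed does not mutate df,
// it returns a new DataFrame with its own lineage.
val renamed = df.withColumnRenamed("_c0", "mu-1")
renamed.printSchema() // the new DataFrame shows "mu-1"
df.printSchema()      // the original still shows "_c0"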
package train.spark

import org.apache.spark.sql.SparkSession

object CreateDataFrame {

  def main(args: Array[String]) {

    // Entry point to Spark SQL: build (or reuse) a SparkSession
    val spark = SparkSession
      .builder()
      .appName("Spark DataFrame Demo")
      .getOrCreate()

    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv"

    // Read the CSV file; without a schema every column is typed as string
    val df = spark.read.csv(csvFile)

    // print the schema
    df.printSchema()

    spark.stop()
  }
}
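To produce the output below, the class can be packaged into a jar (for example with sbt package) and submitted to a local Spark installation. A hypothetical invocation (the jar path is illustrative):

spark-submit --class train.spark.CreateDataFrame --master "local[*]" target/scala-2.12/create-dataframe_2.12-0.1.jar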
The output is:
21/10/15 15:22:10 WARN Utils: Your hostname, LT-2R0620-101 resolves to a loopback address: 127.0.1.1; using 192.168.0.71 instead (on interface wlp58s0)
21/10/15 15:22:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/alex/MySoftware/spark-3.0.1-bin-hadoop2.7/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/10/15 15:22:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
|-- _c4: string (nullable = true)
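Notice that Spark did not infer meaningful column names or types: every column is a nullable string with an auto-generated name (_c0, _c1, ...). To get proper names and types, we can pass an explicit schema to the reader. The second version of the program below does exactly that, this time using the older SparkConf/SparkContext/SQLContext entry point.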
package train.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object CreateDataFrame {

  def main(args: Array[String]) {

    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv"
    val appName: String = "Spark DataFrame Demo"

    val conf = new SparkConf().setAppName(appName)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Describe the columns explicitly: two doubles and an integer label
    val customSchema = StructType(Array(
      StructField("mu-1", DoubleType, false),
      StructField("mu-2", DoubleType, false),
      StructField("label", IntegerType, false)))

    // Read the CSV file with the schema specified above
    val df_schema = sqlContext.read.format("csv")
      .option("delimiter", ",")
      .schema(customSchema)
      .load(csvFile)

    df_schema.printSchema()

    // Count the rows per class label
    df_schema.groupBy("label").count().show()

    // Peek at the first five rows
    df_schema.show(5)

    sc.stop()
  }
}
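The output is now: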
root
|-- mu-1: double (nullable = true)
|-- mu-2: double (nullable = true)
|-- label: integer (nullable = true)
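Note that all three columns are reported as nullable = true even though customSchema declares them with nullable = false; when reading from file-based sources such as CSV, Spark treats every column as nullable, so the flag is not enforced here.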
+-----+-----+
|label|count|
+-----+-----+
| null| 1|
| 1| 185|
| 3| 185|
| 4| 185|
| 2| 185|
| 0| 185|
+-----+-----+
+-----+-----+-----+
| mu-1| mu-2|label|
+-----+-----+-----+
| null| null| null|
|22.91|28.54| 0|
|17.26|30.72| 0|
|17.05|31.08| 0|
|24.05|26.27| 0|
+-----+-----+-----+
only showing top 5 rows
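The all-null row is the header line of the CSV file: we never told the reader that the file has a header, so the line of column names is parsed against the numeric schema, fails to convert, and yields null in every field. A minimal sketch of two ways to handle this, reusing csvFile, customSchema and sqlContext from the listing above (the new variable names are illustrative):

// Skip the header line so it is not parsed against the numeric schema
val df_clean = sqlContext.read.format("csv")
  .option("delimiter", ",")
  .option("header", "true") // treat the first line as column names
  .schema(customSchema)
  .load(csvFile)

// Alternatively, let Spark infer the column types from the data
val df_inferred = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true") // extra pass over the data to guess types
  .load(csvFile)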
[1] Learning Spark: Lightning-Fast Data Analytics, O'Reilly, 2nd Edition.