Overview

In post Apache Spark. Application concepts we went over some basic but core concepts associated with Spark. In this post, we will introduce the most basic abstraction in Spark namely the RDD (or the Resilient Distributed Dataset) [2]. Although, modern applications most likely will be using the DataFrame and/or DataSet APIs, still the RDD data structure is what lies underneath the latter two and therefore always useful to know. Moreover, in this post we will see how to create a Spark RDD within a Scala application. As we will see, there are various methods to create an RDD in Spark. The following example is taken for Spark by {Examples}.

You can find the example snippets at Computational Statistics with Scala.

The RDD abstraction

The RDD is perhaps the most basic abstraction in Spark. An RDD is an immutable collection of objects that can be distributed across a cluster of computers. An RDD collection is divided into a number of partitions so that each node on a Spark cluster can independently perform computations. There are three concepts associated with an RDD [2]:

  • Dependencies
  • Partitions
  • Compute function

Partitions provide the ability to split the work and therefore to parallelize computation across executors. The compute function produces the data that will be stored in the RDD. Finally the dependencies, inform Spark how an RDD is constructed. This allows for RDD resiliency as Spark, if needed, is able to recreate the RDD from the dependencies [2].

Now that we have a very simplified overview of what an RDD is, let's how we can create one.

Create Spark RDD with Scala

There are two main methods available in Spark to create an RDD:

  • SparkContext.parallelize method
  • Read from a file

The first method is illustrated in the code listing example below

package train.spark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object CreateRDD {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Hello Spark RDD")
    val sc = new SparkContext(conf)

    val data = Array(1,2,3,4,5,6,7,8,9,10)
    val rdd = sc.parallelize(data)
    rdd.foreach(println)

    println("Number of Partitions: "+rdd.getNumPartitions)
    println("Action: First element: "+rdd.first()) 
  }
}

Running the application produces something like the following

3
6
1
8
9
2
7
4
5
10
Number of Partitions: 4
Action: First element: 1

Note the the output may be different as it depends on which thread is accessing the standard output first. Note that the application above has to create a SparkContext first before we are able to create an RDD.


Remark

Creating a SparkContext is not necessary when we use the Spark shell as one such object is already created for us.


The second method is to read a file from disk. This is also shown in the snippet below.

package train.spark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object CreateRDDFile {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Hello Spark RDD")
    val sc = new SparkContext(conf)

    // Should be some file on your system
    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv" 
    val csvRDD = sc.textFile(csvFile)

    println("Number of Partitions: "+csvRDD.getNumPartitions)

    // prints the header of the file
    println("Action: First element: "+csvRDD.first()) 
  }
}

Upon executing this code, we get

Number of Partitions: 2
Action: First element: #Duplicate: 0, Delete: 1, Normal-1: 2, TUF: 3, Normal-2: 4

However, we are interested in converting the contents of the file into floating point numbers so that we can feed them to a machine learning algorithm. We can do this as follows. we can use the map() function to convert the RDD[String] into an RDD[Array[Double]]

val doubleRDD = csvRDD.map(line => {line.split(",")})
                      .map( arrString => {Try(Array(arrString(0).toDouble, arrString(1).toDouble,                                                                   arrString(2).toDouble))})
                      .map(_ match {case Success(res) => res
                                         case Failure(res) => Array(-100, -100, -100)})

We can also use a schema in order to let Spark know the type of the data but this requires that we use a DataFrame instead and not an RDD.

Note also that Spark divides by default data into two partitions and distributes them across a cluster. The number of partitions can be specified while creating an RDD as shown below.

object CreateRDDFile {
  def main(args: Array[String]) {

    ...

    // Should be some file on your system
    val csvFile = "/home/alex/qi3/learn_scala/scripts/spark/data/train.csv" 
    val csvRDD = sc.textFile(csvFile, 4)

    ...
  }
}

Other methods

As an aside, we can create an RDD by using the following also:

  • JDBC
  • Cassandra
  • HBase
  • Elasticsearch

Transformations and actions

In Apache Spark. Application concepts we introduced the two types of operations one can apply on an RDD namely transformations and actions [1]. You can find more information on these two operations in RDD Programming Guide. Below we just give a brief overview of what each operation entails.

Transformations

Transformations in Spark transform a DataFrame into a new one. This is done without altering the original data. Hence a transformation is an immutable operation as far as the original data is concerned. Some examples of transformations are listed below [3]

  • map(function): It returns a new data set by operating on each element of the source RDD.
  • flatMap(function): Similar to map, but each item can be mapped to zero, one, or more items.
  • mapPartitions(function): Similar to map, but works on the partition level.
  • mapPartitionsWithIndex(function): Similar to mapPartitions, but provides a function with an Int value to indicate the index position of the partition.

  • filter(function): It returns a new RDD that contains only elements that satisfy the predicate.

  • union(otherDataset): It returns a new data set that contains the elements of the source RDD and the otherDataset RDD. Note that the participating RDDs should be of the same data type.

  • intersection(otherDataset): It returns a new data set that contains the intersection of elements from the source RDD and the argument RDD.

Actions

An action triggers the lazy evaluation of all the recorded transformations [1]. A list of actions is given below [3].

  • collect(): Returns all the elements of the data set are returned as an array to the driver program.
  • count(): Returns the number of elements in the data set.
  • reduce(function): It returns a data set by aggregating the elements of the RDD it is applied on. The aggregation is done by using the user provided function argument. The function should take two arguments and returns a single argument. Moreover it should be commutative and associative so that it can be operated in parallel.

  • first(): Returns the first element in the data set.

  • take(n): Returns the first n elements in the data set as an array.
  • takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom comparator.
  • takeSample(withReplacement, num, [seed]): Returns an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
  • saveAsTextFile(path): Write the elements of the RDD as a text file in the local file system, HDFS, or any another supported storage system.
  • foreach(function): Applies the function argument on each element in the RDD.

Summary

In this post we saw how to create an RDD in Spark application. Specifically, there are two ways to do so; using the SparkContext.parallelize function and reading from a file. RDDs are now considered as low level Spark programming. Moreover, we reiterated over the concept of transformations and actions and we saw some non-exclusive examples of both.

References

  1. RDD Programming Guide
  2. Jules S. Damji, Brooke Wenig, Tathagata Das, Deny Lee, Learning Spark. Lighting-fasts data analytics, 2nd Edition, O'Reilly.
  3. Subhashini Chellappan, Dharanitharan Ganesan, Practical Apache Spark. Using the Scala API, Apress