Reviewing Apache Spark: Part 1

Apache Spark™ is a fast and general engine for large-scale data processing.

Project Structure

Spark Stack

Code Structure

  • bin keeps all the executables: spark-submit, pyspark, spark-sql and spark-shell.
  • build contains executables for sbt and maven.
  • common contains some networking code, implementations of algorithms like BloomFilter and CountMinSketch, and some code for unsafe operations, most likely the place for the underlying implementation of off-heap storage.
  • core is where the code for Spark core resides; for this series this is the place we are interested in.
  • dev code used for releasing and merging code.
  • docs documentation.
  • external is where all the external integrations live: kafka, docker, flume.
  • launcher some kind of code for launching Spark?
  • project sbt config.
  • python PySpark source.
  • repl modifications to the Scala REPL to make it work with Spark.
  • sbin scripts for starting/stopping Spark.
  • Spark component directories: sql, streaming, graphx, mllib.
  • yarn code for running Spark inside YARN.

Goal

The purpose of this series is to get started with Apache Spark's source code and to understand how things get done internally. In the beginning I'm focusing on Spark Core, because that's the part that excites me the most. So we'll take a few examples in the spark shell, take a look at the code, and make some changes to see how the behaviour changes.

Local Setup for quick iterations

  • clone spark from https://github.com/apache/spark, then cd spark.
  • run build/sbt -Pyarn -Phadoop-2.7. This starts the sbt REPL, which lets us quickly build jars after making code changes by running assembly. These compiled files are used by spark-shell, which we will use to run code and see the output logs.
  • Now I can quickly make code changes and see their effects: export SPARK_PREPEND_CLASSES=true, start the Spark REPL by running bin/spark-shell, and set the logging level to INFO by running sc.setLogLevel("INFO") in the REPL.

Get Started

I want to start with a very simple example. All it does is parallelize the numbers 1 to 10, collect them on the driver node, and return them as an array.

sc.parallelize(1 to 10, 2).collect()  

We want to investigate how this is done internally and what components Spark uses to perform this operation.

Running cloc on the core subproject shows about 86k lines of Scala code, which looks approachable.

SparkContext rough idea

  • Contains the SparkConf and the scheduler.
  • Entry point for running Spark jobs (the runJob methods).
  • Used to create RDDs and accumulators, and to broadcast variables.

We will come back to SparkContext, as it facilitates all the processing and scheduling.
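
To make that list concrete, here is a rough sketch of those uses, typed into bin/spark-shell where sc is already provided. The accumulator call is the Spark 2.x longAccumulator API; older versions use sc.accumulator instead.

  // A few of the things SparkContext is used for, run inside spark-shell.
  val rdd = sc.parallelize(1 to 10, 2)        // create an RDD from a local collection
  val evens = sc.longAccumulator("evens")     // create an accumulator
  val lookup = sc.broadcast(Map(1 -> "one"))  // broadcast a read-only value to executors

  rdd.foreach { n => if (n % 2 == 0) evens.add(1) }  // an action, which goes through runJob
  evens.value                                        // => 5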

Diving

Let's dive into SparkContext's parallelize method:

  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

I think withScope helps keep track of an RDD's parent and other dependency information using RDDOperationScope, which is set via setLocalProperty on the SparkContext.

private[spark] class RDDOperationScope(  
    val name: String,
    val parent: Option[RDDOperationScope] = None,
    val id: String = RDDOperationScope.nextScopeId().toString)

Here name is our method's name (parallelize in our case), which is extracted from the stack trace. When we are already inside a scope, we either ignore the parent's scope or record it as the parent of the new scope.
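
To get a feel for what that means, here is a minimal sketch of the withScope pattern. This is my own illustration, not Spark's code: the real logic (including nesting and error handling) lives in RDDOperationScope.withScope, and the "spark.rdd.scope" key and the JSON format used here are assumptions based on the class above.

  // Minimal sketch: remember the current scope in a SparkContext local property,
  // run the body with a new child scope, then restore the old one.
  def withScopeSketch[T](sc: org.apache.spark.SparkContext, name: String)(body: => T): T = {
    val scopeKey = "spark.rdd.scope"               // assumed property key
    val oldScope = sc.getLocalProperty(scopeKey)
    sc.setLocalProperty(scopeKey, s"""{"name":"$name","parent":$oldScope}""")
    try body finally sc.setLocalProperty(scopeKey, oldScope)
  }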

What is an RDD

A Resilient Distributed Dataset is the main data structure in Spark. It gives access to the underlying collection in parallel. It contains

  • the partitions in which the data is stored
  • the list of this RDD's dependencies, i.e. all parent RDDs
  • all transformations like map, flatMap, filter etc., and actions like collect etc., which are implemented here

Finally we create a ParallelCollectionRDD, which extends RDD. Nothing gets materialized, because Spark is lazy and only performs a job when we call an action like collect on it.
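
Before moving on, it helps to see what the RDD abstract class asks of its subclasses. This is a trimmed-down paraphrase of core/src/main/scala/org/apache/spark/rdd/RDD.scala, keeping only the members we care about here:

  // Paraphrased, trimmed-down view of org.apache.spark.rdd.RDD.
  abstract class RDD[T: ClassTag](
      @transient private var _sc: SparkContext,
      @transient private var deps: Seq[Dependency[_]]) {

    // compute the data of a single partition
    def compute(split: Partition, context: TaskContext): Iterator[T]

    // the set of partitions this RDD is divided into
    protected def getPartitions: Array[Partition]

    // the parent RDDs this RDD depends on
    protected def getDependencies: Seq[Dependency[_]] = deps

    // optional data-locality hints for each partition
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

    // transformations (map, filter, ...) and actions (collect, ...) are defined here too
  }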

ParallelCollectionRDD

Now let's take a quick look at the ParallelCollectionRDD class, which implements these methods of the RDD abstract class.

def getPartitions: Array[Partition]  

It returns an array of partitions (2 in our case) by breaking the input sequence into numSlices slices. Some optimisations are done for Range and NumericRange.
This is the method that makes the RDD materialize, so while digging into collect() we should see it again.
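
The slicing itself is easy to picture. Here is a simplified sketch of it, ignoring the Range and NumericRange special cases; the real code is ParallelCollectionRDD.slice, which does something similar.

  // Simplified sketch of cutting a sequence into numSlices contiguous slices.
  def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    val length = seq.length
    (0 until numSlices).map { i =>
      val start = (i * length) / numSlices
      val end = ((i + 1) * length) / numSlices
      seq.slice(start, end)
    }
  }

  slice(1 to 10, 2)   // => two slices: 1,2,3,4,5 and 6,7,8,9,10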

def compute(s: Partition, context: TaskContext): Iterator[T]  

Its input is one of the partitions we created above; I'm not sure what TaskContext does. After looking into TaskContextImpl, it looks like it holds the things to do after a task succeeds or fails.
It returns an iterator over this partition's sequence.
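
From what I can tell the body is tiny; paraphrased (so treat the details as approximate), it just unwraps the partition and hands back an iterator over its values, wrapped so the task can be interrupted:

  // Paraphrased body of ParallelCollectionRDD.compute.
  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context,
      s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }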

def getPreferredLocations(s: Partition): Seq[String]  

Not sure what this does yet; getting the preferred locations (data-locality hints) for running this partition?
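
For ParallelCollectionRDD this appears to be just a lookup in the locationPrefs map that parallelize passed in, which in our case was the empty Map[Int, Seq[String]](), so there are no locality preferences for this job. Paraphrased:

  // Paraphrased: return any locality hints recorded for this partition's index.
  // parallelize passed an empty map, so for our example this is always Nil.
  override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)
  }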

OK, so now we have a base ready to call operations on. Next we will take a look at how collect() is implemented.