Reviewing Apache Spark: Part 2

In the last post we took a higher-level view of what SparkContext and RDD are and how Spark parallelizes data. We only have a rough idea of how collect() on an RDD should work, so let's dig further into what happens when we run

sc.parallelize(1 to 10, 2).collect()  
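// => Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10): both partitions' results, concatenated in order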

How collect() Works

  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

So the signature is simple: it returns an Array of all the elements in the RDD.
On the first line we call withScope, which we already saw in the last post.
On the second line SparkContext's runJob method does all the work for us: it returns one array of elements per partition (an Array[Array[T]] here), and we just concatenate those and return the output.
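
As a quick plain-Scala sketch (no Spark required), that concatenation step is all that's left once runJob returns: we get one array per partition, and Array.concat flattens them in partition order.

// Plain-Scala sketch of the last step of collect().
// With sc.parallelize(1 to 10, 2), the two partition results would look roughly like this.
val perPartition: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))
val collected: Array[Int] = Array.concat(perPartition: _*)
// collected == Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)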

runJob

Let's now dive into what runJob is doing.
runJob runs a job on every partition's iterator and returns an array with the results. We pass it the RDD and a func that converts an Iterator into some other type. Inside, runJob cleans the function and then calls another runJob overload.

  • The clean function checks whether the closure is serializable (and nulls out outer references it doesn't need, so the closure can be shipped to executors). In our case it shouldn't have much to do, but it's worth seeing what it guards against; a short sketch follows below.
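
To see why that check matters at all, here's a hedged sketch (the Holder class below is made up purely for illustration): if the function we ship captures something non-serializable, the closure cleaner rejects it up front with a "Task not serializable" SparkException instead of letting it fail later on the executors.

// Made-up illustration of what clean() guards against. Holder does not extend
// Serializable, and the closure below captures `this` because it reads `this.offset`.
class Holder(sc: org.apache.spark.SparkContext) {
  val offset = 100
  def shifted() = sc.parallelize(1 to 10, 2).map(_ + offset)   // drags `this` into the closure
}
// new Holder(sc).shifted().collect()
//   => org.apache.spark.SparkException: Task not serializable
// Capturing just a local value keeps the closure clean:
// val local = 100; sc.parallelize(1 to 10, 2).map(_ + local).collect()

Back to runJob: the part that gathers the results boils down to these two lines.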
val results = new Array[U](partitions.size)  
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)  

We create a results array that will hold one result element per partition, and hand the lower-level runJob a resultHandler that writes each partition's result into its slot: (index, res) => results(index) = res.
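
As a plain-Scala sketch of that handler pattern (no Spark involved, fakeRunJob is invented for illustration): the scheduler may deliver results in any order, but because each one is written into its partition's slot, the final array always ends up in partition order.

// Toy stand-in for the lower-level runJob, invented for illustration only.
def fakeRunJob[U](partitions: Seq[Int],
                  compute: Int => U,
                  resultHandler: (Int, U) => Unit): Unit =
  partitions.reverse.foreach(i => resultHandler(i, compute(i)))   // pretend tasks finish out of order

val results = new Array[Int](2)
fakeRunJob(0 until 2, i => i * 10, (index, res) => results(index) = res)
// results == Array(0, 10): each slot filled by the handler, in partition order

The four-argument runJob it delegates to is where things start to get interesting: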

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

In our example the func: (TaskContext, Iterator[T]) => U ignores the TaskContext and simply returns an array of the partition's elements.
partitions is just the range of partition indices, from 0 until the number of partitions.
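
Concretely, for our collect() call the arguments reaching this method look roughly like the following (pieced together from collect() and the overloads it passes through, so treat it as a sketch):

// Roughly what arrives here for sc.parallelize(1 to 10, 2).collect():
val func: (org.apache.spark.TaskContext, Iterator[Int]) => Array[Int] =
  (ctx, iter) => iter.toArray          // the TaskContext is simply ignored
val partitions: Seq[Int] = 0 until 2   // i.e. 0 until rdd.partitions.length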

  • We first initialize callSite, which captures where in user code the job was triggered, i.e. the last call site that is not inside the spark package; it shows up in the "Starting job" log line.
  • Then we call dagScheduler.runJob, which handles all the job scheduling.
  • progressBar is for showing job progress in the console.
  • Then we trigger checkpointing via rdd.doCheckpoint(), which I have little idea about yet and need to take a closer look at; a quick usage sketch follows below.
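
For a bit of context before we get there: rdd.doCheckpoint() at the end of runJob only does real work if checkpointing was requested on the RDD beforehand. From the user side that request looks roughly like this (the directory is just a placeholder path):

// Hedged sketch of how checkpointing is requested; "/tmp/spark-checkpoints" is a placeholder.
sc.setCheckpointDir("/tmp/spark-checkpoints")
val rdd = sc.parallelize(1 to 10, 2)
rdd.checkpoint()   // only marks the RDD for checkpointing, nothing is written yet
rdd.collect()      // once this job finishes, rdd.doCheckpoint() materializes the checkpoint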

Next we'll take a look at the DAGScheduler, and at what checkpointing is and how it works.