Notebook: Apache Spark thoughts

""

Posted by Ping on February 1, 2021

This is a notebook of thoughts I jotted down while playing with Spark; I figured this might be a good way to share them.

GroupByKey VS ReduceByKey VS AggregateByKey VS CombineByKey

  • GroupByKey: groups the dataset by key, which results in a full shuffle and thus heavy network communication. (Try to avoid.)

  • ReduceByKey: values are first combined locally on each partition, so ONLY one output per key per partition is sent over the network. (Shuffles less than GroupByKey; use it instead of AggregateByKey when the input and output values have the same type.)

  • CombineByKey: similar to AggregateByKey, it takes 3 parameters (functions): createCombiner, mergeValue, and mergeCombiners, the difference being that the 1st parameter is a function that creates the initial accumulator value for a key. The process works as follows: combineByKey goes through every element in a partition; if the key has not been seen before, it calls createCombiner to create the initial accumulator value for that key, otherwise it calls mergeValue with the current accumulator value and the new value. Once this work has finished on all partitions independently, mergeCombiners merges the results from the partitions: when two or more partitions have an accumulator for the same key, those accumulators are merged.

  • AggregateByKey: the combined elements are of a different type than the input values. It requires an initial value for the accumulator, a sequential function applied within every partition, and a combine operation applied across partitions. (Use when aggregating and the input and output RDDs have different value types; see the sketch after this list.)
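
A quick sketch (not from the original post) of the same job done three ways: computing the average value per key. The data is made up and a SparkContext sc is assumed to be in scope, as in the spark-shell.

    // Assumes `sc` from the spark-shell; sample data is made up.
    val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    // reduceByKey: input and output value types must match, so pre-map to (sum, count).
    val avgViaReduce = pairs
      .mapValues(v => (v, 1L))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues { case (sum, count) => sum / count }

    // aggregateByKey: the accumulator (Double, Long) differs from the value type;
    // supply a zero value, a seqOp (within a partition) and a combOp (across partitions).
    val avgViaAggregate = pairs
      .aggregateByKey((0.0, 0L))(
        (acc, v) => (acc._1 + v, acc._2 + 1L),
        (a, b)   => (a._1 + b._1, a._2 + b._2))
      .mapValues { case (sum, count) => sum / count }

    // combineByKey: like aggregateByKey, but the first argument builds the initial
    // accumulator from the first value seen for a key in a partition.
    val avgViaCombine = pairs
      .combineByKey(
        (v: Double) => (v, 1L),                                        // createCombiner
        (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1L), // mergeValue
        (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners
      .mapValues { case (sum, count) => sum / count }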

Partitioning

The core idea is to spread the data evenly across all partitions.

  • Hash partitioning: for a pair RDD, e.g.
    val purchase = purchaseRdd.map(p => (p.customerId, p.price)).groupByKey()

The partition p for a key k is calculated as:
p = k.hashCode() % numPartitions
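
A minimal sketch of making the hash partitioning explicit (purchaseRdd is the same assumed RDD as above; groupByKey uses a HashPartitioner by default anyway):

    import org.apache.spark.HashPartitioner

    val pairs = purchaseRdd.map(p => (p.customerId, p.price))

    // Each key k lands in partition k.hashCode() % 8 (made non-negative by Spark).
    val hashPartitioned = pairs.partitionBy(new HashPartitioner(8))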

  • Range partitioning: for RDDs whose keys have an ordering defined, range partitioning can be more efficient.

In fact, some functions use a RangePartitioner under the hood, for example sortByKey(); see the Spark source code below:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
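
A small usage sketch (made-up data, assumes sc from the spark-shell): sortByKey samples the keys and range-partitions them, so each output partition holds a contiguous, ordered key range. A RangePartitioner can also be applied directly.

    import org.apache.spark.RangePartitioner

    val scores = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

    // Range partitioning happens implicitly inside sortByKey:
    val sorted = scores.sortByKey(ascending = true, numPartitions = 2)

    // Or apply a RangePartitioner directly (keys are grouped into ranges,
    // but not sorted within each partition):
    val rangePartitioned = scores.partitionBy(new RangePartitioner(2, scores))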

Partitioned RDDs or DataFrames should be persisted; otherwise the repartitioning (and its shuffle) is repeated each time the partitioned RDD or DataFrame is used.
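
A minimal sketch (userData is a hypothetical pair RDD) of persisting right after partitionBy so the shuffle is not redone on every action:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.storage.StorageLevel

    val partitioned = userData
      .partitionBy(new HashPartitioner(100))
      .persist(StorageLevel.MEMORY_AND_DISK)

    // Subsequent joins and lookups reuse the cached, already-partitioned data.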