Close

Spark Columnar is a proof of concept project that uses Shapeless to optimize the in-memory data layout for RDDs in Spark.

The basic idea is that a user-facing RDD of tuples and/or case classes is backed by another RDD in which there is only one item per partition, that represents all the tuples and/or case classes for that partition. The compute method of the user-facing RDD transforms the one item in the partition of the backing RDD into many items, like a flatMap operation (very similar to MappedRDD). By making sure that any calls to persist and related operations on the user-facing RDD get routed to the backing RDD we can get Spark to persist these partitions with only one item each, which gives us full control over how the data is organized per partition inside that one item.

Arrays are an efficient in-memory storage format for primitives. So an efficient way to persist a user-facing RDD[(Int, Int)] would be to use a backing RDD that stores each partition as a single item which contains two arrays of ints. Something like this: RDD[(Array[Int], Array[Int])]. For one-off situations this can be coded up quickly. But it is not trivial to generalize this. This is where shapeless comes in, a library for generic programming in Scala. Shapeless has a data structure called hlist which is like a generalization of tuples, allowing us to write transformation like that of an RDD[(Int, Int)] to an RDD[(Array[Int], Array[Int])] while abstracting over arity and type. This works for tuples as well as case classes. Shapeless has its complexity (a lot of implicit typeclasses involved), but when it all works the usage is
as simple as this:

val x: RDD[(Int, Int)] = ...
val y: RDD[(Int, Int)] = ColumnarRDD(x)

 

At this point y should behave the same as x, but have a smaller footprint when cached/persisted.

Keep in mind that there is some serious overhead in this process. For tuples or case classes of size n, for each partition in the ColumnarRDD n arrays are constructed and filled up. After that for every iteration reading from the ColumnarRDD n array lookups are done and a tuple or case class is constructed on the fly. So we would not advise to use this unless the efficient in-memory representation is very important for the RDD. Also this is currently just a proof of concept that has not been tested yet in any real project/scenario.

In a test program where we created an RDD[(Int, Int, Int, Int)] with 10,000 items, and then created a ColumnarRDD from it, and cached both in memory, the ColumnarRDD took up 7 times as little memory as the original.

ColumnarRDD does not currently work in the Spark REPL. The issue is that the implicit shapeless Generic typeclasses are generated by macros that basically define a new anonymous class on the spot in your code. So if you create a ColumnarRDD in the REPL that is also where your new anonymous Generic subclass will be defined, and this does not deserialize well since the receiving side will not know this class.

Leave a reply

Your email address will not be published. Required fields are marked *

Go top