by Koert Kuipers, Tresata CTO

Anyone that has used map-reduce in production knows the key to scalability and performance in reduce operations is to push as much of the work to the map-side as possible. Scalding does this elegantly and automatically for you if you can express your operation as a Semigroup, which means you have a binary operator that is associative. This allows developers to focus on writing Semigroups (or Monoids, which are Semigroups that also have an identity element) and let Scalding handle how to run it efficiently. The resulting separation of algo design from the details of the efficient execution implementation is great. It raises the level of abstraction without sacrificing performance, and it also facilitates re-use on other platforms (Spark comes to mind).

The main collection of Semigroups and Monoids for Scalding sits in the Algebird project, which has no dependencies on Hadoop, Cascading or Scalding. An example of a very handy Monoid in here is CountMinSketch: a probabilistic data structure to approximate the number of times an element has appeared. CountMinSketch provides a hard lower bound and a probabilistic upper bound for each count.

We sometimes need to do probabilistic counting with hard lower and upper bounds and with a focus on the most frequent elements. The SpaceSaver algo (also known as StreamSummary) provides a way to do exactly this. See here: Efficient computation of frequent and top-K elements in data streams

To use SpaceSaver within Scalding (and potentially other distributed frameworks) we implemented the SpaceSaver semigroup at Tresata in Scala.

It is straightforward to use. For example to find the most frequent words in a small piece of text from Hamlet (admittedly not very interesting, it’s going to be words like “the” and “you”) in a single process you can do:

scala> import com.twitter.algebird._
import com.twitter.algebird._

scala> import

scala> val words = (
.flatMap{ line => line.split(“[\s,\.!;]+”)
.map{ _.toLowerCase }
.filter{ _ != “” } }
words: Iterator[java.lang.String] = non-empty iterator
scala> val sg = new SpaceSaverSemigroup[String]
sg: com.twitter.algebird.SpaceSaverSemigroup[String] = com.twitter.algebird.SpaceSaverSemigroup@38dffaa0
scala> (
.map(SpaceSaver(100, _))

In this example SpaceSavers holding a single element each are created, and then combined using the plus operator, after which we ask for the top 10 most frequent words. The factor 100 passed into the SpaceSaver constructor is the maximum number of words that will be tracked in memory at any given time. The first line of the response shows that the most frequent word is “of”, which showed up 47 times exactly. More interesting is the word “or” which shows up between 17 and 23 times: this demonstrates nicely that the answer is probabilistic but with hard bounds.

In Scalding the SpaceSaver data structure can be used in a similar fashion. For example a Scalding program to do the counting of words in Hamlet would be:

.flatMap(‘line -> ‘word){ line: String => line.split(“[\s,\.!;]+”).map{ _.toLowerCase }.filter{ _ != “” } }
.groupAll{ _.mapPlusMap(‘word -> ‘top20){ SpaceSaver(100, _ : String) }{ _.topK(10) }}
.flatMapTo(‘top20 -> (‘word, ‘min, ‘max)){ x : Seq[(String, Approximate[Long], Boolean)] =>{ x1 => (x1._1, x1._2.min, x1._2.max) } }

The Scalding version provides the obvious benefit of parallel execution.

Our implementation of SpaceSaver has recently been contributed to Algebird project and should be available soon for anyone to use in Scalding, Spark or even single machine processing. Enjoy!



Leave a reply

Your email address will not be published. Required fields are marked *

Go top