Scala’s parallel collections and the aggregate method

In this post we look at the aggregate method available in the Scala collection library. It may look a little intimidating at first, but it can really be your friend when a task can be done in parallel using Scala’s parallel collections.

As an example we will count the words in a file. Our sample file has 5,000 lines with 10 words in each line separated by a space.

In this example we read in all the lines in the file into a List like this:

import scala.io.Source

val lines = Source.fromFile("/home/markus/tmp/bla.txt").getLines().toList

This may not be the most efficient way to read files, especially very large ones, but it will do here to show how the aggregate method works.

We use the following helper method and regular expression to count the words in a single line:

val singleWordRegex = java.util.regex.Pattern.compile("\\w+")

def countWords(line: String): Int = {
  var wordCount = 0
  val matcher = singleWordRegex.matcher(line)
  while (matcher.find) wordCount += 1
  wordCount
}

A typical way to use Scala to count the words in the whole file would be:

val sum = lines.foldLeft(0)((x, line) => x + countWords(line))
// sum = 50000

Using foldLeft is a standard way to do this in Scala.

Let’s now look at the aggregate method and do the same:


val sum = lines.aggregate(0)((x, line) => x + countWords(line), _ + _)
// sum = 50000

This looks very similar to foldLeft, except for the "_ + _" at the end, so you may wonder what the exact difference is between foldLeft and aggregate. Let’s look at the method declaration and implementation of the aggregate method in Scala’s scala.collection.immutable.List class. This class inherits the aggregate method from trait TraversableOnce where the method is defined like this (if you use Eclipse, just put your cursor over the method and press F3 to go to the method definition):

def aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B =
  foldLeft(z)(seqop)
// line break added for readability

If you look closely you may think that I am kidding, because here aggregate is just a call to foldLeft. The 3rd parameter (the "_ + _" in the example above) is simply ignored.
So for collections like scala.collection.immutable.List that inherit aggregate from TraversableOnce, calling aggregate behaves exactly like foldLeft. You gain nothing, and you pay a small price because your code is slightly harder to read.

So what’s the point of aggregate?

Let’s look at this code:

val sum = lines.par.aggregate(0)((x, line) => x + countWords(line), _ + _)
// sum = 50000

It is almost the same as the example above with one small but important difference: par. This method turns the List lines into a parallel collection (to be more specific, in this case it will be a scala.collection.parallel.immutable.ParSeq[String]).

If you look at the implementation of aggregate again (press F3 again in Eclipse), you will see this (for Scala 2.9.0.1):

def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
  executeAndWaitResult(new Aggregate(z, seqop, combop, splitter))
}

I am not going into the details here (have a look at the source if you are interested, it’s an interesting read), but as you can see the 3rd parameter (combop) is now actually used and aggregate is no longer a simple call to foldLeft.

So now let’s talk about what aggregate does when called on a parallel collection.
Like foldLeft, it takes a start value (the 1st parameter) and accumulates a result from the elements of the collection using the provided function seqop. So far this is just like foldLeft. But a parallel collection can use several threads to improve performance. To do so, the collection is split into parts and seqop is called on the elements of each part separately. Once this is done, combop (short for combination operator) is used to combine the partial results produced by seqop.
If you have several processors, as almost all computers now do, you get true parallelism and can gain a lot of performance from parallel collections. You don’t need to care about how the collections are split and distributed over the available processors, as the library does this for you. Under the hood, Scala uses some of the powerful mechanisms from the java.util.concurrent framework (btw: learning about java.util.concurrent is a must for all Java and Scala developers).
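
To make the split / seqop / combop steps more concrete, here is a minimal sketch that imitates them by hand with Futures from the standard library. It is not how the parallel collections are really implemented (they use their own splitters and task scheduling). The helper name chunkedAggregate, the fixed number of parts and the simplified word counter are assumptions made only for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ChunkedAggregateSketch {
  // Hypothetical helper, NOT part of the Scala library.
  def chunkedAggregate[A, B](xs: Seq[A], chunks: Int)(z: B)(
      seqop: (B, A) => B, combop: (B, B) => B): B = {
    // Split the collection into roughly equal parts (at least 1 element each).
    val partSize = math.max(1, (xs.size + chunks - 1) / chunks)
    // Fold every part on its own with seqop, each fold starting from z.
    val partials: List[Future[B]] =
      xs.grouped(partSize).toList.map(part => Future(part.foldLeft(z)(seqop)))
    // Wait for all partial results, then merge them with combop.
    val results = Await.result(Future.sequence(partials), 1.minute)
    results.reduceOption(combop).getOrElse(z)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the sample file: 5,000 lines with 10 words each.
    val lines = List.fill(5000)("one two three four five six seven eight nine ten")
    // Simplified word counter (assumption): words are separated by one space.
    val countWords = (line: String) => line.split(" ").length
    val sum = chunkedAggregate(lines, 4)(0)((x, line) => x + countWords(line), _ + _)
    println(sum) // prints 50000
  }
}
```

Note that in this sketch z is used once per part, so it should be a neutral value for the operation (like 0 for addition). Otherwise the result would depend on how many parts there are.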

You should always run performance tests when using parallel collections. For simple operations and small collections, the overhead of distributing the work over the different cores may cancel out any gain and can even slow your application down. Parallel collections work best when you have large collections, the calculation on each element takes some time, and the algorithm can be split into sub tasks (not all algorithms can).
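
One way to check whether an algorithm can be split is to ask whether the partial results can be merged and still give the same answer, no matter where the collection was cut. The sketch below simulates two different splits by hand (with grouped instead of a real parallel collection, so it needs nothing beyond the standard library). Addition survives the split, subtraction does not. The helper name splitFold is hypothetical.

```scala
object SplitSafetySketch {
  // Hypothetical helper: fold every part from z with seqop,
  // then merge the partial results from left to right with combop.
  def splitFold(xs: List[Int], partSize: Int)(z: Int)(
      seqop: (Int, Int) => Int, combop: (Int, Int) => Int): Int =
    xs.grouped(partSize).map(_.foldLeft(z)(seqop)).reduceOption(combop).getOrElse(z)

  def main(args: Array[String]): Unit = {
    val xs = List(1, 2, 3, 4, 5, 6)

    // Addition: the result does not depend on how the list is split.
    println(splitFold(xs, 2)(0)(_ + _, _ + _)) // prints 21
    println(splitFold(xs, 3)(0)(_ + _, _ + _)) // prints 21

    // Subtraction: every split gives a different result,
    // and none of them matches the sequential foldLeft.
    println(splitFold(xs, 2)(0)(_ - _, _ - _)) // prints 15
    println(splitFold(xs, 3)(0)(_ - _, _ - _)) // prints 9
    println(xs.foldLeft(0)(_ - _))             // prints -21
  }
}
```

If the results differ like this for your own seqop and combop, the operation is probably not a good fit for aggregate on a parallel collection.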

I made a small test that uses foldLeft, aggregate on a non parallel collection and aggregate on a parallel collection. I ran all examples 10,000 times (to help the JVM warm up and optimize) and calculated the average time in nanoseconds. Here are the results in nanoseconds (I have a 4 core machine):

method                   average (ns)   minimum (ns)
foldLeft                 4259846.0      4046714.0
aggregate non parallel   4189567.0      4057543.0
aggregate parallel       2047955.0      1073732.0

As expected, the aggregate method on the non parallel collection is about as fast as foldLeft. But on a parallel collection it is much faster!
In this case the parallel version was about 2 times as fast on average. I also checked the minimum time, and there the parallel version was about 4 times as fast (as expected with 4 cores) as foldLeft and the non parallel version of aggregate.
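
The code of the test itself is not shown in this post. A minimal harness along these lines could look like the sketch below. The helper name measure, the generated sample lines and the lower run count are assumptions, and only the foldLeft variant is timed here so that the sketch runs with the standard library alone.

```scala
object TimingSketch {
  // Hypothetical helper: runs `body` `runs` times and returns
  // (average, minimum) of the elapsed times in nanoseconds.
  def measure[A](runs: Int)(body: => A): (Double, Long) = {
    var total = 0L
    var min = Long.MaxValue
    var last: Any = null // keep the result so the call is not trivially unused
    var i = 0
    while (i < runs) {
      val start = System.nanoTime()
      last = body
      val elapsed = System.nanoTime() - start
      total += elapsed
      if (elapsed < min) min = elapsed
      i += 1
    }
    (total.toDouble / runs, min)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the sample file: 5,000 lines with 10 words each.
    val lines = List.fill(5000)("one two three four five six seven eight nine ten")
    val singleWordRegex = java.util.regex.Pattern.compile("\\w+")

    def countWords(line: String): Int = {
      var wordCount = 0
      val matcher = singleWordRegex.matcher(line)
      while (matcher.find()) wordCount += 1
      wordCount
    }

    val (avg, min) = measure(1000)(lines.foldLeft(0)((x, line) => x + countWords(line)))
    println(s"foldLeft: average = $avg ns, minimum = $min ns")
  }
}
```

A hand-written loop like this only gives a rough impression. For numbers you want to rely on, a dedicated benchmarking tool such as JMH is the safer choice.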

Of course aggregate is just one useful method of Scala’s parallel collections. There is MUCH MORE available and as a Scala developer you should definitely learn more about them.

More information about aggregate can be found in the API docs:
http://www.scala-lang.org/api/current/scala/collection/parallel/ParIterableLike.html

For more information about Scala’s parallel collections, see this video:
Scala Parallel Collections by Aleksandar Prokopec

and this post on InfoQ:
Scala 2.9.0 Introduces Parallel Collections

The aggregate method is just a small part of Scala’s powerful collection library. Scala has one of the best collection libraries of all programming languages out there. In addition to Scala’s own collections you can also use all of Java’s collections in Scala, which gives you almost everything you will need in your daily work as a software developer.
Scala’s collections alone are worth learning the language for. When looking for a new language, always look at the available collections. If they suck, find another language! Scala’s collections DO NOT suck!
