Never trust a profiler

A week or so ago I mentioned to Timon that, for the first time, a profiler had actually pointed me in a direction that directly led to a performance improvement. Initially Timon just gave me that “you’re just a crotchety old man” look (which, in most cases, is the correct response). I pointed him to Josh Bloch’s “Performance Anxiety” presentation, which dives into why it is so hard (in fact “impossible”, in Josh’s words) to benchmark modern applications. It also references the interesting paper “Evaluating the Accuracy of Java Profilers”.

Just last week I was trying to track down a severe performance degradation in my snapshot recovery code. I was under some time pressure, so I turned on my profiler to point me in the right direction. The result it gave was clear, repeatable and unambiguous: it pointed to the linear probing algorithm of the hash table I am using. Since I had recently moved to a new hash table (one that allowed for rehashing and resizing), it was possible that this was in fact the root of my performance problem, but I had my doubts. (More on this in a future post.) I swapped out my hash table and re-profiled. The profiler again gave me a clear, repeatable and unambiguous result: my performance woes were solved. So I moved on. When we were able to test the snapshot recovery code on production snapshots, we found that the performance problems still existed.

My profiler lied to me. Never trust your profiler.

Streaming Algorithms and Sketches

Here at Aggregate Knowledge we spend a lot of time thinking about how to do analytics on a massive amount of data. Rob recently posted about building our streaming datastore and the architecture that helps us deal with “big data”. Given a streaming architecture, the obvious question for the data scientist is “How do we fit in?”. Clearly we need to look towards streaming algorithms to match the speed and performance of our datastore.

A streaming algorithm is generally defined as one that uses a finite amount of memory – significantly smaller than the data presented to it – and processes its input in a single pass. Streaming algorithms start pretty simple, for instance counting the number of elements in the stream:

counter = 0
for event in stream:
    counter += 1

While eventually counter will overflow (and you can be somewhat clever about avoiding that), this is far better than the non-streaming alternative:

elements = list(stream)
counter = len(elements)

Pretty simple stuff. Even a novice programmer can tell you why the second method is far worse than the first. You can get more complicated while keeping the same basic approach – computing the mean of a stream of floating point numbers is almost as simple: keep counter as above, add a total_sum variable that is updated with total_sum += value_new for each event, and divide one by the other. Now that we’re feeling smart, what about the quantiles of the stream? Ah! Now that is harder.
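
As a quick sketch before we get to quantiles (using the counter, total_sum and value_new names from the paragraph above):

counter = 0
total_sum = 0.0
for value_new in stream:
    counter += 1
    total_sum += value_new

mean = total_sum / counter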

While it may not be immediately obvious, you can prove (as Munro and Paterson did in 1980) that computing exact quantiles of a stream requires memory at least linear in the size of the stream. So, we’re left approximating a solution to the quantiles problem. A first stab might be sampling, where you keep every 1000th element. While this isn’t horrible, it has its downsides – if your stream is infinite, you’ll still run out of space. It’s a good thing there are much better solutions. One of the first and most elegant was proposed by Cormode and Muthukrishnan in 2003, where they introduced the Count-Min sketch data structure. (A nice reference for sketching data structures can be found here.)

The Count-Min sketch works much like a Bloom filter. You compose k empty tables and k hash functions. For each incoming element we simply hash it through each function and increment the appropriate cell in the corresponding table. To find out how many times we have historically seen a particular element we simply hash our query and take the MINIMUM value that we find across the tables. In this way we limit the effects of hash collisions, and clearly we balance the size of the Count-Min sketch against the accuracy we require for the final answer.
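
Here’s how it works, as a minimal Python sketch (the depth, width, and md5-derived hash functions are illustrative choices of mine, not anything prescribed by the paper):

import hashlib

class CountMinSketch(object):
    def __init__(self, depth=4, width=1024):
        self.depth = depth    # number of tables / hash functions (k)
        self.width = width    # number of counters per table
        self.tables = [[0] * width for _ in range(depth)]

    def _hash(self, item, seed):
        # derive one hash per table by seeding md5 differently
        digest = hashlib.md5(("%d:%s" % (seed, item)).encode()).hexdigest()
        return int(digest, 16) % self.width

    def update(self, item, count=1):
        for i in range(self.depth):
            self.tables[i][self._hash(item, i)] += count

    def query(self, item):
        # collisions can only inflate counters, so the minimum across
        # tables is the tightest (over-)estimate of the true count
        return min(self.tables[i][self._hash(item, i)]
                   for i in range(self.depth))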

The Count-Min sketch is an approximation to the histogram of the incoming data; in fact it is exact except where hashes collide. In order to compute quantiles we want to find the “mass” of the histogram above/below a certain point. Luckily, Count-Min sketches support range queries of the type “select count(*) where val between 1 and x;“ (in the paper this is done by keeping a hierarchy of sketches, one per dyadic range resolution). Now it is just a matter of finding the quantile of choice.

To actually find the quantiles is slightly tricky, but not that hard: you basically perform a binary search with the range queries. So to find the first decile value, supposing you kept around the number of elements you have seen in the stream, you would binary search through values of x until the count returned by the range query is 1/10 of the total count.
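
A sketch of that search, assuming a range_query(x) helper that returns the (approximate) count of elements between 1 and x, and integer-valued elements up to some max_value:

def quantile(range_query, total_count, q, max_value):
    # find the smallest x whose range count covers a q fraction
    # of the stream
    target = q * total_count
    lo, hi = 1, max_value
    while lo < hi:
        mid = (lo + hi) // 2
        if range_query(mid) < target:
            lo = mid + 1
        else:
            hi = mid
    return lo

For the first decile, q would be 0.10 and total_count the running element count kept alongside the sketch.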

Pretty neat, huh?

Custom Input/Output Formats in Hadoop Streaming

As I’ve mentioned before, working with Hadoop’s documentation is not my favorite thing in the world, so I thought I’d provide a straightforward explanation of one of Hadoop’s coolest features – custom input/output formats in Hadoop streaming jobs.

Use Case

It is common for us to have jobs that compute results across a few weeks or months of data, and it’s convenient to look at that data by day, week, or month. Sometimes including the date (preferably in a nice standard format) in the output isn’t quite enough, and for whatever reason it’s just more convenient to have each output file correspond to some logical unit.

Suppose we wanted to count unique users in our logs by state, by day. The streaming job probably starts looking something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -input log_data/ \
        -output geo_output/ \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

And the output from the job might look like:

2011-06-20,CA,1512301
2011-06-21,CA,1541111
2011-06-22,CA,1300001
...
2011-06-20,IL,23244
2011-06-21,IL,23357
2011-06-22,IL,12213
...

This is kind of a pain. If we do this for a month of data and all 50 states appear every day, that’s at least 1500 records – not quite so easy to eyeball. So, let’s ask Hadoop to give us a file per day, named YYYY-MM-DD.csv, that contains all the counts for that day. 30 files containing 50 records each is much more manageable.

Write Some Java

The first step is to write some Java. I know, this is a tutorial about writing Input/Output formats for Hadoop streaming jobs. Unfortunately, there is no way to write a custom output format other than in Java.

The good news is that once you’re set up to develop, both input and output formats tend to take minimal effort to write. Hadoop provides a class, MultipleTextOutputFormat, just for putting output records into different files based on the content of each record. Since we’re looking to split records based on the first field of each record, let’s subclass it:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class DateFieldMultipleOutputFormat
    extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // the key starts with the date; use it as the output filename
        String date = key.toString().split(",")[0];
        return date + ".csv";
    }
}

It’s a pretty simple exercise, even if you’ve never written a single line of Java. All the code does is take the first field of the key and use it as the output filename. Honestly, the hardest part is going to be setting up your IDE to work with Hadoop (the second hardest part was finding this blog post).

Use It

The most recent Hadoop documentation I can find still describes custom input/output formats as of Hadoop 0.14. Very helpful.

It turns out life is still easy. When you look at a less-misleading part of the Hadoop Streaming documentation, all the pieces you need are right there. There are flags, -inputformat and -outputformat, that let you specify Java classes as your input and output format. They have to be on the Java classpath when Hadoop runs, which is taken care of by the -libjars generic Hadoop option. There is no need to compile a custom streaming jar or anything crazy (I found worse suggestions on StackOverflow while figuring this out).

Using this newfound wisdom, it’s pretty trivial to add the output format to the existing streaming job. The next version of the job is going to look something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

Pay Attention to the Details

If you write an output format like the one above and try to run a job, you’ll notice that some output records disappear. The overly-simple explanation is that Hadoop ends up opening the file for a specific date once for every reducer whose input contains that date, clobbering the data that was there before. Fortunately, it’s also easy to tell Hadoop to send all the data for a date to the same reducer, so each file is opened exactly once. There isn’t even any Java involved.

All it takes is specifying the right partitioner class for the job on the command line. This partitioner is configured with the same -k key-field syntax as unix sort, so data scientists should have an easy time figuring out how to use it. To keep data from disappearing, tell the partitioner that the first field of the comma-delimited output record is the value to partition on.

With those options included, the final streaming job ends up looking like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -D map.output.key.field.separator=, \
        -D mapred.text.key.partitioner.options=-k1,1 \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -cacheFile ip_geo_mapping.dat#geo.dat  

On the next run all the data should appear again, and the output directory should contain one file per day for which there is output. It’s not hard to take this example and get a little more complicated – it would take minimal changes to make this job output one file per state, for example. Not bad for a dozen lines of code and some command line flags.

For the bit hacker in you

My colleagues and I are wrapping up version 2 of our unique counting algorithms. The last pieces are the database tools that our analysts use — some custom functions in Postgres 9 to manipulate our custom types. One step in our algorithm is to find the least-significant set bit of a bit array. In Java we use a lookup technique (a 256-entry table that you use to look up each byte) and in C we can use the bsf assembly instruction. (Both techniques can be researched further on Sean Eron Anderson’s awesome Bit Twiddling Hacks.)
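
As a sketch of the lookup-table approach (in Python for brevity; the Java version works the same way, scanning byte by byte against a precomputed 256-entry table):

# precompute the position of the lowest set bit for every byte value
LOWEST_BIT = [-1] * 256
for i in range(1, 256):
    LOWEST_BIT[i] = (i & -i).bit_length() - 1

def lowest_set_bit(value):
    offset = 0
    while value:
        byte = value & 0xff
        if byte:
            return offset + LOWEST_BIT[byte]
        value >>= 8
        offset += 8
    return -1  # no bits set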

Since I’m a hacker to the core I wanted to do a little more research to see what other folks were doing. Here are some additional references:

In the last reference, there is a response that mentions the fascinating technique of using the floating-point representation to extract the exponent:

unsigned GetLowestBitPos(unsigned value)
{
    /* value ^ (value - 1) yields ones up to and including the lowest
       set bit; the !!value keeps value == 0 from underflowing */
    double d = value ^ (value - !!value);
    /* as a double, that mask's exponent is the bit position: read the
       high 32-bit word (little-endian) and remove the IEEE 754 bias */
    return (((int*)&d)[1] >> 20) - 1023;
}

This response refers you to another post that explains the technique in a bit more detail. In that response, Tony Lee mentions one of my favorite constants: 0x5f3759df. I spent many years doing graphics and gaming programming (some of my first “real” programs in college were visual simulations on SGIs) and I remember the first time that I came across 0x5f3759df in the Quake source code. I’m always amazed at the bit techniques that people come up with to solve a problem.

I’ll end this post with a link to one of my favorite articles on the internet: a history of the origins of 0x5f3759df in the Quake source. It’s a wonderful walk down memory lane.

Batch Acknowledged Pipelines with ZeroMQ

Parallel processing with a task ventilator is a common pattern with ZeroMQ.  The basics of this pattern are outlined in the “Divide and Conquer” section of the ZeroMQ guide.  The pattern consists of the following components:

  • A task ventilator that produces tasks.
  • A number of workers that do the processing work.
  • A sink that collects results from the worker processes.

This pattern works wonderfully as long as your consumers can outpace your producers.  If you start producing tasks faster than you can process them, messages will start backing up in ZeroMQ socket queues.  This will drive up the memory utilization of the processes, make a clean shutdown of the distributed processing system difficult, and result in a sizeable number of lost messages in the event of a crash.  This can be avoided by using ZMQ_REQ and ZMQ_REP sockets, but then you lose the speed advantage of a pipeline pattern.

To maintain the speed of a pipeline pattern while allowing for some control over the number of messages in flight at any given time, we can add batch acknowledgements to the basic ventilator / worker / sink pattern.  Accomplishing this only requires a few minor changes:

  • Add a pull socket to the ventilator for receiving acknowledgements
  • Add a push socket to the sink for sending acknowledgements
  • Add a batch size variable

So without further ado, let’s dive into some code.  I’m going to keep things simple for this example.  I’ll define a function for the ventilator, a function for the worker, and a function for the sink.  These will be started using multiprocessing.Process.  For any Java programmers in the audience: Python has first-class functions.  There is no requirement to wrap a function in a class.

The ventilator uses a ZMQ_PUSH socket to send tasks to listening workers, and a ZMQ_PULL socket to receive acknowledgements from the sink process.  The ventilator will send N messages (where N is the batch size) and then wait for an acknowledgement:

import zmq
from time import time
from multiprocessing import Process

def ventilator(batch_size, test_size):
    """task ventilator function"""

    # set up a zeromq context
    context = zmq.Context()

    # create a push socket for sending tasks to workers
    send_sock = context.socket(zmq.PUSH)
    send_sock.bind("tcp://*:5555")

    # create a pull socket for receiving acks from the sink
    recv_sock = context.socket(zmq.PULL)
    recv_sock.bind("tcp://*:5557")

    # counter for tasks sent in the current batch
    current_batch_count = 0

    # send test_size tasks in total
    for x in range(test_size):
        send_sock.send("task")
        current_batch_count += 1

        # at each batch boundary, reset the count and block until the
        # sink acknowledges the batch - this could be more sophisticated
        # and support other message types from the sink, but keeping it
        # simple for this example
        if current_batch_count == batch_size:
            current_batch_count = 0
            msg = recv_sock.recv()

The workers use a ZMQ_PULL socket to receive tasks from the ventilator, and a ZMQ_PUSH socket to send results to the sink process:

def worker():
    """task worker function"""

    # set up a zeromq context
    context = zmq.Context()

    # create a pull socket for receiving tasks from the ventilator
    recv_socket = context.socket(zmq.PULL)
    recv_socket.connect("tcp://localhost:5555")

    # create a push socket for sending results to the sink
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5556")

    # receive tasks and send results
    while True:
        task = recv_socket.recv()
        send_socket.send("result")

The sink process uses a ZMQ_PULL socket to receive results from the workers, and a ZMQ_PUSH socket to send batch acknowledgements to the ventilator process:

def sink(batch_size, test_size):
    """task sink function"""

    # set up a zeromq context
    context = zmq.Context()

    # create a pull socket for receiving results from the workers
    recv_socket = context.socket(zmq.PULL)
    recv_socket.bind("tcp://*:5556")

    # create a push socket for sending acknowledgements to the ventilator
    send_socket = context.socket(zmq.PUSH)
    send_socket.connect("tcp://localhost:5557")

    result_count = 0
    test_start_time = time()

    for x in range(test_size):
        # receive a result and increment the count
        msg = recv_socket.recv()
        result_count += 1

        # acknowledge that we've completed a batch
        if result_count == batch_size:
            send_socket.send("ACK")
            result_count = 0

    duration = time() - test_start_time
    tps = test_size / duration
    print "messages per second: %s" % tps

The main routine for the test allows for a configurable batch size, test size, and number of workers:

if __name__ == '__main__':
    num_workers = 4
    batch_size = 100
    test_size = 1000000

    workers = {}
    ventilator_proc = Process(target=ventilator, args=(batch_size, test_size,))
    sink_proc = Process(target=sink, args=(batch_size, test_size,))

    sink_proc.start()

    for x in range(num_workers):
        workers[x] = Process(target=worker, args=())
        workers[x].start()

    ventilator_proc.start()

For the test, I sent 1,000,000 small messages, in batch sizes of 1,000,000, 1,000, 100, 50, and 10:

[Figure: batched acknowledgement results]

There are two main conclusions I draw from these results.  The first is that the raw pipeline pattern obtains extremely high throughput precisely because it does not ACK individual messages.  The second is that if your workers perform work that takes any amount of time, you can limit the batch size to a rather small number of messages without impacting your overall throughput.  Even with a batch size of 10, a throughput of around 81k messages per second was obtained.  As long as the combined messages per second your workers can process is less than the raw throughput of your pipeline for a given batch size, batched acknowledgements will give you some degree of flow control without compromising your throughput.

The “batched pipeline pattern” allows tradeoffs to be made between acceptable message loss and throughput, and offers a great deal of control that a simple pipeline pattern with no batching does not. In my experience it is an extremely useful starting point for a distributed system that processes streaming data.

Working with large sets

We do a lot of work with unique user counting and we have developed some techniques for accurate counting in small bounded-size structures.  Periodically I like to make sure that all of our assumptions still hold as the world changes around us.  I was recently running a number of experiments on large sets to get our science folks some data to analyze.  It involved getting the cardinality, union and intersection of large sets of user ids (which are 64-bit values) the brute-force way.

Since I spend a good deal of my time writing Java, I figured I would just quickly whip something up.  For set sizes of 1M or less, the “standard techniques” worked well enough — java.util.HashSet will do the trick.  In general, for larger collections of primitives it’s a good idea to use one of the 3rd-party libraries specifically tailored to primitives, such as Trove or Colt, to cut down on the time and memory bloat of autoboxing primitives into objects.  (There are a number of postings around what the “best” collections are for a given circumstance, such as this one on StackOverflow.)  You can get to around 10M entries in a traditional set before union and intersection take prohibitively long, due to the fact that they work element-by-element.

Working with sets of over 10M entries requires different techniques.  The most common approach is to use bit arrays.  Bit arrays are not only compact in size (each element takes only one bit of RAM), but they are very fast for set operations.  The set operations are typically performed by taking chunks of the bit array and performing regular instructions on them.  For example, the bit array can be chunked up into longs (64-bit ‘words’) and then bitwise or or bitwise and operations are performed pairwise on the two ‘sets’.  Java provides java.util.BitSet, which does all of the heavy lifting for you.  (There are 3rd-party bit arrays available too.)
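
To make the word-at-a-time idea concrete, here is a sketch (in Python, with each set stored as a list of 64-bit words; BitSet does the equivalent internally):

def bit_union(a, b):
    # pairwise OR over the words of two equal-length bit arrays
    return [x | y for x, y in zip(a, b)]

def bit_intersection(a, b):
    # pairwise AND over the same words
    return [x & y for x, y in zip(a, b)]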

When using a hash-based set, one simply gives it the element that is to be stored (in our case, that’s a 64-bit user id). With a bit array the interface is basically setBit(index, value) and getBit(index).  The problem comes down to: what is the index?  A naive approach would simply use the bit array in the same way as the hash set — pass it the element. Unfortunately, this would require a bit array that is 2^64-1 bits long. If you were a little dangerous with your knowledge, you could exploit some of the RLE (run-length encoding) techniques for compressing your bit array, such as Word Aligned Hybrid (WAH — there’s even a Java implementation, as well as others found here). Another, call it ‘sane’, approach is to use a map together with the bit array to provide the index. The map maps from the element to a sequential index, and that index is used in the bit array.

To insert (a sketch in Java, assuming a Map<Long, Integer> for the element-to-index map and a java.util.BitSet):

Integer index = elementToIndexMap.get(elementId);
if (index == null) {
    // first time we have seen this element: assign the next index
    index = sequence++;
    elementToIndexMap.put(elementId, index);
}
bitArray.set(index, true);

To retrieve (same assumptions):

Integer index = elementToIndexMap.get(elementId);
if (index == null) {
    return false; // assume not present means 'not set'
}
return bitArray.get(index);

With this approach, you can easily get to 100M or even billions of elements in a set, given that you have enough RAM.

Going beyond 1B requires other techniques, usually involving disk. Unix sort and join, for example, can get you a long way.

Even though the examples that I gave were Java-based, the techniques presented here are universal. Underlying all of this is the necessity for a little profiling, a bit of understanding of the algorithms involved, and the realization that sometimes the default facilities provided are insufficient for all cases.

Very large heaps in Java

I’ve been wanting to write up a juicy post on how we deal with very large heaps in Java to reduce GC pauses.  Unfortunately I keep getting sidetracked getting the data together.  The latest bump in the road is due to a JVM bug of sorts.

Backstory:  Todd Lipcon’s twitter post pointed me to the JVM option -XX:PrintFLSStatistics=1 as a way to get some good information about heap fragmentation. He was even kind enough to provide the Python and R scripts! I figured that it would be a few minutes of fiddling and I’d have some good data for a post. No such luck. Our JVM GC/heap options are -XX:+UseConcMarkSweepGC -Xms65g -Xmx65g. When -XX:PrintFLSStatistics=1 is used with this, the following output is seen:

Statistics for BinaryTreeDictionary:
------------------------------------
Total Free Space: -1824684952
Max   Chunk Size: -1824684952
Number of Blocks: 1
Av.  Block  Size: -1824684952
Tree      Height: 1

A few seconds of digging into the Hotspot source reveals:

void BinaryTreeDictionary::reportStatistics() const {
  verify_par_locked();
  gclog_or_tty->print("Statistics for BinaryTreeDictionary:\n"
         "------------------------------------\n");
  size_t totalSize = totalChunkSize(debug_only(NULL));
  size_t    freeBlocks = numFreeBlocks();
  gclog_or_tty->print("Total Free Space: %d\n", totalSize);
  gclog_or_tty->print("Max   Chunk Size: %d\n", maxChunkSize());
  gclog_or_tty->print("Number of Blocks: %d\n", freeBlocks);
  if (freeBlocks > 0) {
    gclog_or_tty->print("Av.  Block  Size: %d\n", totalSize/freeBlocks);
  }
  gclog_or_tty->print("Tree      Height: %d\n", treeHeight());
}

in hotspot/src/share/vm/gc_implementation/concurrentMarkSweep/binaryTreeDictionary.cpp. (“%d” just doesn’t cut it for a long’s worth of data.)  I filed a HotSpot bug, so hopefully it will be fixed in some release in the not-too-distant future.

I can work around this but it has slowed down my getting to the juicy blog post. Stay tuned!

2eight7

When I was writing my post yesterday I was reflecting on how much time we spend making our code int-friendly — specifically, dealing with the problems that arise when you’re working with values around Integer.MAX_VALUE, or 2147483647 (2^31-1). I likened it to i18n (internationalization) or l10n (localization). Much in that same vein, I’d like to coin the term “2eight7” to represent the problems one runs into when working with a signed integer and any code that depends (implicitly or explicitly) on it.

Let the debates begin!

Billions of anything

In most programming languages an int is 32 bits wide, providing for values up to 4294967295 (2^32-1), or 2147483647 (2^31-1) if signed. In the case of Java, which we use for a number of components in our infrastructure, many of the fundamental components use ints: array indexes, NIO, IO, most collections (as they are commonly based on arrays), etc. When you’re working with billions of anything, it’s easy to run into these bounds, which result in subtle bugs that are hard to track down due to exceptions that aren’t what they seem. The most common cases that we run into are due to the roll-over that occurs when you add any positive value to 2147483647 — the value becomes negative (since Java’s ints are signed). Sometimes this results in an ArrayIndexOutOfBoundsException and sometimes in a seemingly impossible call path from deep inside some java.* class.

I remember working on my first few i18n (internationalization) and l10n (localization) projects, where I learned the do’s and don’ts of writing code that works seamlessly (or at least is easy to work with) in multiple locales. Working with “big data” feels exactly like that — you have to slowly build up a set of techniques: instead of a single array, you need to keep arrays of arrays (since each dimension is limited to 2147483647 elements, as sketched below); you have to know how to shard your collections so that they do not exceed the maximum allowed capacity (e.g. HashMap is limited to 1073741824 (2^30) buckets); if(value > Integer.MAX_VALUE) doesn’t do what you think it does (and most of the time it’s hard to tell that that’s the code that you wrote). The list goes on.
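
The array-of-arrays trick is just indexing arithmetic. A sketch in Python (purely to illustrate the arithmetic; Python itself has no such limit):

MAX_CHUNK = 2**31 - 1  # Java's Integer.MAX_VALUE, the per-array limit

def get(chunks, i):
    # map a large logical index onto an "array of arrays"
    return chunks[i // MAX_CHUNK][i % MAX_CHUNK]

def put(chunks, i, value):
    chunks[i // MAX_CHUNK][i % MAX_CHUNK] = value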

One interesting development was “announced” at EclipseCon: there’s talk of “big data” support in Java 9 (see “Reinhold Already Talking About Java 9”, for example). This is something that I will keep my eye on. Unfortunately, it won’t help us for the next few years.