Open Source Release: java-hll

We’re happy to announce our newest open-source project, java-hll, a HyperLogLog implementation in Java that is storage-compatible with the previously released postgresql-hll and js-hll implementations. And, as the rule of three dictates, we’ve also extracted the storage specification that makes them interoperable into its own repository. Currently, all three implementations support reading storage specification v1.0.0, while only the PostgreSQL and Java implementations fully support writing v1.0.0. We hope to bring the JS implementation up to speed, with respect to serialization, shortly.

Open Source Release: js-hll

One of the first things that we wanted to do with HyperLogLog when we first started playing with it was to support and expose it natively in the browser. The thought of allowing users to directly interact with these structures — perform arbitrary unions and intersections on effectively unbounded sets all on the client — was exhilarating to us. We knew it could be done but we simply didn’t have the time.

Fast forward a few years to today. We finally had enough in the meager science/research budget to pick up an intern for a few months, and as a side project I tasked him with turning our dream into a reality. Without further ado, we are pleased to announce the open-source release of AK’s HyperLogLog implementation for JavaScript, js-hll. We are releasing this code under the Apache License, Version 2.0, matching our other open source offerings.

We knew that we couldn’t just release a bunch of JavaScript code without allowing you to see it in action — that would be a crime. We passed a few ideas around and the one that kept bubbling to the top was a way to kill two birds with one stone. We wanted something that would showcase what you can do with HLL in the browser and give us a tool for explaining HLLs. It is typical for us to explain how HLL intersections work using a Venn diagram. You draw some overlapping circles with a border that represents the error, and you talk about how, if that border is close to or larger than the intersection, then you can’t say much about the size of that intersection. This works just OK on a whiteboard, but what you really want is a visualization that allows you to select some sets and see the overlap, and maybe even play with the precision a little bit to see how that changes the result. Well, we did just that!

Click above to interact with the visualization

Note: There’s more interesting math in the error bounds that we haven’t explored. Presenting error bounds on a measurement that cannot mathematically be less than zero is problematic. For instance, if you have a ruler that can only measure to 1/2″ and you measure an object that truly is 1/8″ long, you can say “all I know is this object measures under 0.25 inches”. Your object cannot measure less than 0 inches, so you would never say 0 minus some error bound. That is, you DO NOT say 0.0 ± 0.25 inches. Similarly, with set intersections there is no meaning to a negative intersection. We did some digging, threw our hands up, and tossed in what we feel are best practices. In the js-hll code we a) never show negative values and b) call “spurious” any calculation that results in an answer within 20% of the error bound. If you have a better answer, we would love to hear it!
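Concretely, the display rule boils down to something like the following sketch (illustrative Java rather than the actual js-hll code; the exact reading of the “within 20%” threshold below is just one interpretation):

// Sketch of the two display rules above; not js-hll's actual implementation.
// Interprets "within 20% of the error bound" as: the estimate is no more than
// 20% larger than the error bound itself.
static String formatIntersection(final double estimate, final double errorBound) {
    final double clamped = Math.max(0.0, estimate);         // (a) never show a negative value
    final boolean spurious = clamped <= 1.2 * errorBound;   // (b) too close to the error to mean much
    return spurious ? "spurious" : String.format("%.0f ± %.0f", clamped, errorBound);
}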

Open Source Release: js-murmur3-128

As you can imagine from all of our blog posts about hashing, we hash a lot of things. While the various hashing algorithms may be well-defined, the devil is always in the details, especially when working with multiple languages that have different ways of representing numbers. We’re happy to announce the open-source release of AK’s 128-bit Murmur3 implementation for JavaScript, js-murmur3-128. We are releasing this code under the Apache License, Version 2.0, matching our other open source offerings.

Details

The goal of the implementation is to produce a hash value that is equivalent to the C++ and Java (Guava) versions for the same input and it must be usable in the browser. (Full disclosure: we’re still working through some signed/unsigned issues between the C++ and Java/JavaScript versions. The Java and JavaScript versions match exactly.)

Usage

Java (Guava):

final int seed = 0;
final byte[] bytes = { (byte)0xDE, (byte)0xAD, (byte)0xBE, (byte)0xEF,
                       (byte)0xFE, (byte)0xED, (byte)0xFA, (byte)0xCE };
com.google.common.hash.HashFunction hashFunction = com.google.common.hash.Hashing.murmur3_128(seed);
com.google.common.hash.HashCode hashCode = hashFunction.newHasher()
       .putBytes(bytes)
       .hash();
System.err.println(hashCode.asLong());

JavaScript:

var seed = 0;
var rawKey = new ArrayBuffer(8);
var byteView = new Int8Array(rawKey);
byteView[0] = 0xDE; byteView[1] = 0xAD; byteView[2] = 0xBE; byteView[3] = 0xEF;
byteView[4] = 0xFE; byteView[5] = 0xED; byteView[6] = 0xFA; byteView[7] = 0xCE;
console.log(murmur3.hash128(rawKey, seed));

HyperLogLog Engineering: Choosing The Right Bits

Author’s Note: this is just a quick post about an engineering hiccup we ran into while implementing HyperLogLog features that aren’t mentioned in the original paper. We have an introduction to the algorithm and several other posts on the topic if you’re interested.

Say you had two HyperLogLog data structures with 5-bit-wide registers, one with log_{2}m = 11 and the other with log_{2}m = 15, and wanted to compute their union. You could just follow my colleague Chris’ advice and “fold” the larger one down to the size of the smaller one and then proceed as usual taking the pairwise max of the registers. This turns out to be a more involved process than Chris makes it out to be if you designed your HLL implementation in a particular way. For instance, if you use the 15 least(/most) significant bits of the 64-bit hashed input to determine register index and the next 30 bits to determine the register value, you end up in a tricky situation when you truncate the last 4 bits of the index to get the new 11-bit index.

[Figure: bit string (bad layout)]

If you imagine feeding the same element into an HLL of the smaller size, then the 4 bits you truncated from the index would have actually been used in the computation of the register value.

[Figure: bit string (bad layout), after fold]

You couldn’t simply take the original register value you computed; you’d have to take into account the new prefix added to the register value bit string. If the prefix has a 1 in it, you would recompute the run of zeroes on just the prefix (because you know it contains a 1 and thus all the information you need), and if not, you’d add the length of the prefix to the original register value. Not a ton of work, but having clutter like this in algorithmic code distracts the reader from the true intention. So how do we avoid this?
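For concreteness, the correction we’re trying to avoid looks something like this (a sketch, not java-hll’s actual code; it assumes the index comes from the least-significant bits and the run of zeroes is counted from the low end of the value bits):

// Register-value correction needed when folding with the "bad" layout.
// truncatedIndexBits: the 4 index bits that were dropped, which now sit as a
// prefix at the low end of the register-value bit string.
static int correctedRegisterValue(final int originalValue,
                                  final long truncatedIndexBits,
                                  final int truncatedBitCount) {
    if (truncatedIndexBits != 0) {
        // the prefix contains a 1: the run of zeroes ends inside the prefix
        return Long.numberOfTrailingZeros(truncatedIndexBits) + 1;
    }
    // the prefix is all zeroes: the original run just got longer by the prefix's length
    return originalValue + truncatedBitCount;
}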

Well, you could say that it’s very, very unlikely that you’ll ever need more than 30 bits for your register value, so you could assume that the register width would remain constant forever and use the bottom 30 bits for your register value and the next log_{2}m bits for your register index. That way you could just truncate the last 4 bits of the index and know that your register value would still be the same. On the other hand, if you’re Google, that may not be true. In that case, what you should do is use the log_{2}m least (/most) significant bits of your hashed value for the register index and the 30 most (/least) significant bits for the register value.
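That layout might look like the following sketch (illustrative, not java-hll’s exact code): the index grows or shrinks at one end of the hash while the value bits stay pinned to the other end, so folding never disturbs the register value.

// Register index from the log2m least-significant bits; register value from a
// fixed window at the most-significant end, so it is independent of log2m.
static int registerIndex(final long hash, final int log2m) {
    return (int) (hash & ((1L << log2m) - 1));
}

static int registerValue(final long hash, final int valueBitWidth /* e.g. 30 */) {
    final long window = hash >>> (Long.SIZE - valueBitWidth);   // top valueBitWidth bits
    return (window == 0) ? valueBitWidth + 1                    // all zeroes: maximal run
                         : Long.numberOfTrailingZeros(window) + 1;
}

// Folding from log2m = 15 to log2m = 11 is now just index truncation:
// registerIndex(hash, 11) changes, registerValue(hash, 30) does not.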

[Figure: bit string (recommended layout)]

Now you can just truncate the register index and use the original register value.

[Figure: bit string (recommended layout), after fold]

If you’re using a good hash function like MurmurHash3 that gives you 128 bits of entropy, you could simply compute the register index from the first 64-bit word in the hash and compute the register value from the second 64-bit word and completely ignore this problem up to a mind-bending log_{2}m = 64 and register width of 6 (aka the heat death of the universe).

I know it’s not always possible to anticipate this problem in the early stages of implementing and vetting an algorithm, but hopefully, with a bit of research, the next person who sets out to implement HLL will find this post and learn from our mistake.

Adventures in Concurrency

 

The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

  1. RSyslog handed Netty some bytes.
  2. A Netty worker turned those bytes into a String.
  3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an IMessage.
  4. The IMessage‘s payload got turned into an IInputEvent (basically a POJO).
  5. The IInputEvent was mapped to one or many summaries, based on its event type, and those summaries would then be updated with the event.
Original Summarizer Architecture with Summarization Lock

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten; we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput for v1 messages had dipped to 60k/sec, and 10k/sec for the v2 messages.

Moreover, the new features the v2 messages permitted weren’t simply an open-ended exercise: with them came the customers that demanded those features, and their additional traffic. The Summarizer simply wouldn’t stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used, and hopefully it will provide some insight into the pitfalls of concurrency in Java 6 and 7.

Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the IMessages to IInputEvents.

Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing IMessages to IInputEvents and summarizing IInputEvents are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

  • ArrayBlockingQueue (henceforth ABQ) – bounded, backed by an array (duh), uses a single ReentrantLock
  • LinkedBlockingQueue (henceforth LBQ) – bounded, backed by a linked list (duh), uses two ReentrantLocks (one for the head and one for the tail)
  • ConcurrentLinkedQueue (henceforth CLQ) – unbounded, backed by a linked list, uses no locks, relying instead on CAS (a non-blocking Michael-Scott queue)
Queued Summarizer Architecture with BlockingQueues

IMessage to IInputEvent parsing and IInputEvent summarization are buffered by BlockingQueues.

We added a message parsing queue to which the Netty workers would dump IMessages. Message parser workers would take those IMessages and turn them into IInputEvents. They would then distribute those IInputEvents to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).

The First Bottleneck: Parsing

I slotted in the various queues available and replayed some traffic to get a feel for what was going on.

  • The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
  • The summarization queues were split between two groups: those that were always full and those that were always empty. The reason was clear: some summarization workers were assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are, on average, either completely full or completely empty, because it is nearly impossible to perfectly balance production and consumption rates. Which leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn’t be used as a producer/consumer queue, because “completely full” means “always growing” for an unbounded queue. Naturally, that’s an OK situation when consumption outpaces production, but in that scenario I wouldn’t have needed the queue in the first place. I needed back-pressure, and only the blocking (in this case, bounded) queues could give me that.
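To make the back-pressure point concrete, here is a minimal sketch (a String payload stands in for an IMessage; this is not the Summarizer’s actual code):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A bounded queue gives you back-pressure for free: put() blocks the producer when
// the queue is full, and take() blocks the consumer when it is empty. An unbounded
// CLQ can never push back on its producers.
final BlockingQueue<String> messageParsingQueue = new ArrayBlockingQueue<String>(2048);

// producer side (e.g. a Netty worker)
messageParsingQueue.put("a serialized IMessage would go here");

// consumer side (e.g. a message parser worker)
final String message = messageParsingQueue.take();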

In order to address the parsing bottleneck, I wanted to get a better grasp of the IMessage to IInputEvent throughput rate under different configurations. I constructed a test in which Netty workers would either:

  • do all the parsing work themselves and then discard the message, or
  • enqueue IMessages to the message parsing queue (either ABQ or LBQ), with parser workers dequeuing, parsing, and then discarding the IInputEvent. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

Netty-Only Message Parsing

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

 

Queued Message Parsing Throughput

Message parsing throughput with a BlockingQueue between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

  • Without a queue, throughput topped out at 130k/s. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of IMessages. Notably, neither situation provoked anywhere near 100% saturation on (# Netty workers + # parser workers) cores, so I have to believe that it’s simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. Say context switching takes up r% of your time; then 5 Netty workers can do at most w_0 = 5(1 - r/100) units of work, while 4 Netty workers and 1 parser worker can do w_1 = 4(1 - r/100) + 1 > w_0 units. The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, a small gain over just 5 Netty workers, suggests this. It might also be a matter of code “locality”: by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
  • ABQ, touted as the end-all of high-performance Java queues, gave us “atrocious” throughput compared to LBQ if more than two consumer threads were hitting it. This was surprising until I poked an active VM with SIGQUIT a few hundred times, only to find that most of the workers were waiting on the ABQ’s ReentrantLock. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
  • LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked-list (vs. array) was less than that produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1 worker scenario. ABQ was not so lenient, however, dropping to throughput numbers lower than the Netty-only setup after 2 parser workers.
  • Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.

Progress! At least I knew I could parse the 200k messages per second I needed.

The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed IInputEvents to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, and the number of summarization workers, as well as by the fan-out factor of each particular event, and hence by the proportions of different events to each other in a “nominal” data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

BlockingQueue V1 Throughput on Production Hardware

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

BlockingQueue V2 Throughput on Production Hardware

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

  • Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
  • Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
  • The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
  • Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the RingBuffer‘s entries, as opposed to just references. No, I can’t do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the “business logic” as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one Disruptor per summarization worker. Each summarization worker would have a RingBuffer<IInputEvent> that would be fed off of the various message parser workers. This fits nicely because it supports an easy MPSC configuration with the MultiThreadedClaimStrategy. I considered using it for the message parsing queue, but the hoops I’d have to jump through to stripe the RingBuffer to allow an MPMC configuration just seemed like overkill for a preliminary test. I tried out various WaitStrategys, but the results shown below are from the ‘busy-spin’ strategy, which gave the best throughput.

Summarizer Architecture with Message Parsing Queue and Disruptor Summarization Queues

Disruptor as a slot-in replacement for an MPSC BlockingQueue.

Disruptor V1 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

Disruptor V2 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of WaitStrategy. This brings up another interesting point: I had the choice of using put/take or offer/poll on our BlockingQueues and ended up choosing put/take for the same reason. A marginal increase in throughput didn’t seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn’t enough to justify the decreased visibility into the “true” utilization of the CPU resources.

Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

V2 Throughput on Modern Hardware

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving to fresh hardware can literally double performance, without a doubling in clock speed.

  • Though the Disruptor configurations gave the best results, the “mundane” LBQ-LBQ ones trailed them by only 8%, while using 2-4 fewer threads and nearly a full core’s worth less of CPU at that. LBQ-LBQ also beat ABQ-ABQ handily, by about 10%, in most configurations.
  • The performance benefits of the Java 7 Hotspot over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably, its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

Major takeaways:

  1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.
  2. Separate your workloads! Tune for each workload!
  3. Don’t bother tuning queue size except for the purpose of jitter or keeping it in L3.
  4. Even if you don’t know what the black box at the bottom (or even the middle) of the stack is doing, you can still make progress! Experiment and plot and keep good notes!
  5. The Java 7 Hotspot offers a small but consistent performance improvement over Java 6 on newer hardware.

Efficient Field-Striped, Nested, Disk-backed Record Storage

At AK we deal with a torrent of data every day. We can report on the lifetime of a campaign which may encompass more than a year’s worth of data. To be able to efficiently access our data we are constantly looking at different approaches to storage, retrieval and querying. One approach that we have been interested in involves dissecting data into its individual fields (or “columns” if you’re thinking in database terms) so that we only need to access the fields that are pertinent to a query. This is not a new approach to dealing with large volumes of data – it’s the basis of column-oriented databases like HBase.

Much of our data contains nested structures and this causes things to start to get a little more interesting, since this no longer easily fits within the data-model of traditional column-stores. Our Summarizer uses an in-memory approach to nested, field-striped storage but we wanted to investigate this for our on-disk data. Google published the Dremel paper a few years ago covering this exact topic. As with most papers, it only provides a limited overview of the approach without covering many of the “why”s and trade-offs made. So, we felt that we needed to start from the ground up and investigate how nested, field-striped storage works in order to really understand the problem.

Due to time constraints we have only been able to scratch the surface. Since the community is obviously interested in a Dremel-like project, we want to make the work that we have done available. We apologize in advance for the rough edges.

Without further ado: Efficient Field-Striped, Nested, Disk-backed Record Storage (on GitHub).

Netty’s CodecEmbedder

We love Netty. It’s a great full-featured network framework for Java. One of the features that rounds out the framework is the CodecEmbedder. It allows you to test your encoders and decoders without any fuss using an offer-poll paradigm. For example, to test our Rsyslog decoder, we simply:

ChannelBuffer messageBuffer =
    ChannelBuffers.copiedBuffer("2011-06-30T00:00:03-07:00 some.host.agkn.net EVT_NM column1,column2\n", CharsetUtil.UTF_8);
RsyslogDecoder decoder = new RsyslogDecoder();
DecoderEmbedder<IRsyslogMessage> embedder = new DecoderEmbedder<IRsyslogMessage>(decoder);
embedder.offer(messageBuffer);

IRsyslogMessage message = embedder.poll();
assertNotNull(message, "Decoded message");
assertEquals(message.getTimestamp(), "2011-06-30T00:00:03-07:00", "Timestamp");
assertEquals(message.getHostname(), "some.host.agkn.net", "Host");
assertEquals(message.getProgramname(), "EVT_NM", "Programname");
assertEquals(message.getBody(), "column1,column2", "Body");

One gotcha to watch out for (which always manages to bite me in the butt, and is the impetus for writing this post) is that handlers will only process the type of data that they understand. Data of other types is passed along completely untouched. For example, while the following successfully compiles, it throws a java.lang.ClassCastException: java.lang.String cannot be cast to IRsyslogMessage at embedder.poll():

RsyslogDecoder decoder = new RsyslogDecoder();
DecoderEmbedder<IRsyslogMessage> embedder = new DecoderEmbedder<IRsyslogMessage>(decoder);
    embedder.offer("2011-06-30T00:00:03-07:00 some.host.agkn.net EVT_NM column1,column2\n");

IRsyslogMessage message = embedder.poll();
assertNotNull(message, "Decoded message");

The ChannelPipeline that backs the embedder can handle any type of input object. In the above case the offer‘d object is a String, which passes through the RsyslogDecoder untouched and then fails to come out of poll() as an IRsyslogMessage. As long as you always make sure that your offered object is understood by one of your handlers, the embedder will work as you expect.

Big Data Ain’t Fat Data: A Case Study

We’ve always had a hunch that our users stick to the same geographic region. Sure, there’s the occasional jet-setter that takes their laptop from New York to Los Angeles (or like Rob, goes Chicago to San Francisco) on a daily or weekly basis, but they’re the exception and not the rule. Knowing how true this is can simplify the way we work with user-centric data across multiple data centers.

When Rob asked me to find this out for sure, my first instinct was to groan and fire up Hive on an Elastic MapReduce cluster, but after a second, I heard Matt’s voice in my head saying, “Big Data isn’t Fat Data”. Why bother with Hadoop?

The Setup

If I were solving this problem on a small data-set, it’d be pretty straightforward. I could write a Python script in about 10 minutes that would take care of the problem. It would probably look something like:

import sys

users = {}

for line in sys.stdin:
    user, data_center = parse(line)  # parse() pulls the user ID and data center out of a log line
    try:
        users[user].add(data_center)
    except KeyError:
        users[user] = {data_center}

total_users = len(users)
# count users seen in more than one distinct data center
multiple_dc_users = len([u for u in users if len(users[u]) > 1])

Easy peasy. However, explicitly storing such a large hash table gets a little problematic once you start approaching medium-sized data (1GB+). Your memory needs grow pretty rapidly – with M users and N data centers, storage is O(MN) – and things start to get a little slow in Python. At this point there are two options. You can brute-force the problem by throwing hardware at it, either with a bigger machine or with something like Hadoop. Or, we can put on our Computer Science and Statistics hats and get a little bit clever.

What if we turn the problem sideways? Above, we’re keeping a hash table that holds a set of data centers for each user. Instead, let’s keep a set of users per data center, splitting the problem up into multiple hash tables. This lets us keep a small, fixed number of tables – since I’d hope any company knows exactly how many data centers they have – and spread the load across them, hopefully making the load on each table more tolerable. We can then check how many sets each user falls into, and call it a day.

import sys

data_centers = dict([(dc, set()) for dc in AK_DATA_CENTERS])

for line in sys.stdin:
    user, data_center = parse(line)
    data_centers[data_center].add(user)

# Get the total users by taking the union of all of the data center sets
...

# Get all users who are in exactly one set by taking symmetric differences (XOR) of data-center sets
# and count the size of that set.
...

While this approach theoretically has better performance with the same O(MN) space requirements, with big enough data the space requirements of the problem totally dominate whatever improvement this approach would provide. In other words, it doesn’t matter how small each hash table is: you can’t fit 80GB of user IDs into the 8GB of RAM on your laptop.

It’s looking pretty bleak for the Clever Way of doing things, since what we really want is a magic hash table that can store our 80GB of user IDs in the memory on our laptops.

Bloom Filters

Enter Bloom Filters. A bloom filter is a fixed-size set data structure with two minor features/drawbacks:

  1. You can never ask a Bloom Filter for the set of elements it contains.
  2. Membership queries have a small, controllable, false-positive probability. Bloom filters will never return false negatives.

With a little bit of work, it’s pretty easy to substitute Bloom Filters for plain old hash tables in our sideways approach above. There’s a slight tweak we have to make to our algorithm to accommodate the fact that we can’t ever query a bloom filter for the elements it contains, but the idea remains the same.
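For example, with a reasonably recent Guava (the same library we use above for Murmur3), the per-data-center hash tables become per-data-center Bloom filters. This is only a sketch; the data center names, expected-insertion count, and false-positive rate below are made-up, illustrative numbers:

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

// One Bloom filter of user IDs per data center, in place of one HashSet per data center.
final String[] AK_DATA_CENTERS = { "dc1", "dc2", "dc3" };   // placeholder names
final Map<String, BloomFilter<CharSequence>> usersByDataCenter =
    new HashMap<String, BloomFilter<CharSequence>>();
for (final String dataCenter : AK_DATA_CENTERS) {
    usersByDataCenter.put(dataCenter,
        BloomFilter.create(Funnels.stringFunnel(Charset.forName("UTF-8")),
                           500000000,    // expected insertions: illustrative only
                           0.01));       // acceptable false-positive probability: illustrative only
}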

The Payoff

Suppose now we’re keeping a bloom-filter of users per data center. The only thing we have to work around is the fact that we’ll never be able to recover the list of users we’ve added to each set. So, we’ll just deal with users each time we see them instead of deferring our counting to the end.

With that idea in the bag, there are really only a few things to worry about when a request comes in for a given data center (sketched in code after this list).

  • Check the bloom filter for that data center to see if the user has been to that one before
  • Check the other bloom filters to see how many other data-centers that user has been to before
  • Count the number of total data-centers that user has seen before. If the user is new to this data center, and the user has seen exactly one other data center before, increment the multiple data center user counter
  • If the user has never seen any of your data centers before, that user is a completely new user. Increment the total number of users seen.
  • If the user has already seen this data-center, this user is a repeat. Do nothing!
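In (illustrative) Java, using the per-data-center filters from the earlier sketch plus two plain counters, that bookkeeping looks roughly like this. It is a sketch of the decision logic above, not the code we actually ran:

// totalUsers and multipleDataCenterUsers are plain long counters; usersByDataCenter
// is the map of Guava Bloom filters from the earlier sketch.
void recordRequest(final String user, final String dataCenter) {
    final BloomFilter<CharSequence> here = usersByDataCenter.get(dataCenter);
    if (here.mightContain(user)) {
        return;                                  // repeat visit to this data center: do nothing
    }

    // count how many *other* data centers have (probably) seen this user before
    int otherDataCenters = 0;
    for (final Map.Entry<String, BloomFilter<CharSequence>> entry : usersByDataCenter.entrySet()) {
        if (!entry.getKey().equals(dataCenter) && entry.getValue().mightContain(user)) {
            otherDataCenters++;
        }
    }

    if (otherDataCenters == 0) {
        totalUsers++;                            // completely new user
    } else if (otherDataCenters == 1) {
        multipleDataCenterUsers++;               // the user just went from one data center to two
    }
    here.put(user);                              // remember this user for this data center
}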

We ran our version of this overnight. It took us one core, 8GB of RAM, and just under 4 hours to count the number of users who hit multiple data centers in a full week’s worth of logs.

Not bad!

Never trust a profiler

A week or so ago I had mentioned to Timon that, for the first time, a profiler had actually pointed me in a direction that directly led to a positive increase in performance. Initially Timon just gave me that “you’re just a crotchety old man” look (which, in most cases, is the correct response). I pointed him to Josh Bloch’s Performance Anxiety presentation, which dives into why it is so hard (in fact “impossible”, in Josh’s words) to benchmark modern applications. It also references the interesting paper “Evaluating the Accuracy of Java Profilers”.

Just last week I was trying to track down a severe performance degradation in my snapshot recovery code. I was under some time pressure, so I turned on my profiler to try to point me in the right direction. The result that it gave was clear, repeatable and unambiguous, and it pointed me at the linear probing algorithm of the hash table that I am using. Since I had recently moved to a new hash table (one that allowed for rehashing and resizing) it was possible that this was in fact the root of my performance problem, but I had my doubts. (More on this in a future post.) I swapped out my hash table and re-profiled. The profiler again gave me a clear, repeatable and unambiguous result that my performance woes were solved, so I moved on. When we were able to test the snapshot recovery code on production snapshots, we found that the performance problems still existed.

My profiler lied to me. Never trust your profiler.

Streaming Algorithms and Sketches

Here at Aggregate Knowledge we spend a lot of time thinking about how to do analytics on a massive amount of data. Rob recently posted about building our streaming datastore and the architecture that helps us deal with “big data”. Given a streaming architecture, the obvious question for the data scientist is “How do we fit in?”. Clearly we need to look towards streaming algorithms to match the speed and performance of our datastore.

A streaming algorithm is generally defined as one that has finite memory – significantly smaller than the data presented to it – and must process the input in one pass. Streaming algorithms start pretty simple, for instance counting the number of elements in the stream:

counter = 0
for event in stream:
    counter += 1

While eventually counter will overflow (and you can be somewhat clever about avoiding that), this is way better than the non-streaming alternative.

elements = list(stream)
counter = len(elements)

Pretty simple stuff. Even a novice programmer can tell you why the second method is way worse than the first. You can get more complicated and keep the same basic approach – computing the mean of a stream of floating-point numbers is almost as simple: keep counter around as above, maintain a running total_sum += value_new, and divide the two at the end. Now that we’re feeling smart, what about the quantiles of the stream? Ah! Now that is harder.

While it may not be immediately obvious, you can prove (as Munro and Paterson did in 1980) that computing exact quantiles of a stream requires memory that is at least linear with respect to the size of the stream. So, we’re left approximating a solution to the quantiles problem. A first stab might be sampling, where you keep every 1000th element. While this isn’t horrible, it has its downsides – if your stream is infinite, you’ll still run out of space. It’s a good thing there are much better solutions. One of the first and most elegant was proposed by Cormode and Muthukrishnan in 2003, where they introduce the Count-Min sketch data structure. (A nice reference for sketching data structures can be found here.)

The Count-Min sketch works much like a bloom filter. You compose k empty tables and k hash functions. For each incoming element we simply hash it through each function and increment the appropriate element in the corresponding table. To find out how many times we have historically seen a particular element, we simply hash our query and take the MINIMUM value that we find in the tables. In this way we limit the effects of hash collisions, and we can balance the size of the Count-Min sketch against the accuracy we require for the final answer. Here’s how it works:
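In code, a hand-rolled version might look like the following sketch (illustrative only; the multiply-shift hashing here is deliberately simple and the elements are assumed to be longs):

import java.util.Random;

// A minimal Count-Min sketch: k rows of w counters, one simple hash per row.
class CountMinSketch {
    private final int k, w;
    private final long[][] tables;
    private final long[] hashMultipliers;

    CountMinSketch(final int k, final int w, final long seed) {
        this.k = k;
        this.w = w;
        this.tables = new long[k][w];
        this.hashMultipliers = new long[k];
        final Random random = new Random(seed);
        for (int row = 0; row < k; row++) {
            hashMultipliers[row] = random.nextLong() | 1L;   // odd multiplier for multiply-shift hashing
        }
    }

    private int bucket(final int row, final long element) {
        final long hashed = element * hashMultipliers[row];
        return (int) ((hashed >>> 33) % w);                  // the high bits are the best-mixed
    }

    void add(final long element) {
        for (int row = 0; row < k; row++) {
            tables[row][bucket(row, element)]++;             // one increment per table
        }
    }

    long estimateCount(final long element) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < k; row++) {
            min = Math.min(min, tables[row][bucket(row, element)]);   // the minimum limits collision damage
        }
        return min;
    }
}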

The Count-Min sketch is an approximation to the histogram of the incoming data; in fact, it’s really only probabilistic when hashes collide. In order to compute quantiles we want to find the “mass” of the histogram above/below a certain point. Luckily, Count-Min sketches support range queries of the type “select count(*) where val between 1 and x;“. Now it is just a matter of finding the quantile of choice.

To actually find the quantiles is slightly tricky, but not that hard. You basically have to perform a binary search with the range queries. So to find the first decile value, supposing you kept around the number of elements you have seen in the stream, you would binary search through values of x until the return count of the range query is 1/10 of the total count.
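That binary search might look like the following sketch. It assumes a rangeCount(lo, hi) operation (in practice built from a hierarchy of Count-Min sketches over dyadic ranges, which is how the range-query support mentioned above is usually implemented) and a running count of elements seen; RangeSketch is a stand-in interface, not a real library type:

// Find the smallest value x such that roughly a `quantile` fraction of the stream is <= x.
static long approximateQuantile(final double quantile, final long totalCount,
                                final long maxValue, final RangeSketch sketch) {
    final long target = (long) (quantile * totalCount);
    long lo = 0;
    long hi = maxValue;
    while (lo < hi) {
        final long mid = lo + (hi - lo) / 2;
        if (sketch.rangeCount(0, mid) < target) {
            lo = mid + 1;              // not enough mass at or below mid yet
        } else {
            hi = mid;                  // mid (or something smaller) is our answer
        }
    }
    return lo;
}

// Stand-in for "a Count-Min sketch that supports range queries".
interface RangeSketch {
    long rangeCount(long lo, long hi);
}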

Pretty neat, huh?
