HLL Intersections

Why?

The intersection of two streams (of user ids) is a particularly important business need in the advertising industry. For instance, if you want to reach suburban moms but the cost of targeting those women on a particular inventory provider is too high, you might want to know about a cheaper inventory provider whose audience overlaps with the first provider. You may also want to know how heavily your car-purchaser audience overlaps with a certain metro area or a particular income range. These types of operations are critical for understanding where and how well you’re spending your advertising budget.

As we’ve seen before, HyperLogLog provides a time- and memory-efficient algorithm for estimating the number of distinct values in a stream. For the past two years, we’ve been using HLL at AK to do just that: count the number of unique users in a stream of ad impressions. Conveniently, HLL also supports the union operator ( \cup ), allowing us to trivially estimate the distinct value count of any composition of streams without sacrificing precision or accuracy. This piqued our interest because if we can “losslessly” compute the union of two streams and produce low-error cardinality estimates, then there’s a chance we can use that estimate along with the inclusion-exclusion principle to produce “directionally correct” cardinality estimates of the intersection of two streams. (To be clear, when I say “directionally correct” my criterion is “can an advertiser make a decision off of this number?”, or “can it point them in the right direction for further research?”. This often means that we can tolerate relative errors of up to 50%.)

The goals were:

  1. Get a grasp on the theoretical error bounds of intersections done with HLLs, and
  2. Come up with heuristic bounds around m, overlap, and the set cardinalities that could inform our usage of HLL intersections in the AK product.

Quick terminology review:

  • If I have a set of integers A, I’m going to call the HLL representing it H_{A}.
  • If I have HLLs H_{A}, H_{B} and their union H_{A \cup B}, then I’m going to call the intersection cardinality estimate produced |H_{A \cap B}|.
  • Define the overlap between two sets as overlap(A, B) := \frac{|A \cap B|}{min(|A|, |B|)}.
  • Define the cardinality ratio \frac{max(|A|, |B|)}{min(|A|, |B|)} as a shorthand for the relative cardinality of the two sets.
  • We’ll represent the absolute error of an observation |H_{A}| as \Delta |H_{A}|.

That should be enough for those following our recent posts, but for those just jumping in, check out Appendices A and B at the bottom of the post for a more thorough review of the set terminology and error terminology.

Experiment

We fixed 16 overlap values (0.0001, 0.001, 0.01, 0.02, 0.05, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0) and 12 set cardinalities (100M, 50M, 10M, 5M, 1M, 500K, 100K, 50K, 10K, 5K, 1K, 300) and did 100 runs of each permutation of (overlap, |A|, |B|). A random stream of 64-bit integers hashed with Murmur3 was used to create the two sets such that they shared exactly min(|A|,|B|) \cdot overlap = |A \cap B|  elements. We then built the corresponding HLLs H_{A} and H_{B} for those sets and calculated the intersection cardinality estimate |H_{A} \cap H_{B}| and computed its relative error.
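As a rough sketch of how a single run’s input can be generated (Hll here is a stand-in for any HLL implementation, and I’m assuming Guava’s Murmur3 for the hashing; our actual harness differs in the details):

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;

    // Sketch: feed two HLLs with hashed 64-bit ids such that they share exactly
    // round(min(|A|,|B|) * overlap) elements. Distinct counter values hashed with
    // Murmur3 stand in for a random stream of 64-bit integers.
    public class OverlapStreams {
        private static final HashFunction MURMUR = Hashing.murmur3_128();

        interface Hll { void add(long hashedValue); }   // stand-in for any HLL implementation

        public static void fill(final long cardA, final long cardB, final double overlap,
                                final Hll hllA, final Hll hllB) {
            final long shared = Math.round(Math.min(cardA, cardB) * overlap);
            long next = 0;
            // the shared elements go into both HLLs
            for (long i = 0; i < shared; i++) {
                final long hash = MURMUR.hashLong(next++).asLong();
                hllA.add(hash);
                hllB.add(hash);
            }
            // the remainder of each set is disjoint
            for (long i = shared; i < cardA; i++) hllA.add(MURMUR.hashLong(next++).asLong());
            for (long i = shared; i < cardB; i++) hllB.add(MURMUR.hashLong(next++).asLong());
        }
    }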

Given that we could only generate and insert about 2M elements/second per core, doing runs with set cardinalities greater than 100M was quickly ruled out for this blog post. However, I can assure you the results hold for much larger sets (up to the multiple billion-element range) as long as you heed the advice below.

Results

This first group of plots has a lot going on, so I’ll preface it by saying that it’s just here to give you a general feeling for what’s going on. First note that within each group of boxplots overlap increases from left to right (orange to green to pink), and within each plot cardinality ratio increases from left to right. Also note that the y-axis (the relative error of the observation) is log-base-10 scale. You can clearly see that as the set sizes diverge, the error skyrockets for all but the most similar (in both cardinality and composition) sets. I’ve drawn in a horizontal line at 50% relative error to make it easier to see what falls under the “directionally correct” criteria. You can (and should) click for a full-sized image.

Note: the error bars narrow as we progress further to the right because there are fewer observations with very large cardinality ratios. This is an artifact of the experimental design.

[Figure: relative error of the intersection estimate vs. cardinality ratio, across all overlaps and register counts]

A few things jump out immediately:

  • For cardinality ratio > 500, the majority of observations have many thousands of percent error.
  • When cardinality ratio is smaller than that and overlap > 0.4, register count has little effect on error, which stays very low.
  • When overlap \le 0.01, register count has little effect on error, which stays very high.

Just eyeballing this, the lesson I get is that computing intersection cardinality with small error (relative to the true value) is difficult and only works within certain constraints. Specifically,

  1. \frac{|A|}{|B|} < 100, and
  2. overlap(A, B) = \frac{|A \cap B|}{min(|A|, |B|)} \ge 0.05.

The intuition behind this is very simple: if the error of any one of the terms in your calculation is roughly as large as the true value of the result then you’re not going to estimate that result well. Let’s look back at the intersection cardinality formula. The left-hand side (that we are trying to estimate) is a “small” value, usually. The three terms on the right tend to be “large” (or at least “larger”) values. If any of the “large” terms has error as large as the left-hand side we’re out of luck.

Overlap Examples

So, let’s say you can compute the cardinality of an HLL with relative error of a few percent. If |H_{A}| is two orders of magnitude smaller than |H_{B}|, then the error alone of |H_{B}| is roughly as large as |A|.

|A \cap B| \le |A| by definition, so

|A \cap B| \le |A| \approx |H_{A}| \approx \Delta |H_{B}|.

In the best scenario, where A \cap B = A, the errors of |H_{B}| and |H_{A \cup B}| \approx |H_{B}| are both roughly the same size as what you’re trying to measure. Furthermore, even if |A| \approx |B| but the overlap is very small, then |A \cap B|  will be roughly as large as the error of all three right-hand terms.
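To put illustrative numbers on that: with m = 8192 registers, \sigma = \frac{1.04}{\sqrt{8192}} \approx 1.15\%. If |A| = 1M and |B| = 100M, then \Delta |H_{B}| \approx 0.0115 \cdot 100M = 1.15M, which already exceeds |A| \ge |A \cap B|. The error of a single right-hand term is larger than anything we could hope to measure on the left-hand side.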

On the bubble

Let’s throw out the permutations whose error bounds clearly don’t support “directionally correct” answers (overlap < 0.01 and \frac{|A|}{|B|} > 500 ) and those that trivially do (overlap > 0.4 ) so we can focus more closely on the observations that are “on the bubble”. Sadly, these plots exhibit a good deal of variance inherent in their smaller sample size. Ideally we’d have tens of thousands of runs of each combination, but for now this rough sketch will hopefully be useful. (Apologies for the inconsistent colors between the two plots. It’s a real bear to coordinate these things in R.) Again, please click through for a larger, clearer image.

[Figure: relative error of the intersection estimate vs. cardinality ratio, restricted to the “on the bubble” runs]

Doubling the number of registers reduces the variance of the relative error by about a quarter and moves the median relative error down (closer to zero) by 10-20 points.

In general, we’ve seen that the following cutoffs perform pretty well, practically. Note that some of these aren’t reflected too clearly in the plots because of the smaller sample sizes.

Register Count | Data Structure Size | Overlap Cutoff | Cardinality Ratio Cutoff
8192           | 5kB                 | 0.05           | 10
16384          | 10kB                | 0.05           | 20
32768          | 20kB                | 0.05           | 30
65536          | 41kB                | 0.05           | 100
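In practice these cutoffs translate into a cheap guard you can apply before trusting an intersection estimate. A minimal sketch (the Hll interface is a stand-in, and the constants below assume 8192 registers):

    // Sketch: refuse to report intersection estimates outside the regime where
    // they are even "directionally correct". Cutoffs assume 8192 registers.
    public class GuardedIntersection {
        interface Hll {
            double cardinality();
            Hll union(Hll other);
        }

        public static double intersection(final Hll a, final Hll b) {
            final double cardA = a.cardinality();
            final double cardB = b.cardinality();
            final double estimate = cardA + cardB - a.union(b).cardinality();

            final double ratio = Math.max(cardA, cardB) / Math.min(cardA, cardB);
            final double overlap = estimate / Math.min(cardA, cardB);   // estimated overlap
            if (ratio > 10 || overlap < 0.05) {
                return Double.NaN;   // outside the usable envelope; don't report it
            }
            return estimate;
        }
    }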

Error Estimation

To get a theoretical formulation of the error envelope for intersection, in terms of the two set sizes and their overlap, I tried the first and simplest error propagation technique I learned. For variables Y, Z, ..., and X a linear combination of those (independent) variables, we have

\Delta X = \sqrt{ (\Delta Y)^2 + (\Delta Z)^2 + ...}

Applied to the inclusion-exclusion formula:

\begin{array}{ll} \displaystyle \Delta |H_{A \cap B}| &= \sqrt{ (\Delta |H_{A}|)^2 + (\Delta |H_{B}|)^2 + (\Delta |H_{A \cup B}|)^2} \\ &= \sqrt{ (\sigma\cdot |A|)^2 + (\sigma\cdot |B|)^2 + (\sigma\cdot |A \cup B|)^2} \end{array}

where

\sigma = \frac{1.04}{\sqrt{m}} as in section 4 (“Discussion”) of the HLL paper.

Aside: Clearly |H_{A \cup B}| is not independent of |H_{A}| + |H_{B}|, though |H_{A}| is likely independent of |H_{B}|. However, I do not know how to a priori calculate the covariance in order to use an error propagation model for dependent variables. If you do, please pipe up in the comments!
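Ignoring that caveat, the envelope is trivial to compute; a sketch (in practice the HLL estimates stand in for the true cardinalities on the right-hand side):

    // Sketch: propagated (independent-term) error for the inclusion-exclusion
    // intersection estimate, with sigma = 1.04 / sqrt(m).
    public class IntersectionError {
        public static double envelope(final int m, final double cardA,
                                      final double cardB, final double cardUnion) {
            final double sigma = 1.04 / Math.sqrt(m);
            final double a = sigma * cardA, b = sigma * cardB, u = sigma * cardUnion;
            return Math.sqrt(a * a + b * b + u * u);
        }
    }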

I’ve plotted this error envelope against the relative error of the observations from HLLs with 8192 registers (approximately 5kB data structure).

[Figure: theoretical error envelope plotted against the observed relative error, 8192 registers]

Despite the crudeness of the method, it provided a 95% error envelope for the data without significant differences across cardinality ratio or overlap. Specifically, at least 95% of observations satisfied

\left| |H_{A \cap B}| - |A \cap B| \right| < \Delta |H_{A \cap B}|

However, it’s really only useful in the ranges shown in the table above. Past those cutoffs the bound becomes too loose and isn’t very useful.

This is particularly convenient because you can tune the number of registers you need to allocate based on the most common intersection sizes/overlaps you see in your application. Obviously, I’d recommend everyone run these tests and do this analysis on their business data, and not on some contrived setup for a blog post. We’ve definitely seen that we can get away with far less memory usage than expected to successfully power our features, simply because we tuned and experimented with our most common use cases.

Next Steps

We hope to improve the intersection cardinality result by finding alternatives to the inclusion-exclusion formula. We’ve tried a few different approaches, mostly centered around the idea of treating the register collections themselves as sets, and in my next post we’ll dive into those and other failed attempts!


Appendix A: A Review Of Sets

Let’s say we have two streams of user ids, S_{A} and S_{B}. Take a snapshot of the unique elements in those streams as sets and call them A and B. In the standard notation, we’ll represent the cardinality, or number of elements, of each set as |A| and |B|.

Example: If A = \{1,2,10\} then |A| = 3.

If I wanted to represent the unique elements in both of those sets combined as another set I would be performing the union, which is represented by A \cup B.

Example: If A = \{1,2,3\}, B=\{2,3,4\} then A \cup B = \{1,2,3,4\}.

If I wanted to represent the unique elements that appear in both A and B I would be performing the intersection, which is represented by A \cap B.

Example: With A, B as above, A \cap B = \{2,3\}.

The relationship between the union’s cardinality and the intersection’s cardinality is given by the inclusion-exclusion principle. (We’ll only be looking at the two-set version in this post.) For reference, the two-way inclusion-exclusion formula is |A \cap B| = |A| + |B| - |A \cup B| .

Example: With A, B as above, we see that |A \cap B| = 2 and |A| + |B| - |A \cup B| = 3 + 3 - 4 = 2.

For convenience we’ll define the overlap between two sets as overlap(A, B) := \frac{|A \cap B|}{min(|A|, |B|)}.

Example: With A, B as above, overlap(A,B) = \frac{|A \cap B|}{min(|A|, |B|)} = \frac{2}{min(3,3)} = \frac{2}{3}.

Similarly, for convenience, we’ll define the cardinality ratio \frac{max(|A|, |B|)}{min(|A|, |B|)} as a shorthand for the relative cardinality of the two sets.

The examples and operators shown above are all relevant for exact, true values. However, HLLs do not provide exact answers to the set cardinality question. They offer estimates of the cardinality along with certain error guarantees about those estimates. In order to differentiate between the two, we introduce HLL-specific operators.

Consider a set A. Call the HLL constructed from this set’s elements H_{A}. The cardinality estimate given by the HLL algorithm for H_{A} is |H_{A}|.

Define the union of two HLLs H_{A} \cup H_{B} := H_{A \cup B}, which is also the same as the HLL created by taking the pairwise max of H_{A}‘s and H_{B}‘s registers.
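A minimal sketch of that union, assuming both HLLs use the same register count and an unpacked byte[] of registers (real implementations pack registers more tightly):

    // Sketch: with equal register counts, the union HLL's registers are simply
    // the pairwise max of the two inputs' registers.
    public class HllUnion {
        public static byte[] unionRegisters(final byte[] registersA, final byte[] registersB) {
            final byte[] merged = new byte[registersA.length];
            for (int i = 0; i < merged.length; i++) {
                merged[i] = (byte) Math.max(registersA[i], registersB[i]);
            }
            return merged;
        }
    }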

Finally, define the intersection cardinality of two HLLs in the obvious way: |H_{A} \cap H_{B}| := |H_{A}| + |H_{B}| - |H_{A \cup B}|. (This is simply the inclusion-exclusion formula for two sets with the cardinality estimates instead of the true values.)

Appendix B: A (Very Brief) Review of Error

The simplest way of understanding the error of an estimate is simply “how far is it from the truth?”. That is, what is the difference in value between the estimate and the exact value, also known as the absolute error.

However, that’s only useful if you’re measuring a single thing over and over again. The primary criterion for judging the utility of HLL intersections is relative error, because we are trying to measure intersections of many different sizes. In order to get an apples-to-apples comparison of the efficacy of our method, we normalize the absolute error by the true size of the intersection. So, for some observation \hat{x} whose exact, non-zero value is x, we say that the relative error of the observation is \frac{x-\hat{x}}{x}. That is, “by what percentage is the observation off from the true value?”

Example: If |A| = 100, |H_{A}| = 90 then the relative error is \frac{100 - 90}{100} = \frac{10}{100} = 10\%.

Adventures in Concurrency

 

The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

  1. RSyslog handed Netty some bytes.
  2. A Netty worker turned those bytes into a String.
  3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an IMessage.
  4. The IMessage‘s payload got turned into an IInputEvent (basically a POJO).
  5. The IInputEvent was mapped to one or many summaries, based on its event type, and those summaries would then be updated with the event.

Original Summarizer Architecture with Summarization Lock

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten, we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput for v1 messages had dipped to 60k/sec, and 10k/sec for the v2 messages.

Moreover, the new features the v2 messages permitted weren’t simply an open-ended exercise: with them came the customers that demanded those features and their additional traffic.  The Summarizer simply wouldn’t stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used and hopefully will provide some insight into the pitfalls of concurrency in Java 6 and 7.

Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the IMessages to IInputEvents.

Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing IMessages to IInputEvents and summarizing IInputEvents are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

  • ArrayBlockingQueue (henceforth ABQ) – bounded, backed by an array (duh), uses a single ReentrantLock
  • LinkedBlockingQueue (henceforth LBQ) – optionally bounded, backed by a linked list (duh), uses two ReentrantLocks (one for the head and one for the tail)
  • ConcurrentLinkedQueue (henceforth CLQ) – unbounded, backed by a linked list, uses no locks, instead relying on CAS (the Michael & Scott non-blocking queue algorithm)

Queued Summarizer Architecture with BlockingQueues

IMessage to IInputEvent parsing and IInputEvent summarization are buffered by BlockingQueues.

We added a message parsing queue to which the Netty workers would dump IMessages. Message parser workers would take those IMessages and turn them into IInputEvents. They would then distribute those IInputEvents to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).
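A skeletal version of that wiring, just to make the shape concrete (a sketch: the queue sizes, stubbed methods, and nested marker interfaces are illustrative, not our production code):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: one MPMC parsing queue feeding N MPSC summarization queues, with
    // each summary owned by exactly one summarization worker (Single Writer).
    public class QueuedSummarizer {
        interface IMessage { }        // stand-ins for the Summarizer's own types
        interface IInputEvent { }

        private final BlockingQueue<IMessage> parseQueue = new ArrayBlockingQueue<IMessage>(2048);
        private final BlockingQueue<IInputEvent>[] summarizeQueues;

        @SuppressWarnings("unchecked")
        QueuedSummarizer(final int summarizationWorkerCount) {
            summarizeQueues = new BlockingQueue[summarizationWorkerCount];
            for (int i = 0; i < summarizationWorkerCount; i++) {
                summarizeQueues[i] = new LinkedBlockingQueue<IInputEvent>(2048);
            }
        }

        // Netty workers call this; put() blocks when the queue is full (back-pressure).
        void enqueue(final IMessage message) throws InterruptedException {
            parseQueue.put(message);
        }

        // Message parser workers: dequeue, parse, and route each event to the
        // single summarization worker that owns the relevant summaries.
        void parseLoop() throws InterruptedException {
            while (true) {
                final IInputEvent event = parse(parseQueue.take());
                for (final int owner : owningWorkers(event)) {   // assigned round-robin at startup
                    summarizeQueues[owner].put(event);
                }
            }
        }

        private IInputEvent parse(final IMessage message) { return null; /* stub */ }
        private int[] owningWorkers(final IInputEvent event) { return new int[0]; /* stub */ }
    }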

The First Bottleneck: Parsing

I slotted in the various queues available and replayed some traffic to get a feel for what was going on.

  • The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
  • The summarization queues were split between two groups: those that were always full and those that were always empty. The reason was clear: some summarization workers were assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are on average completely full or empty because it is nearly impossible to perfectly balance production/consumption rates. Which leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn’t be used as a producer/consumer queue because “completely full” means “always growing” for an unbounded queue. Naturally, that’s an OK situation when consumption outpaces production, but in that scenario I wouldn’t have needed the queue in the first place. I needed back-pressure and only the blocking (in this case, bounded) queues could give me that.

In order to address the parsing bottleneck, I wanted to get a better grasp of the IMessage to IInputEvent throughput rate under different configurations. I constructed a test in which Netty workers would either:

  • do all the parsing work themselves, and then discard the message, or
  • would enqueue IMessages to the message parsing queue (either ABQ or LBQ), and parser workers would dequeue, parse, and then discard the IInputEvent. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

Netty-Only Message Parsing

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

 

Queued Message Parsing Throughput

Message parsing throughput with a BlockingQueue between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

  • Without a queue, throughput topped out at 130k/s. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of IMessages. Notably, neither situation provoked anywhere near 100% saturation on (# netty worker + # parser worker) cores, so I have to believe that it’s simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. Say context switching takes up r% of your time; then 5 Netty workers can do at most w_0 = 5(1-r/100) units of work, while 4 Netty workers and 1 parser worker can do w_1 = 4(1-r/100) + 1 > w_0 units. (A worked example follows this list.) The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, which is a small gain over just 5 Netty workers, suggests this. It might also be a matter of code “locality”: by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
  • ABQ, touted as the end all of high-performance Java queues, gave us “atrocious” throughput, compared to LBQ, if more than two consumer threads were hitting it. This was surprising until I poked an active VM with SIGQUIT a few hundred times only to find that most of the workers were waiting on the ABQ’s ReentrantLock. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
  • LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked-list (vs. array) was less than that produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1 worker scenario. ABQ was not so lenient, however, dropping to throughput numbers lower than the Netty-only setup after 2 parser workers.
  • Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.
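To put illustrative numbers on that context-switching argument: if r = 20, then five Netty workers manage w_0 = 5 \cdot 0.8 = 4 units of work, while four Netty workers plus one dedicated parser worker manage w_1 = 4 \cdot 0.8 + 1 = 4.2, a small gain consistent with the observed jump from roughly 130k/sec to 130-150k/sec.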

Progress! At least I knew I could parse the 200k messages per second I needed.

The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed IInputEvents to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, the number of summarization workers, as well as the fan-out factor of each particular event, and hence by the proportions of different events to each other in a “nominal” data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

BlockingQueue V1 Throughput on Production Hardware

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

BlockingQueue V2 Throughput on Production Hardware

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

  • Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
  • Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
  • The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
  • Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the RingBuffer‘s entries, as opposed to just references. No, I can’t do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the “business logic” as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one Disruptor per summarization worker. Each summarization worker would have a RingBuffer<IInputEvent> that would be fed off of the various message parser workers. This fits nicely because it supports an easy MPSC configuration with the MultiThreadedClaimStrategy. I considered using it for the message parsing queue, but the hoops I’d have to jump through to stripe the RingBuffer to allow an MPMC configuration just seemed like overkill for a preliminary test. I tried out various WaitStrategys but the results shown below are from the ‘busy-spin’ strategy, which gave the best throughput.

Summarizer Architecture with Message Parsing Queue and Disruptor Summarization Queues

Disruptor as a slot-in replacement for an MPSC BlockingQueue.

Disruptor V1 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

Disruptor V2 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of WaitStrategy. This brings up another interesting point: I had the choice of using put/take or offer/poll on our BlockingQueues and ended up choosing put/take for the same reason. A marginal increase in throughput didn’t seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn’t enough to justify the decreased visibility into the “true” utilization of the CPU resources.
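The contrast in a sketch (standard java.util.concurrent calls only; the class and its element type are illustrative):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Sketch: blocking vs. spinning hand-off on a BlockingQueue.
    public class HandOffStyles<E> {
        private final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(2048);

        // put()/take() park the calling thread when the queue is full/empty, giving
        // back-pressure for free and CPU utilization that reflects actual work.
        void produceBlocking(final E item) throws InterruptedException {
            queue.put(item);
        }

        E consumeBlocking() throws InterruptedException {
            return queue.take();
        }

        // offer()/poll() return immediately, so a busy-spin loop is needed to avoid
        // dropping items; it shaves some latency but pins a core at 100% even when
        // there is nothing to do.
        void produceSpinning(final E item) {
            while (!queue.offer(item)) { /* spin or back off */ }
        }

        E consumeSpinning() {
            E item;
            while ((item = queue.poll()) == null) { /* spin */ }
            return item;
        }
    }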

Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

V2 Throughput on Modern Hardware

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving onto fresh hardware can literally double performance, without a doubling in clock speed.

  • Though the Disruptor configurations gave the best results, the “mundane” LBQ-LBQ ones only trailed them by 8%, using 2-4 fewer threads and nearly a full core’s worth less of CPU, at that. LBQ-LBQ also beat ABQ-ABQ out handily by about 10% in most configurations.
  • The performance benefits of the Java 7 Hotspot over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

Major takeaways:

  1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.
  2. Separate your workloads! Tune for each workload!
  3. Don’t bother tuning queue size except to absorb jitter or to keep it in L3.
  4. Even if you don’t know what the black box at the bottom (or even the middle) of the stack is doing, you can still make progress! Experiment and plot and keep good notes!
  5. The Java 7 Hotspot offers a small but consistent performance improvement over Java 6 on newer hardware.

No BS Data Salon #3

On Saturday Aggregate Knowledge hosted the third No BS Data Salon on databases and data infrastructure. A handful of people showed up to hear Scott Andreas of Boundary talk about distributed, streaming service architecture, and I also gave a talk about AK’s use of probabilistic data structures.

The smaller group made for some fantastic, honest conversation about the different approaches to streaming architectures, the perils of distributing analytics workloads in a streaming setting, and the challenges of pushing scientific and engineering breakthroughs all the way through to product innovation.

We’re all looking forward to the next event, which will be in San Francisco, in a month or two. If you have topics you’d like to see covered, we’d love to hear from you in the comments below!

As promised, I’ve assembled something of a “References” section to my talk, which you can find below.

(Hyper)LogLog

Random

  • Sean Gourley’s talk on human-scale analytics and decision-making
  • Muthu Muthukrishnan’s home page, where research on streaming in general abounds
  • A collection of C and Java implementations of different probabilistic sketches

Attendees of the third No BS Data Salon

No BS Data Salon #2

On Saturday, our illustrious Chief Scientist Matt Curcio sat on the Frameworks, Tools, and Techniques for Scaling up Machine Learning panel at the second No BS Data Salon hosted by MetaMarkets. The discussion ranged from scaling the human aspect of ML and analytics to brass tacks about the difficulties of actually performing ML on web scale data sets.

The theme for this Salon was analytics, and just like last time the focus was on real use cases and a no-nonsense open discussion. We heard great presentations from Sean Gourley of Quid, Ian Wong of Square, and Metamarkets’ own Nelson Ray. Sean gave a great talk about the future of human-scale vs. machine-scale decision-making and analytics. Ian gave an informative overview of the challenges of growing a risk analysis team at Square. Nelson threw down with an awesome technical presentation about how to “A/B test anything”.

This second event in the series was certainly a worthwhile follow-up to the first! Thanks again to Metamarkets for the food, drink, and welcome. You won’t find better hosts than Mike and Nisha!

No BS Data Salon #2 attendees during Sean Gourley's presentation
No BS Data Salon #2 attendees caffeinating in between presentations

Big Memory, Part 4

Author’s Note: This is part 4 of a series of posts about my adventures in building a “large”, in-memory hash table. This post is a summary of some pure Java hash table libraries.

Background

In my last post, I discussed the results of rerunning Nick Welch’s benchmark of C/C++ hash tables. However, since we use the JVM in production, those results were purely academic. I need to either verify or strike down my suspicions in the Java world.

For this test, I’m only concerned with primitive long-to-long mappings. For those following along, this is a departure from the methodology used in part 3. Here’s why.

Goals

I was hoping to replicate some fraction of the amazing throughput seen in the C/C++ benchmark. (A million inserts per second wouldn’t be a bad start!) More importantly, I sought to learn something about the relative performance of different schemes on the JVM. Would the bit-masking used in power-of-2-sized tables prove faster than the modulus operator used by prime-sized tables? Would the extra pointer hops of an externally chained implementation be more or less costly than the probe chains of open addressing schemes? Would Java even be able to handle a 128GiB hash table? What effect would GC have? What implementation tricks could be learned?

I’ll save you some time right now if you’re actually looking for the exact answers to all those questions: I don’t have most of them. I’ll hopefully get to them in later posts. The available, open-source primitive hash maps simply did not cover enough of the algorithm space to reliably demonstrate the isolated effect of a particular design choice such as power-of-2- vs prime-sized bucket arrays.

Instead, this post’s goals are more modest: I wish to answer the following questions:

  • What is the effect of load factor on the performance of these maps?
  • Given the out-of-the-box implementations available, which designs are the most appealing?
  • What optimizations can be applied to their implementation, given my requirements?
  • Are any of these suitable for my needs right out of the box?

Candidates

Lib.               | Ver.     | Rel. Date | Class                     | Coll. Res.        | No. Buckets | Hash Fun.   | Bytes/Entry
mahout-collections | 1.0.0    | 2010      | OpenLongLongHashMap       | double hashing    | prime       | [1]         | 17
Trove              | 3.0.1    | 2011      | TLongLongHashMap          | double hashing    | prime       | [2]         | 17
PCJ                | 1.2      | 2003      | LongKeyLongOpenHashMap    | double hashing    | prime       | [2]         | 17
fastutil           | 6.4.1    | 2011      | Long2LongOpenHashMap      | linear probing    | 2^n         | MurmurHash3 | 17
hppc               | 0.4.1    | 2011      | LongLongOpenHashMap       | linear probing    | 2^n         | MurmurHash3 | 17
PCJ                | 1.2      | 2003      | LongKeyLongChainedHashMap | external chaining | prime       | [2]         | 48
JDK                | 1.6.0_27 | 2011      | HashMap<Long, Long>       | external chaining | 2^n         | [3]         | 100 (see the aside below)

[1] value ^ (value >> 32)
[2] value ^ (value >>> 32)
[3] h ^= (h >>> 20) ^ (h >>> 12); h ^ (h >>> 7) ^ (h >>> 4);

Setup

  • 2 warmup runs, 2 observation runs for each combination of (Class, Load Factor, Size Hint)
  • Centos 5.4, dual-Xeon X7542  (2.67GHz, 6 cores each), 256 GB of RAM
  • 4.72 billion 64-bit integer keys (976 million unique), read from disk and inserted with random long values
  • baseline read/parse rate was roughly 10 million records per second
  • JVM flags: -server -Xmx128g -XX:+PrintGCTimeStamps
  • Code is on GitHub, added to the benchmark used in part 3. See the section of the README that discusses the “libraries” comparison.

The runs completed are listed below, as are some odd failures that occurred in testing.

The test pushed several hash tables to their breaking point, since they either imposed artificial limits on the backing array length (usually 2^30 ~ 1.074B) or they ran into Java’s built-in limit, Integer.MAX_VALUE, of 2^31 - 1 ~ 2.147B. Extremely high load factors were required to even get some tables to finish the benchmark, which I understand is suboptimal.

In general, I tried to run three different maximum load factors where possible: 0.50, 0.75, and 0.91. The first two are common default load factors for the hash tables in question, and the last one is the highest load factor such that 976,000,000/loadFactor < 2^30. What’s important to note here is that I actively avoided resizes during the runs by setting an initial capacity corresponding to the load factor. The number of “usable” buckets was held constant (at 976M) while the number of empty buckets varied in order to test the effect of load factor on performance.

Results

The broad strokes are that all of these libraries live between the 1M to 2M inserts per second range, with performance degrading as they approach their maximum load factors, some more than others. The drastically different bottom quartiles are what stand out to me.

Worst-cases for the chained maps are close to full stops whereas the open addressing maps’ throughputs fall by about 25%. HashMap<Long, Long> and LongKeyLongChainedHashMap clearly fall apart in a serious way on a regular basis, roughly every 50 million new keys. It’s not clear to me that the code would allow resizes/rehashes given the initial capacity provided, so my guess was that these were stop-the-world GCs.

I reran the test with -XX:+PrintGCTimeStamps -verbose:gc to verify, and 78% of the real time of the two observation runs was consumed by GC, with regular pauses up to 100 seconds. For contrast, Long2LongOpenHashMap‘s GCs amounted to less than 1% of real time with no pause greater than 0.2 seconds (with the exception of the Full GC that occurred between observation runs). I computed those percentages by dividing the sum of the reported GC durations by the ‘real’ value from time(1).

It makes sense that GC pauses would uniquely affect the externally-chained implementations, since they produce so many objects. (All the open addressing implementations only create a few handfuls of objects per map instance.) In any case, that problem alone disqualifies it for my uses: such significant dips in throughput are unacceptable. Indeed, any externally-chained map will be suspect at these sizes because of the need to create a new object per entry. Perhaps different collectors can mitigate this problem, but given the extremely sound performance of the open addressing implementations, I’m inclined to table the externally chained maps for a while. The operational advantage of simply not worrying about tuning GC is huge for me.

The two most interesting maps proved to be HPPC’s LongLongOpenHashMap and fastutil’s Long2LongOpenHashMap. Both were blazing fast and quite stable, with minor throughput degradation only after 700 million keys. LongKeyLongOpenHashMap, TLongLongHashMap, and OpenLongLongHashMap began their degradation right from the get-go, dropping by over 50% over the course of the test. fastutil’s implementation’s variance was also extremely low, which is a very admirable quality from an operational perspective.

It’s fascinating that a design as straightforward as power-of-two bucket counts and linear probing could be so amazingly effective. It’s the classic example of a ‘flawed’ open addressing scheme: it’s susceptible to primary clustering, the number of probes shoots up as you go past 90% occupancy, and it’s unforgiving of mediocre hash functions.

After thinking about it, though, it makes great sense that it performed so well here. Both favorites use a byte[] to store the state of each bucket, which means that each 64-byte cache line has on average the next 32 bucket statuses pre-cached. This means that walking the linear probe chain is effectively free for the next 32 buckets (compared to going to L1 or, heaven-forbid, main memory). At 90% occupancy, the average successful lookup requires 5.5 probes while unsuccessful lookups require 50 probes. This means that on average we’re fetching at most two cache lines for the probing. Compare this to double hashing, which by design is meant to probe far from the original hash, which requires a line fetch per probe. At 2.5/10 average probes for successful/unsuccessful lookups, you’ll be doing far more fetches with double hashing than with linear probing. If you roll your own status array on top of a long[] and some masking tricks, you could potentially fit 256 2-bit statuses on each cache line, further improving linear probing’s efficiency. ($ cat /sys/devices/system/cpu/cpu0/cache/index0/size to get your cache line size in bytes.)
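A sketch of that long[]-backed status array (the 2-bit encoding and class shape are illustrative):

    // Sketch: pack 32 two-bit bucket statuses per long, i.e. 256 statuses per
    // 64-byte cache line, instead of one byte per bucket.
    public final class TwoBitStatuses {
        public static final int EMPTY = 0, OCCUPIED = 1, REMOVED = 2;

        private final long[] words;

        public TwoBitStatuses(final int bucketCount) {
            words = new long[(bucketCount + 31) / 32];   // 32 statuses per 64-bit word
        }

        public int get(final int bucket) {
            final int shift = (bucket & 31) << 1;        // 2 bits per status
            return (int) ((words[bucket >>> 5] >>> shift) & 0x3L);
        }

        public void set(final int bucket, final int status) {
            final int word = bucket >>> 5;
            final int shift = (bucket & 31) << 1;
            words[word] = (words[word] & ~(0x3L << shift)) | ((long) status << shift);
        }
    }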

If you combine linear probing with a good hash function, like Murmur3, you can avoid primary clustering in practice. What I find curious is that so many of the double-hashing libraries use such a weak hash function. Sure it’s only 2 ops, but is the hash function the real bottleneck in their code? Can they really not afford the extra 6 ops? I doubt it. I’m going to add some instrumentation to a few of these implementations to do probe length histograms to verify what I’ve speculated. Stay tuned.

Conclusions

In order to meet the requirements I’ve enumerated in my previous posts, I’m going to need more than 976 million occupied buckets. I’m going to need 1.5 billion, and a fraction of that again for empty buckets. Unfortunately, 2^30 is just a bit over 1.073 billion, and well short of 1.5 billion. This means that I can’t use one of the two favorites from my testing (fastutil’s Long2LongOpenHashMap) out of the box. Luckily, HPPC’s implementation does not have an artificial cap on the maximum number of buckets, so moving forward that will be a viable out-of-the-box option. (Edit: It has the same 2^30 cap as fastutil.) In order to complete my testing, I’ll likely dig up another month’s worth of data and run HPPC and modified variants of the other open addressing hash tables up to about 2 billion keys.


Mapping types

In part 3 of this series of posts, I tested out several hash table “services” to see if any of them would give me the end-to-end storage functionality I needed for this project. In that test, I used an append semantic for the performance comparison because I was hoping that one of those “services” would solve all of my storage needs in one shot. Ideally, such a service would provide a fast algorithm, efficient memory management, and solid monitoring facilities. Sadly, all of them failed on the first count and I was forced to move onto a more hands-on approach.

This means worrying about the actual storage implementation and its interaction with the JVM’s GC. If I take the naive route and simply allocate a new long[] for every user’s activity, the memory allocation time alone could cripple the application. It has become clear that storing 1.5 billion long-lived long[]s (about 80GB’s worth) on a high-occupancy heap will cause unacceptable GC pauses. As such, I’ll be storing the user activity chains in a slab allocator (of my own construction) and using the hash table to translate between user IDs and their storage reference. Thus, I’m transitioning from benchmarking appends to puts, and I’m now mapping from long to long instead of long to long[]. Since the slab allocator will return roughly random longs as storage references, I’ve simulated the values of the hash table with a call to Random#nextLong().
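The measured workload, then, boils down to something like this sketch (the LongLongMap interface is just an illustrative shim over the candidate libraries):

    import java.util.Random;

    // Sketch of the benchmark's inner loop: one put of a simulated slab-allocator
    // reference per key, timed over the whole run.
    public class PutBenchmark {
        // Minimal common interface over the candidate hash tables (illustrative).
        public interface LongLongMap {
            void put(long key, long value);
        }

        public static void run(final LongLongMap map, final long[] keys) {
            final Random rng = new Random();
            final long start = System.nanoTime();
            for (final long key : keys) {
                map.put(key, rng.nextLong());   // value simulates a slab storage reference
            }
            final double seconds = (System.nanoTime() - start) / 1e9;
            System.out.printf("%.0f puts/sec%n", keys.length / seconds);
        }
    }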

An Aside on HashMap<Long, Long> and External Chaining

Unlike the other candidates, HashMap<Long, Long> stores boxed keys and values, wrapping each pair in an Entry object.

    public final class Long extends Number implements Comparable<Long> {
        ...
        private final long value;
        ...
    }

    static class Entry<K,V> implements Map.Entry<K,V> {
        final K key;
        V value;
        Entry<K,V> next;
        final int hash;
        ...
    }

On a 64-bit JVM with a starting heap greater than 32GB, each of our 976 million unique users (wrapped in an Entry instance) requires:

  • 8 bytes for the pointer to the Entry from the backing Entry[],
  • 16 bytes for the object header of the Entry,
  • 8 bytes for the pointer to final K key,
  • 8 bytes for the pointer to V value,
  • 8 byte pointer to Entry next,
  • 4 bytes for int hash,
  • 16 bytes for the object header of final K key,
  • 8 bytes for the underlying long of final K key,
  • 16 bytes for the object header of V value, and
  • 8 bytes for the underlying long of V value

for a grand total of 100 bytes per unique key. This means that at the default 0.75 load factor, you’re looking at 97,600,000,000 bytes for the used buckets and 2,600,000,000 bytes for the empty buckets. 93 GiB later and I haven’t even stored the events themselves. Since boxing has its own CPU costs, I’m not expecting much in terms of performance or memory footprint from HashMap. It’s included here mostly as a baseline, but also to demonstrate its unsuitability for this scale of work. The same goes for LongKeyLongChainedHashMap, minus the boxing. Even at a more modest 48 bytes per entry, it’s still using 55 GiB plus the cost of maintaining a billion items on the heap. External chaining is simply unviable at these sizes.

Runs

For each load factor, the initial capacity of the table was hinted to be at least 976,000,000/loadFactor in order to avoid resizes, where possible. However, Trove and fastutil automatically do this computation by requesting the number of unique elements expected, instead of a lower bound on the initial bucket count. They were given 976,000,000 as their starting size hint regardless of load factor. As a last resort, 976,000,000 was used as the size hint when a map could or would not finish a test run.

The “Constructor” column indicates whether the constructor expects the initial capacity (“IC”) or the expected elements count (“EE”).

Lib.               | Class                     | Constructor | LF   | Size Hint
mahout-collections | OpenLongLongHashMap       | IC          | 0.91 | 1,073M
                   |                           |             | 0.75 | 1,302M
                   |                           |             | 0.50 | 1,952M
Trove              | TLongLongHashMap          | EE          | 0.91 | 976M
                   |                           |             | 0.75 | 976M
                   |                           |             | 0.50 | 976M
PCJ                | LongKeyLongOpenHashMap    | IC          | 0.91 | 1,073M
                   |                           |             | 0.75 | 1,302M
                   |                           |             | 0.50 | 1,952M
fastutil           | Long2LongOpenHashMap      | EE          | 0.91 | 976M
hppc               | LongLongOpenHashMap       | IC          | 0.91 | 1,073M
PCJ                | LongKeyLongChainedHashMap | IC          | 0.91 | 976M
JDK                | HashMap<Long, Long>       | IC          | 0.91 | 976M
                   |                           |             | 0.75 | 1,302M
                   |                           |             | 0.50 | 1,952M

Oddities

When running initial tests, the configuration (TLongLongHashMap, 0.50, 976M) forced a catastrophic six-hour-long rehash at roughly 1.78 billion records in. At first I suspected a pathological probe chain followed by a rehash, however, I could not reproduce this particular case despite using identical data and identical runtime parameters. This is utterly horrifying from an operational perspective, but I’m willing to write this off as some artifact of the machine or perhaps the screen session. In another initial test, the configuration (LongKeyLongChainedHashMap, 0.50, 976M) crashed the Parallel Mark-Sweep garbage collector thread. Again, I couldn’t reproduce the problem despite my best efforts, so I’m writing it off as an artifact. It could have been the 128GB heap, it could have been the hundreds of millions of objects, it could have been anything. Sadly, I don’t even have a reproducible test case to give the JVM team, so I don’t foresee much progress on the bug.

Big Memory, Part 3.5: Google sparsehash!

Author’s Note: This is part 3.5 of a series of posts about my adventures in building a “large”, in-memory hash table. This post is a handful of observations I made while running someone else’s benchmark of C/C++ hash table implementations.

I ran across Nick Welch’s Hash Table Benchmarks during my research and decided to rerun a subset of his benchmark for much larger key counts. (You can find my fork of his code on GitHub.)

The differences between the benchmarks he ran and the ones I ran are:

  • I’m using sparsehash 1.11 (vs. 1.5), Boost 1.41 (vs. 1.38), and Qt 4.6 (vs. 4.5).
  • I removed Ruby’s hash implementation because of its abysmal performance in his benchmark.
  • I increased the insertion count to 1.5 billion (from 40 million) and the step size to 100 million (from 2 million).
  • I only provided the “Random Inserts: Execution Time (integers)” and “Memory Usage (integers)” tests.

I did a best of three, using the recommended ionice/nice run command on a m2.4xlarge EC2 instance. (I added a dummy entry for Ruby’s hash so that the colors of my plots would match the originals.)

As Nick noted, Google’s sparsehash maps are the real deal. They sustain their performance across a broad range of key counts, offering strong tradeoffs between memory and speed. When memory usage must be low and allocation smooth, sparse_hash_map is the clear choice. Interestingly, it’s simply quadratic probing built on top of a dynamic array. Dynamic arrays only store addressed entries, so no matter how large the array address space may be, you only pay for the entries you’re using, plus some bookkeeping overhead. The dynamic array mitigates the memory cost of maintaining the low load factor required to operate a fast, large hash table with quadratic probing. At sparse_hash_map‘s default load factor of 0.8, a conventional array-backed quadratic probing scheme ‘wastes’ 20% of allocated memory. This benchmark maps to and from 64-bit integers, so at the claimed 1-2 bits of overhead per entry the dynamic array implementation only ‘wastes’ about 1% of allocated memory. It also allocates memory one entry at a time, so each insert is slower but requires constant time. Rehashes are the only allocation hiccup in this scheme.

A good review of the algorithm of the sparsetable used to back the sparse_hash_map is provided by the Google folks here, under insert() and in the source in sparsetable.cpp, sparsehashtable.h, and sparse_hash_map. You’ll also find an interesting claim about their particular choice of quadratic probing with triangular numbers, which I’ve been unable to find evidence for elsewhere. Regardless of the theoretical details, these benchmark numbers are enough to convince me of the efficacy of that choice.
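For reference, the triangular-number trick is to probe with cumulative offsets 1, 3, 6, 10, …, which visits every slot of a power-of-two-sized table exactly once. A minimal sketch in Java (assuming 0 marks an empty slot):

    // Sketch: quadratic probing with triangular-number offsets over a
    // power-of-two table. The sequence h, h+1, h+3, h+6, ... (mod 2^k)
    // touches every one of the 2^k slots before repeating.
    public class TriangularProbing {
        // Returns the index of the matching key or of the first empty slot.
        public static int probe(final long[] keys, final long key, final int hash) {
            final int mask = keys.length - 1;        // length must be a power of two
            int idx = hash & mask;
            for (int step = 1; keys[idx] != 0 && keys[idx] != key; step++) {
                idx = (idx + step) & mask;           // cumulative offsets: 1, 3, 6, 10, ...
            }
            return idx;
        }
    }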

In the opposite case where throughput is of paramount importance, dense_hash_map shines. It maintains a 0.5 load factor and stores entries in a regular C array, which means it has to pay for all the buckets it addresses (not just the ones it uses) and also leaves half of them empty. This avoids the costly bitmap population counts and the constant reallocation needed by the sparse_hash_map at the cost of infrequent, lumpy allocation and a large per-key overhead. So lumpy, in fact, that it couldn’t quite make it to the 1.5 billion key mark on the provided 68GB of memory.

The algorithm behind sparse_hash_map seemed to suit my needs perfectly: smooth allocation, compact footprint, and still plenty fast despite being the slowest of the group. I dug in a little further to get a feel for the throughput profile and ended up modifying the benchmark to report the amount of time between every million inserts up to 1.5 billion. I ran 4 warmup runs and 16 observation runs. This gave a familiar view, but with a twist. The throughput costs of the different allocation strategies were exposed:

(Note: Both plots share the bottom plot’s legend.)

The top plot shows the number of inserts per second, measured over 1-million-record intervals. Overall, the results are as expected: the sparse_hash_map and dense_hash_map sandwiching the other three, which may as well have been the same library. (They practically are: the versions of Boost, Qt, and GCC that I used are all prime-sized, chained hash tables.) The throughputs are pretty stable overall: dense_hash_map at around 4.5M inserts/sec, the unordered_maps and QHash at about 3M, and sparse_hash_map at about 1M.

The bottom plot shows the slowest million-record intervals, plotting the number of seconds needed to insert 1 million records. These correspond to the dips in throughput of the top plot. I can only assume these are the observations that spanned a resizing of the table. sparse_hash_map was the worst offender, taking more than three minutes to resize and rehash the table in the worst case, while dense_hash_map had the best worst-case at 43 seconds. The other three predictably fell somewhere in between. (See the five rightmost points on the bottom plot.)

A brief aside: it’s curious that GCC’s std::unordered_map performed roughly 20% faster after its final reallocation (near the 700M record mark) than both Boost’s unordered_map and Qt’s QHash. I’ll have to dig in later to figure out what’s actually happening there, but I suspect it has to do with the simple fact that it reallocates to a much larger memory footprint than the other two, leaving it with a much lower load factor through that section of the test.

In any case, the valuable lesson for me was that unless you properly allocate the initial table size, you will be forced to suffer catastrophically slow resizes, even in C/C++. But what recourse do you have if your table crosses the rehash point between prunings? This also raises important questions about most open-addressing schemes’ need to periodically rehash to clear tombstone entries. Is it even possible to initially allocate a table under these schemes that will be able to balance insertions and deletions without either being incredibly wasteful or forcing a rehash? I suppose it depends deeply on your workload. There are some promising proposals such as hopscotch hashing that are meant to accommodate very high load factors, but it remains to be seen if the algorithm is resilient to long periods of constant use and to my particular workload. Another issue I foresee with open addressing is the concurrence of a pathological probing sequence with a hot key (my workload is append-heavy and highly variable from key to key). When combined, you could see meaningful but hard-to-diagnose dips in throughput, which is a worst nightmare of sorts. The extra pointers needed for a chained hash table are looking more and more palatable if they provide better worst-case behavior.

All this means that I’ll have to spend a great deal of time analyzing our traffic to solve for the correct size such that the deletions and insertions tally up correctly. I’ll also have to take into account maintaining an acceptable load factor if I use open addressing so that “standard” operational throughput doesn’t suffer either. This has wide-ranging implications for the garbage-collection/trimming algorithm I choose as well as the backing storage, but I’ll get into that in the next few posts.

Big Memory, Part 3

Author’s Note: This is part 3 of a series of posts about my adventures in building a “large”, in-memory hash table. Part 1 introduced our goals and our approach to the task at hand. This post is a summary of some candidate hash table “services”.

Goals

To recap, I need a hash table that can support the following:

  • 1.5 billion 64-bit keys, uniformly and randomly distributed
  • values between 16 bytes and 16 kilobytes, with sizes in a Zipfian distribution
  • deployed to one machine, all in-memory
  • sustained 200,000 writes per second over the course of many hours

The API should support a non-bulk, mutable, key-value interface with an append command.

The final requirement is that the source be obtainable. After all, this is just as much about finding a viable candidate as understanding how the results are achieved.

My approach to testing the initial viability of candidates was to replicate a subset of the required production load using some of our production logs. The test amounted to writing 212 million records to a bit over 78 million unique keys. Each record’s key is 8 bytes and its value 16 bytes. The value bytes are simply appended to the existing value corresponding to the key. This closely mimics our real write workload for the project.

Note that throughput and latency are the primary concerns here: we seek a consistently high write rate. Memory overhead, at this stage, is not under scrutiny. (This may strike some as odd, given the hard bounds on a single machine’s memory, but honestly the raw data set we’re seeking to store is easily within the bounds of the servers I described in my previous post. As long as nothing absurd is going on, we can afford to trade some memory for speed.)

Candidates

Given the API requirements, the candidates that immediately came to mind were:

  • Berkeley DB
  • Kyoto Cabinet
  • Redis
  • Memcached

Note that the scope here is restricted to hash table “services”, not hash table libraries. Specifically, I don’t want to manage memory, rehashing, growing, or shrinking. I’ll be covering libraries in the next post.

Under the hood these all use slightly different hashing and collision resolution schemes.

Berkeley DB uses an implementation of Litwin’s Extended Linear Hashing. In particular, it implements linear hashing using a hybrid split control: bucket overflow and load factor independently trigger splits. (Look for ffactor and do_expand in hash_page.c’s __ham_add_el().) Notably, BDB chains memory pages, not object pointers. This is a sensible optimization in a world where main memory is small and disk seeks are costly. However, the cost in code complexity is immense. For an idea of just how much attention to detail is required, download BDB’s source and check out hash_page.c’s __ham_replpair() and __ham_add_el(). It is fascinating to see how much work goes into managing the differences between small and large values. [1]

Kyoto Cabinet “boringly” uses the C++ stdlib’s std::unordered_map. I had trouble finding implementations other than GCC’s, so I can’t really speak to anything but that. The tr1/hashtable implementation uses chaining, with a prime bucket count and a max-load-factor-based rehashing policy. When the ratio of elements to buckets passes a certain threshold (1, I believe), a full stop-the-world rehash is performed. (_M_rehash() on line 1146) The resizing policy finds the smallest prime greater than twice the current number of buckets, and the table is resized. (_M_need_rehash() on line 455) The prime policy default can be seen here.
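
In Java-flavored code, the policy as I read it looks roughly like the sketch below. (This is an illustration of the described behavior, not the GCC implementation, which consults a precomputed prime table rather than searching for the next prime.)

```java
// A sketch of a max-load-factor, prime-doubling rehash policy in the
// spirit of the tr1/hashtable behavior described above. Not the GCC code.
final class PrimeDoublingPolicy {
    static boolean needsRehash(long elements, long buckets, double maxLoadFactor) {
        // A full stop-the-world rehash is triggered once the load factor
        // (elements per bucket) crosses the threshold.
        return (double) elements / buckets > maxLoadFactor;
    }

    static long nextBucketCount(long buckets) {
        // Grow to the smallest prime strictly greater than twice the
        // current bucket count.
        long candidate = 2 * buckets + 1;
        while (!isPrime(candidate)) {
            candidate += 2;
        }
        return candidate;
    }

    private static boolean isPrime(long n) {
        if (n < 2) return false;
        if (n % 2 == 0) return n == 2;
        for (long d = 3; d * d <= n; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }
}
```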

Redis implements its own hash table that uses chaining as well with a target load factor of 1. Interestingly, it rehashes the keys incrementally in the background, pushing updates to a new table while checking both the old and new tables for reads. The incremental work is spread over all subsequent reads and writes issued to the table. This is perhaps the easiest of the four implementations to fully understand on the first read.
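
A toy sketch of that incremental scheme, in Java rather than C and with all of Redis’s details stripped away, might look like this:

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// A toy sketch of Redis-style incremental rehashing: two tables, and a
// small, bounded amount of migration work piggybacked on every
// operation. This illustrates the idea only; it is not Redis's dict.c.
final class IncrementallyRehashedMap<K, V> {
    private Map<K, V> oldTable = new HashMap<>();
    private Map<K, V> newTable = null;   // non-null only while a rehash is in progress

    void put(K key, V value) {
        rehashStep();
        if (newTable != null) {
            oldTable.remove(key);        // keep each key in exactly one table
            newTable.put(key, value);
        } else {
            oldTable.put(key, value);
        }
    }

    V get(K key) {
        rehashStep();
        if (newTable != null && newTable.containsKey(key)) {
            return newTable.get(key);
        }
        return oldTable.get(key);        // fall back to the old table during a rehash
    }

    void startRehash() {
        if (newTable == null) {
            newTable = new HashMap<>(Math.max(16, oldTable.size() * 2));
        }
    }

    private void rehashStep() {
        if (newTable == null) {
            return;
        }
        // Migrate a handful of entries per operation instead of all at once.
        Iterator<Map.Entry<K, V>> it = oldTable.entrySet().iterator();
        for (int i = 0; i < 8 && it.hasNext(); i++) {
            Map.Entry<K, V> e = it.next();
            newTable.put(e.getKey(), e.getValue());
            it.remove();
        }
        if (oldTable.isEmpty()) {        // migration finished: swap the tables
            oldTable = newTable;
            newTable = null;
        }
    }
}
```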

Similar to BDB, memcached implements linear hashing, but it chains object pointers, not memory pages. It uses what the paper calls “load control[led]” splits, meaning that incremental rehashing occurs when the load factor exceeds a certain value. (In this case, 3/2.) Unlike Redis, it does the rehashing in another background thread, not as part of the read or write operations. assoc.c very nicely illustrates the gist of linear hashing with controlled splits; check out assoc_find() and assoc_insert(). Beware: assoc_expand() just sets up some state to signal incremental rehashing; the real guts of the rehashing are in assoc_maintenance_thread(). It is notable how much simpler the code for object-pointer chaining is than the page chaining used in BDB.
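
For readers who haven’t seen linear hashing before, here is a compressed Java sketch of the core idea common to BDB and memcached: a split pointer walks across the table, splitting one bucket at a time whenever the load factor crosses a threshold. It is my own illustration, far simpler than either implementation, and insert-only for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// A toy illustration of linear hashing's addressing and load-controlled
// splitting. Buckets are plain chained lists; the interesting part is
// how the bucket index is computed and how buckets split one at a time.
final class LinearHashSketch {
    private final List<LinkedList<long[]>> buckets = new ArrayList<>(); // each entry is {key, value}
    private int level = 0;       // the table has 2^level "base" buckets this round
    private int splitNext = 0;   // index of the next bucket to split
    private long size = 0;
    private static final double MAX_LOAD = 1.5;   // memcached splits at 3/2

    LinearHashSketch() {
        buckets.add(new LinkedList<>());
    }

    private int bucketIndex(long key) {
        long h = Long.hashCode(key) & 0x7fffffffL;
        int idx = (int) (h % (1L << level));
        // Buckets behind the split pointer were already split this round,
        // so they are addressed with the next round's larger modulus.
        if (idx < splitNext) {
            idx = (int) (h % (1L << (level + 1)));
        }
        return idx;
    }

    void put(long key, long value) {   // insert-only for simplicity
        buckets.get(bucketIndex(key)).add(new long[] {key, value});
        size++;
        if ((double) size / buckets.size() > MAX_LOAD) {
            split();                   // load-controlled split: one bucket at a time
        }
    }

    Long get(long key) {
        for (long[] e : buckets.get(bucketIndex(key))) {
            if (e[0] == key) return e[1];
        }
        return null;
    }

    private void split() {
        buckets.add(new LinkedList<>());               // image bucket lands at splitNext + 2^level
        LinkedList<long[]> old = buckets.get(splitNext);
        LinkedList<long[]> keep = new LinkedList<>();
        for (long[] e : old) {
            int idx = (int) ((Long.hashCode(e[0]) & 0x7fffffffL) % (1L << (level + 1)));
            (idx == splitNext ? keep : buckets.get(idx)).add(e);
        }
        buckets.set(splitNext, keep);
        splitNext++;
        if (splitNext == (1 << level)) {               // finished the round: double the base size
            level++;
            splitNext = 0;
        }
    }
}
```

The trick is in bucketIndex(): buckets behind the split pointer have already been split this round, so they are addressed with the next round’s larger modulus, which is how the table grows one bucket at a time instead of all at once.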

Ease of Use

Note: I’m talking specifically about ease of use from a developer perspective. I’m not qualified or interested in commenting on their operational merits here.

Without a doubt, the easiest candidate to set up, use, and analyze was Redis. Between the trivial build from source, the simplicity of the Jedis API, and the visibility provided by the INFO command, using Redis was a walk in the park. The redis.conf file has a lot of knobs but most of them can safely be ignored and the inline documentation is ample.

Kyoto Cabinet came in a close second. I forgot to set $JAVA_HOME before installing the Java bindings, which caused me some grief, but once I figured that bit out everything was right as rain. Instantiating and using it were painless if somewhat sparsely documented.

Memcached was actually a pain to use, not because of the daemon itself, but because of the client libraries available in Java. The fact that the append command required a CAS value in some clients and not in others was the main culprit. One qualm with the daemon itself is that an append command only succeeds if used on an existing key.

Finally, BDB was by far the most frustrating candidate. The configuration is arcane and poorly documented. The errors are uninformative and often cryptic. Setting the proper combination of permissions for a client is exceedingly difficult unless you peruse the documentation with a very keen eye. The distinction between what configuration belongs on the EnvironmentConfig versus the DatabaseConfig is unclear and poorly documented. Despite specifying an in-memory hash database, BDB still requires a home directory, even though it never touches it. One has to manually initialize the memory subsystem. Blah! Maybe I’m just uninitiated, but I don’t think I’ve ever been more frustrated with a piece of software. To boot, only the Heap, Queue, and Recno access methods support append puts, leaving me to manually do a get/append/put in the client. Even if BDB is fast enough, there’s absolutely no chance I’ll use it in production given these limitations.
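
For the curious, the manual workaround amounted to something like the following rough sketch against the com.sleepycat.db Java bindings (configuration, transactions, and error handling omitted, so it is not atomic as written):

```java
import com.sleepycat.db.Database;
import com.sleepycat.db.DatabaseEntry;
import com.sleepycat.db.LockMode;
import com.sleepycat.db.OperationStatus;

// A rough sketch of the manual get/append/put dance described above.
// No transactions or error handling; this is illustrative only.
final class BdbAppend {
    static void append(Database db, byte[] keyBytes, byte[] suffix) throws Exception {
        DatabaseEntry key = new DatabaseEntry(keyBytes);
        DatabaseEntry existing = new DatabaseEntry();
        byte[] combined;
        if (db.get(null, key, existing, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
            // Key already present: concatenate the old value and the new bytes.
            byte[] current = existing.getData();
            combined = new byte[current.length + suffix.length];
            System.arraycopy(current, 0, combined, 0, current.length);
            System.arraycopy(suffix, 0, combined, current.length, suffix.length);
        } else {
            // Key absent: the "append" is just an insert.
            combined = suffix;
        }
        db.put(null, key, new DatabaseEntry(combined));
    }
}
```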

Results

I’ll briefly note that memcached was so slow that it didn’t complete the test suite in the two days I left it running. As such, I’ve removed it entirely from this comparison. I was probably doing something wrong vis-a-vis configuration of the client and server. Similarly, a simple un-pipelined Redis connection proved to be incredibly slow, at least an order of magnitude slower than BDB. As such, I reran the original Redis test with a pipelined connection, flushing every 10,000 records. Both versions of the test are included in the source for posterity.
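
The pipelined variant looked roughly like the sketch below; the record type and source here are stand-ins for the production log reader, and the real harness is in the linked source:

```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

// A simplified sketch of the pipelined Redis append test: queue APPENDs
// on a pipeline and flush every 10,000 records.
final class PipelinedRedisTest {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Pipeline pipeline = jedis.pipelined();
            long count = 0;
            for (Record r : records()) {           // records() stands in for the log reader
                pipeline.append(r.key, r.value);   // APPEND creates the key if absent
                if (++count % 10_000 == 0) {
                    pipeline.sync();               // flush queued commands and read replies
                }
            }
            pipeline.sync();                       // flush the remainder
        }
    }

    // Hypothetical record type and source, standing in for the production logs.
    static final class Record {
        final String key, value;
        Record(String key, String value) { this.key = key; this.value = value; }
    }
    static Iterable<Record> records() { return java.util.Collections.emptyList(); }
}
```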

These plots come from 30 runs over the data set, preceded by 10 warmup runs. The hash marks are the average value of the number of records processed per second at the particular record count, and the points are the actual observations with 10% alpha.

The first plot includes a baseline processing rate (‘xfer’ in the legend) which indicates how quickly the records can be read and prepared. The second simply excludes the baseline, for a clearer view. You can click through for larger versions of the plots.

[Plots: insertion throughput (records per second) for BDB, Kyoto Cabinet, and pipelined Redis, with and without the ‘xfer’ baseline.]

You can find the source code used to run these comparisons on GitHub.

Notable bits

  • Despite the drastically different algorithms used by BDB and KC, their results were roughly equivalent. KC’s performance proved slightly smoother, and it seems to have reached a stable point at around 170 million records while BDB continued to degrade. A concern is that these were the only two packages used through JNI. This may have limited their performance, but I am disinclined to investigate further since we use the JVM in production, which makes that cost unavoidable when interacting with these services. That said, tr1/hashtable’s underlying algorithm is still quite attractive: it performed more smoothly despite not being given a hint about the number of unique keys, while BDB was.
  • Though Redis’ throughput proved to be about 50% greater than KC’s and BDB’s, the precipitous drops during (what I assume is) resizing are extremely worrisome. (I’m guessing it’s resizing since the distance between drop-offs roughly doubles each time.) That kind of drop-off just doesn’t jibe with the goal of continuously high throughput. Equally worrying is the cost of at least doubling memory use during rehashing. Even though I said memory overhead was a secondary concern in this comparison, it is an important operational problem.
  • The performance difference between tr1/hashtable (KC) and Redis is marked, given that they both use chaining. I suspect this is a result of either pipelining or JNI overhead. The purpose of adding the pipelined version of the Redis test was to emulate a scenario where issuing commands did not carry network and serialization overhead. Perhaps it unfairly advantaged Redis by effectively emulating a bulk-command API rather than merely mitigating protocol overhead.

Conclusions

It seems that the overhead of a general-purpose hash table “service” makes these options unsuitable for my needs. I’m honestly not sure whether the culprit is the broad feature sets, the JNI bridge, or the serialization/deserialization overhead of putting a network server on top of the hash table, and as a practical matter I won’t pursue the question any further. All three problems can be circumvented by using a Java-resident hash table, so that is where we’ll go in the next post. -Xmx128g here we come!

Footnotes

[1] Per the Berkeley DB license I am including a link to Oracle’s site where one can find the full source of the database.

Big Memory, Part 2

Author’s Note: This is part 2 of a series of posts about my adventures in building a “large”, in-memory hash table. Part 1 introduced our goals and our approach to the task at hand. This post is a summary of some of the research I’ve done to familiarize myself with the problem.

Background

Our current approach to custom attribution models is very simple: pay Amazon thousands of dollars a month and “do it in the cloud” with Elastic MapReduce. In Hadoop, we partition the data by user, sort by time, identify their conversion events, and run an attribution model on these conversion-terminated “chains” of events. This is both costly and more cumbersome than we’d like.

A faster, cheaper, and arguably more transparent approach might be to pipe the live events to a service that could buffer and assemble these chains in memory and output “completed” chains (when a conversion event arrived) to a separate service to do the model computation.

We’ve come to the conclusion that a large in-memory hash table could be suitable to the task. Our specifications for said hash table are:

  • 1.5 billion 64-bit keys, uniformly and randomly distributed
  • values between 16 bytes and 16 kilobytes
  • deployed to one machine, all in-memory
  • sustained 200,000 updates per second over the course of a 14-hour “internet day”

Before jumping into building one of these, I thought I’d learn a bit about hash tables themselves.

The Research

Naturally, my research began with Wikipedia. The article on hash tables is a fairly comprehensive overview. From there, I read a handful of papers and articles to dig a little deeper. Below is a selection that helped me immensely.

Dynamic Hashing

  • Fagin’s Extendible Hashing – A Fast Access Method for Dynamic Files

    One of those seminal IBM papers that everyone seems to reference. It provides some interesting historical context for the introduction of dynamic hashing. The central thesis is an insight as impressive now as it was then: by separating the hash space from the storage addressing space, a hash table can be made incrementally extendible.

  • Seltzer and Yigit’s A New Hashing Package for UNIX

    An older overview of hashing algorithms for use in and out of main memory. Includes exquisite insight into the implementation concerns the authors took into account while building a general hashing library for UNIX.

  • Rathi, Lu, and Hedrick’s Performance Comparison of Extendible Hashing And Linear Hashing Techniques

    An old but very useful comparison of linear and extendible hashing that demonstrates certain periodic performance characteristics that may make one or the other unsuitable for your application.

  • Baeza-Yates and Soza-Pollman’s Analysis of Linear Hashing Revisited

    A theoretical analysis of different overflow control functions in linear hashing. Lots of math, but very clearly demonstrates the differences between global versus local overflow resolution functions and the impact of page sizes.

Hash Functions

  • Jenkins Hash

    A solid general-purpose hash whose source and documentation are a masterwork of explication and thoroughness.

  • (Minimal) Perfect Hashing: some theory, some practice

    For when you have all of your keys ahead of time and want 100% occupancy.

Collision Resolution

  • Pagh and Rodler’s Cuckoo Hashing

    The original cuckoo hashing paper that compares cuckoo hashing against chaining methods and linear probing. Includes a nice section at the end recapping earlier hashing schemes and their historical context.

  • Erlingsson, Manasse, and McSherry’s A Cool and Practical Alternative to Hash Tables

    They present an empirical analysis of parametrized cuckoo hashing (in number of hash functions and bucket size). There’s an interesting bit at the end discussing dynamic expansion by adding bins and/or new hash functions.

  • Lehman and Panigrahy’s 3.5-Way Cuckoo Hashing for the Price of 2-and-a-Bit

    Describes a parametrized cuckoo hashing scheme with overlapping bins. Improves likely utilization by several percent.

  • Herlihy, Shavit, and Tzafrir’s Hopscotch Hashing

    Incorporates ideas from linear probing, cuckoo, and chaining techniques to avoid any of their individual pitfalls.

  • Panigrahy’s Efficient Hashing with Lookups in two Memory Accesses

    Provides a lucid graph-theoretic description of the problem of collision resolution. The solution proposed is all-theory, so don’t bother looking for a practical result therein.

  • Dietzfelbinger and Schellbach’s On Risks of Using Cuckoo Hashing with Simple Universal Hash Classes

    Discusses the unsuitability of certain linear and multiplicative hash functions for use with cuckoo hashing, using a graph theoretic argument.

Observations

After my academic explorations, I started to look for candidate data stores. In doing this, I began digging into the history of key-value stores and hash table implementations. A few things jumped out immediately:

  • Engineering effort seems to have been diverted from hash table development to distributed hash table development over the past 5 years.
  • Dynamic hashing innovation seems to have stopped at linear and extendible hashing.
  • No benchmarks I ran into exceeded 100M insertions. In fact, this benchmark is the only one that I found that exceeded 10M insertions.

The first seems obvious given the meteoric rise in data captured from the web and the comparatively steady decline in RAM prices and increase in density. With dozens or hundreds of terabytes of “online” data, one can hardly be blamed for steering toward mid-range commodity servers en masse. However, this approach comes at a cost: coordinating and maintaining a cluster of servers is no mean feat. In fact, I consider the consensus and commitment protocols that make said coordination possible significantly more challenging to understand, let alone implement, than any of the hashing subjects mentioned above. (Just look at the Wikipedia entry for Paxos!) Similarly, hot-node issues and debugging distributed systems strike me as an order of magnitude harder to solve than the problem of building a “better” hash table. To be clear, I’m not arguing that these two things solve the same problems. Rather, given the choice of implementing a “huge”, performant hash table in memory or the algorithms to support a clustered solution, I would choose the former.

Despite the fact that Dynamo, Cassandra, Riak, and Voldemort took most of the headlines from 2005 to 2010, work still progressed on in-memory and disk-based non-distributed hash tables like Tokyo Cabinet and Kyoto Cabinet, Redis, and even the venerable Berkeley DB. (If you’re at all interested in the history of “NoSQL” data stores, you should check out this handy timeline.) That said, little in the way of novel hash table technology came from these efforts. As far as I know, BDB still uses a variant of linear hashing, Redis uses standard chaining, and Kyoto Cabinet falls back on std::unordered_map for its in-memory hash table.

This brings us to the other two points: indeed, how could traditional hash table development (practically) cease in light of the advances in DHTs? With “web-scale” data sets, even a single node’s storage needs should easily exceed anything seen 5 years prior, right? In fairness, some work has been done in the last few years to add concurrency to linear hashing, as well as some work on optimizing hash table algorithms for modern cache hierarchies, but this doesn’t feel like the same kind of fundamental, basic result as, say, the introduction of extendible hashing.

I suppose the fact that there has been little visible engineering progress on this front is a testament to the quality of the existing algorithms and code. Either that or existing workloads have not yet exceeded that high watermark of 100M entries and we’re just waiting for the next jump to inspire new work in the field.

Next post: a roundup of existing candidates, benchmarks, and observations about their ease-of-use.

Big Memory, Part 1

Author’s note: This will be the first of a series of posts about my adventures in building a “large”, in-memory hash table. This first post will focus on a few philosophical notes that inspired this adventure. Research summaries, benchmarks, engineering notes, and so on will follow in future posts.

Memories

A few years ago, I recall being flabbergasted when I was told that Google had deployed a Perforce server with 256GB of RAM. Production machines at my then job had 16GB of RAM and I had certainly heard of 32- and 64-GB boxes, but 256GB struck me as an unthinkable amount. Our whole production database in RAM twice over! Wham! Pow! Smack!

Fast-forward to a month ago when I was told that we had two “leftover” boxes with a dozen cores and 256GB of RAM each. Impressive, yes, but a pleasant surprise at best. How the times have changed!

Brainstorming

The availability of the hardware got Rob and me thinking about novel things we could do in RAM. After some brainstorming, we came up with some basic tenets to guide our exploration of the space.

We’re not in the business of saving lives.

We track ads online. Lots of them. Not all components of our system require perfect uptime and not all of our data has to be perfectly accurate. I think perhaps this scenario is more common than many are willing to admit or embrace, especially in the analytics community. My main beef with MapReduce is the a priori necessity of examining every last piece of data. Throw out what doesn’t matter! Live a little!

That said, “in-memory” does not mean unstable or lossy.

If your data fits in memory and you can easily reconstruct your data store by replaying the input stream, there’s really no reason to dismiss a volatile design. Hell, the extra speed you’re likely to pick up with an in-memory design can actually make your recoveries quicker than with a persistent solution. By the same token, writing a persistence layer for a data store is arguably the most complicated part. (See these two posts, for instance.) Mitigate the volatility of an in-memory solution by going back to the simplicity, transparency, and composability espoused by the Unix philosophy.

K.I.S.S.!

One thread does all the reading and writing, all in-memory, with only one I/O format: protobuf messages over 0MQ. See how fast that baby can go and how big she can grow before you get any fancier. Sure, I could wave my hands about all kinds of fancy things like context switching, but that’s not the justification here. We’re really trying to test the limits of a relatively simple computational model, without working around it: how much can you do with a fast processor and gobs of RAM?
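
As a sketch of that model, assuming the classic jzmq/JeroMQ bindings and with a naive 8-byte-key-plus-raw-bytes framing standing in for the protobuf messages we actually intend to use:

```java
import org.zeromq.ZMQ;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// A minimal sketch of the single-threaded model: one thread pulls
// messages off a 0MQ socket and appends them to an in-memory table.
// The wire format here (8-byte key + raw value bytes) is a stand-in for
// the protobuf messages described above.
public final class SingleThreadedAppender {
    public static void main(String[] args) {
        Map<Long, byte[]> table = new HashMap<>();
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket socket = context.socket(ZMQ.PULL);
        socket.bind("tcp://*:5555");

        while (!Thread.currentThread().isInterrupted()) {
            byte[] msg = socket.recv(0);                    // blocks for the next message
            ByteBuffer buf = ByteBuffer.wrap(msg);
            long key = buf.getLong();                       // first 8 bytes: the key
            byte[] value = new byte[buf.remaining()];       // the rest: bytes to append
            buf.get(value);

            byte[] existing = table.get(key);
            if (existing == null) {
                table.put(key, value);
            } else {                                        // append to the existing value
                byte[] combined = new byte[existing.length + value.length];
                System.arraycopy(existing, 0, combined, 0, existing.length);
                System.arraycopy(value, 0, combined, existing.length, value.length);
                table.put(key, combined);
            }
        }
        socket.close();
        context.term();
    }
}
```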

Benchmark with the future in mind.

Test at capacities that significantly exceed current needs. Push the envelope well past what roughly similar projects are doing. Stress it until it genuinely breaks.

Action

Since Rob has already started tackling our aggregation bottlenecks with the Summarizer (which he will surely write more about soon, nudge, nudge), I decided to try my hand at our custom attribution problem. We need a way to store user interaction streams and run attribution models over them more quickly than in Hadoop, but not quite “in real-time”.

Practically, the problem amounts to storing a billion or so randomly- and uniformly-distributed 64-bit integer keys, each with a value of between 16 bytes and 16 kilobytes of structured data, with value sizes following a Zipfian distribution. The combination of an extremely heavy write workload, zero key locality, and no durability requirements points to an in-memory hash table.

Next post, I’ll cover the research I did and am doing to familiarize myself with the problem of large, in-memory hash tables.