HLL talk at SFPUG

I had the pleasure of speaking at the SF PostgreSQL User Group’s meetup tonight about sketching, the history of HLL, and our implementation of HLL as a PG extension. My slides are embedded below and you can get a PDF copy here. Be sure to click the gear below to show speaker’s notes for context!

If video is made available, I’ll post an update with a link!

Open Source Release: java-hll

We’re happy to announce our newest open-source project, java-hll, a HyperLogLog implementation in Java that is storage-compatible with the previously released postgresql-hll and js-hll implementations. And as the rule of three dictates, we’ve also extracted the storage specification that makes them interoperable into its own repository. Currently, all three implementations support reading storage specification v1.0.0, while only the PostgreSQL and Java implementations fully support writing v1.0.0. We hope to bring the JS implementation up to speed, with respect to serialization, shortly.

Adventures in Concurrency

 

The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

  1. RSyslog handed Netty some bytes.
  2. A Netty worker turned those bytes into a String.
  3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an IMessage.
  4. The IMessage‘s payload got turned into an IInputEvent (basically a POJO).
  5. The IInputEvent was mapped to one or many summaries, based on its event type, each of which would then be updated with the event.
Original Summarizer Architecture with Summarization Lock

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten: we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput for v1 messages had dipped to 60k/sec, and 10k/sec for the v2 messages.

Moreover, the new features the v2 messages permitted weren’t simply an open-ended exercise: with them came the customers that demanded those features and their additional traffic.  The Summarizer simply wouldn’t stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used and hopefully will provide some insight into the pitfalls of concurrency in Java 6 and 7.

Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the IMessages to IInputEvents.

Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing IMessages to IInputEvents and summarizing IInputEvents are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

  • ArrayBlockingQueue (henceforth ABQ) – bounded, backed by an array (duh), uses a single ReentrantLock
  • LinkedBlockingQueue (henceforth LBQ) – optionally bounded, backed by a linked list (duh), uses two ReentrantLocks (one for the head and one for the tail)
  • ConcurrentLinkedQueue (henceforth CLQ) – unbounded, backed by a linked list, uses no locks, relying instead on CAS (a variant of the Michael–Scott non-blocking queue algorithm)
Queued Summarizer Architecture with BlockingQueues

IMessage to IInputEvent parsing and IInputEvent summarization are buffered by BlockingQueues.

We added a message parsing queue to which the Netty workers would dump IMessages. Message parser workers would take those IMessages and turn them into IInputEvents. They would then distribute those IInputEvents to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).
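To make that wiring concrete, here’s a minimal sketch using plain java.util.concurrent bounded queues. The class and helper names are hypothetical stand-ins rather than our actual code, and the queue size of 2048 is just the value used in the tests below.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the queued pipeline: one MPMC message parsing queue fed
// by the Netty workers, and one MPSC queue per summarization worker. Each summary
// is owned by exactly one worker (Single Writer Principle), assigned round-robin.
public abstract class PipelineSketch {
    interface IMessage {}     // placeholder for the real RSyslog envelope/payload pair
    interface IInputEvent {}  // placeholder for the real parsed event POJO

    final BlockingQueue<IMessage> parseQueue = new ArrayBlockingQueue<IMessage>(2048);
    final List<BlockingQueue<IInputEvent>> summarizationQueues =
            new ArrayList<BlockingQueue<IInputEvent>>();

    PipelineSketch(int summarizationWorkers) {
        for (int i = 0; i < summarizationWorkers; i++) {
            summarizationQueues.add(new ArrayBlockingQueue<IInputEvent>(2048));
        }
    }

    // Round-robin ownership: summary k is only ever written by worker (k % N).
    int ownerOf(int summaryIndex) {
        return summaryIndex % summarizationQueues.size();
    }

    // Run by each message parser worker: drain the parsing queue, parse, and fan
    // the event out to the owner of every summary it touches. put() blocks when a
    // queue is full, which is exactly the back-pressure discussed below.
    void parserWorkerLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            IInputEvent event = parse(parseQueue.take());
            for (int summaryIndex : summariesFor(event)) {
                summarizationQueues.get(ownerOf(summaryIndex)).put(event);
            }
        }
    }

    abstract IInputEvent parse(IMessage message);    // step 4: payload -> POJO
    abstract int[] summariesFor(IInputEvent event);  // step 5: event type -> summaries
}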

The First Bottleneck: Parsing

I slotted in the various queues available and replayed some traffic to get a feel for what was going on.

  • The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
  • The summarization queues were split into two groups: those that were always full and those that were always empty. The reason was clear: some summarization workers were assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are, on average, either completely full or completely empty, because it is nearly impossible to perfectly balance production and consumption rates. Which leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn’t be used as a producer/consumer queue because “completely full” means “always growing” for an unbounded queue. Naturally, that’s an OK situation when consumption outpaces production, but in that scenario I wouldn’t have needed the queue in the first place. I needed back-pressure and only the blocking (in this case, bounded) queues could give me that.

In order to address the parsing bottleneck, I wanted to get a better grasp of the IMessage to IInputEvent throughput rate under different configurations. I constructed a test in which Netty workers would either:

  • do all the parsing work themselves, and then discard the message, or
  • would enqueue IMessages to the message parsing queue (either ABQ or LBQ), and parser workers would dequeue, parse, and then discard the IInputEvent. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

Netty-Only Message Parsing

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

 

Queued Message Parsing Throughput

Message parsing throughput with a BlockingQueue between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

  • Without a queue, throughput topped out at 130k/s. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of IMessages. Notably, neither situation provoked anywhere near 100% saturation on (# Netty workers + # parser workers) cores, so I have to believe that it’s simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. Say context switching takes up r% of a thread’s time; then 5 Netty workers can do at most w_0 = 5(1 - r/100) units of work, whereas 4 Netty workers and 1 dedicated parser worker can do w_1 = 4(1 - r/100) + 1 > w_0 units. The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, a small gain over just 5 Netty workers, suggests this. It might also be a matter of code “locality”: by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
  • ABQ, touted as the end-all of high-performance Java queues, gave us “atrocious” throughput compared to LBQ if more than two consumer threads were hitting it. This was surprising until I poked an active VM with SIGQUIT a few hundred times, only to find that most of the workers were waiting on the ABQ’s ReentrantLock. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
  • LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked list (vs. an array) was less than the overhead produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1-worker scenario. ABQ was not so lenient, however, dropping below the throughput of the Netty-only setup once more than 2 parser workers were in play.
  • Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.

Progress! At least I knew I could parse the 200k messages per second I needed.

The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed IInputEvents to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, the number of summarization workers, as well as the fan-out factor of each particular event, and hence on the proportions of different events to each other in a “nominal” data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

BlockingQueue V1 Throughput on Production Hardware

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

BlockingQueue V2 Throughput on Production Hardware

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

  • Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
  • Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
  • The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
  • Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the RingBuffer‘s entries, as opposed to just references. No, I can’t do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the “business logic” as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one Disruptor per summarization worker. Each summarization worker would have a RingBuffer<IInputEvent> that would be fed off of the various message parser workers. This fits nicely because it supports an easy MPSC configuration with the MultiThreadedClaimStrategy. I considered using it for the message parsing queue, but the hoops I’d have to jump through to stripe the RingBuffer to allow an MPMC configuration just seemed like overkill for a preliminary test. I tried out various WaitStrategys but the results shown below are from the ‘busy-spin’ strategy, which gave the best throughput.
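For reference, this is roughly what the slot-in looked like, sketched against the Disruptor 2.x-era API mentioned above (MultiThreadedClaimStrategy, BusySpinWaitStrategy). Constructor signatures differ between Disruptor versions, and the event holder and handler below are made-up placeholders, not our actual classes.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.MultiThreadedClaimStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

// One Disruptor per summarization worker: many message parser threads publish
// (hence MultiThreadedClaimStrategy), a single EventHandler consumes, so each
// summary still has exactly one writer.
public class DisruptorSummarizationSketch {
    // Entries hold a reference to the parsed event, not the raw bytes.
    public static class EventHolder {
        Object event;  // stands in for IInputEvent
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Disruptor<EventHolder> disruptor = new Disruptor<EventHolder>(
                new EventFactory<EventHolder>() {
                    public EventHolder newInstance() { return new EventHolder(); }
                },
                executor,
                new MultiThreadedClaimStrategy(2048),  // MPSC: many publishers
                new BusySpinWaitStrategy());           // best throughput in our tests

        disruptor.handleEventsWith(new EventHandler<EventHolder>() {
            public void onEvent(EventHolder holder, long sequence, boolean endOfBatch) {
                // update the summaries owned by this worker with holder.event
            }
        });
        RingBuffer<EventHolder> ringBuffer = disruptor.start();

        // A message parser worker publishes like so:
        long sequence = ringBuffer.next();
        ringBuffer.get(sequence).event = new Object();  // the parsed IInputEvent
        ringBuffer.publish(sequence);
    }
}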

Summarizer Architecture with Message Parsing Queue and Disruptor Summarization Queues

Disruptor as a slot-in replacement for an MPSC BlockingQueue.

Disruptor V1 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

Disruptor V2 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of WaitStrategy. This brings up another interesting point: I had the choice of using put/take or offer/poll on our BlockingQueues and ended up choosing put/take for the same reason. A marginal increase in throughput didn’t seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn’t enough to justify the decreased visibility into the “true” utilization of the CPU resources.
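To make the put/take versus offer/poll distinction concrete, here’s the shape of the two consumer loops (a simplified sketch, not our worker code):

import java.util.concurrent.BlockingQueue;

// Two ways for a worker to drain its queue. take() parks the thread when the
// queue is empty; a poll() busy-spin keeps the core pinned at 100% even when
// there is no work, which hides the "true" CPU utilization.
final class ConsumerLoops {
    static <T> void blockingLoop(BlockingQueue<T> queue) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            T item = queue.take();  // blocks until an item is available
            process(item);
        }
    }

    static <T> void busySpinLoop(BlockingQueue<T> queue) {
        while (!Thread.currentThread().isInterrupted()) {
            T item = queue.poll();  // returns immediately, null if empty
            if (item != null) {
                process(item);
            }                       // else: spin and try again
        }
    }

    private static <T> void process(T item) { /* update summaries, etc. */ }
}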

Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

V2 Throughput on Modern Hardware

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving onto fresh hardware can literally double performance, without a doubling in clock speed.

  • Though the Disruptor configurations gave the best results, the “mundane” LBQ-LBQ ones only trailed them by 8%, using 2-4 fewer threads and nearly a full core’s worth less of CPU, at that. LBQ-LBQ also beat ABQ-ABQ out handily by about 10% in most configurations.
  • The performance benefits of the Java 7 Hotspot over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

Major takeaways:

  1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.
  2. Separate your workloads! Tune for each workload!
  3. Don’t bother tuning queue size except for the purpose of jitter or keeping it in L3.
  4. Even if you don’t know what the black box at the bottom (or even the middle) of the stack is doing, you can still make progress! Experiment and plot and keep good notes!
  5. The Java 7 Hotspot offers a small but consistent performance improvement over Java 6 on newer hardware.

Efficient Field-Striped, Nested, Disk-backed Record Storage

At AK we deal with a torrent of data every day. We can report on the lifetime of a campaign which may encompass more than a year’s worth of data. To be able to efficiently access our data we are constantly looking at different approaches to storage, retrieval and querying. One approach that we have been interested in involves dissecting data into its individual fields (or “columns” if you’re thinking in database terms) so that we only need to access the fields that are pertinent to a query. This is not a new approach to dealing with large volumes of data – it’s the basis of column-oriented databases like HBase.

Much of our data contains nested structures and this causes things to start to get a little more interesting, since this no longer easily fits within the data-model of traditional column-stores. Our Summarizer uses an in-memory approach to nested, field-striped storage but we wanted to investigate this for our on-disk data. Google published the Dremel paper a few years ago covering this exact topic. As with most papers, it only provides a limited overview of the approach without covering many of the “why”s and trade-offs made. So, we felt that we needed to start from the ground up and investigate how nested, field-striped storage works in order to really understand the problem.

Due to time constraints we have only been able to scratch the surface. Since the community is obviously interested in a Dremel-like project, we want to make the work that we have done available. We apologize in advance for the rough edges.

Without further ado: Efficient Field-Striped, Nested, Disk-backed Record Storage (on GitHub).

Big Memory, Part 3

Author’s Note: This is part 3 of a series of posts about my adventures in building a “large”, in-memory hash table. Part 1 introduced our goals and our approach to the task at hand. This post is a summary of some candidate hash table “services”.

Goals

To recap, I need a hash table that can support the following:

  • 1.5 billion 64-bit keys, uniformly and randomly distributed
  • values between 16 bytes and 16 kilobytes, with sizes in a Zipfian distribution
  • deployed to one machine, all in-memory
  • sustained 200,000 writes per second over the course of many hours

The API should support a non-bulk, mutable, key-value interface with an append command.

The final requirement is that the source be obtainable. After all, this is just as much about finding a viable candidate as understanding how the results are achieved.
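Concretely, the API boils down to something like the following hypothetical interface (the names are mine, just to pin down the semantics):

// Hypothetical sketch of the required key-value interface: non-bulk, mutable,
// with an append operation that concatenates bytes onto an existing value.
public interface BigHashTable {
    void put(long key, byte[] value);     // overwrite the value for key
    byte[] get(long key);                 // null if the key is absent
    void append(long key, byte[] value);  // put if absent, otherwise concatenate
}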

My approach to testing the initial viability of candidates was to replicate a subset of the required production load using some of our production logs. The test amounted to writing 212 million records to a bit over 78 million unique keys. Each record’s key is 8 bytes and its value 16 bytes. The value bytes are simply appended to the existing value corresponding to the key. This closely mimics our real write workload for the project.
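In terms of the hypothetical interface above, the test is essentially the following loop, with the reading and decoding of our production logs elided:

// Replay ~212M log records, appending each 16-byte value onto its 8-byte key.
public final class ReplayTest {
    interface Record {
        long key();      // 8-byte user id
        byte[] value();  // 16 bytes to append to the existing value
    }

    static void replay(Iterable<Record> records, BigHashTable table) {
        for (Record record : records) {
            table.append(record.key(), record.value());
        }
    }
}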

Note that throughput and latency are the primary concerns here: we seek a consistently high write rate. Memory overhead, at this stage, is not under scrutiny. (This may strike some as odd, given the hard bounds on a single machine’s memory, but honestly the raw data set we’re seeking to store is easily within the bounds of the servers I described in my previous post. As long as nothing absurd is going on, we can afford to trade some memory for speed.)

Candidates

Given the API requirements, the candidates that immediately came to mind were:

  • Berkeley DB
  • Kyoto Cabinet
  • Redis
  • Memcached

Note that the scope here is restricted to hash table “services”, not hash table libraries. Specifically, I don’t want to manage memory, rehashing, growing, or shrinking. I’ll be covering libraries in the next post.

Under the hood these all use slightly different hashing and collision resolution schemes.

Berkeley DB uses an implementation of Litwin’s Extended Linear Hashing. In particular, it implements linear hashing using a hybrid split control: bucket overflow and load factor independently trigger splits. (Look for ffactor and do_expand in hash_page.c’s __ham_add_el().) Notably, BDB chains memory pages, not object pointers. This is a sensible optimization in a world where main memory is small and disk seeks are costly. However, the cost in code complexity is immense. For an idea of just how much attention to detail is required, download BDB’s source and check out hash_page.c’s __ham_replpair() and __ham_add_el(). It is fascinating to see how much work goes into managing the differences between small and large values. [1]

Kyoto Cabinet “boringly” uses the C++ stdlib’s std::unordered_map. I had trouble finding implementations other than GCC’s, so I can’t really speak to anything but that. The tr1/hashtable implementation uses chaining, with a prime bucket count and a max-load-factor-based rehashing policy. When the ratio of elements to buckets passes a certain threshold (1, I believe), a full stop-the-world rehash is performed. (_M_rehash() on line 1146) The resizing policy finds the smallest prime greater than twice the current number of buckets, and the table is resized. (_M_need_rehash() on line 455) The prime policy default can be seen here.

Redis implements its own hash table that uses chaining as well with a target load factor of 1. Interestingly, it rehashes the keys incrementally in the background, pushing updates to a new table while checking both the old and new tables for reads. The incremental work is spread over all subsequent reads and writes issued to the table. This is perhaps the easiest of the four implementations to fully understand on the first read.
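To make the bookkeeping concrete, here’s a toy, single-threaded Java sketch of incremental rehashing. It is emphatically not Redis’s implementation (that lives in dict.c); it only illustrates the two-table scheme of migrating one old bucket per operation while reads consult both tables.

import java.util.LinkedList;

// Toy illustration of Redis-style incremental rehashing: chaining with a target
// load factor of 1, and a rehash spread over subsequent operations instead of a
// stop-the-world pass.
public class IncrementalRehashSketch<K, V> {
    static final class Entry<K, V> {
        final K key;
        V value;
        Entry(K key, V value) { this.key = key; this.value = value; }
    }

    private LinkedList<Entry<K, V>>[] oldTable;  // non-null only while rehashing
    private LinkedList<Entry<K, V>>[] newTable = newBuckets(16);
    private int rehashIndex = 0;                 // next old bucket to migrate
    private int size = 0;

    @SuppressWarnings("unchecked")
    private static <K, V> LinkedList<Entry<K, V>>[] newBuckets(int n) {
        LinkedList<Entry<K, V>>[] buckets = new LinkedList[n];
        for (int i = 0; i < n; i++) buckets[i] = new LinkedList<Entry<K, V>>();
        return buckets;
    }

    private int indexFor(Object key, int buckets) {
        return (key.hashCode() & 0x7fffffff) % buckets;
    }

    // Migrate a single old bucket into the new table.
    private void rehashStep() {
        if (oldTable == null) return;
        for (Entry<K, V> e : oldTable[rehashIndex]) {
            newTable[indexFor(e.key, newTable.length)].add(e);
        }
        oldTable[rehashIndex++] = null;
        if (rehashIndex == oldTable.length) { oldTable = null; rehashIndex = 0; }
    }

    public void put(K key, V value) {
        rehashStep();  // every operation chips away at the pending rehash
        Entry<K, V> existing = find(key, newTable);
        if (existing == null && oldTable != null) existing = find(key, oldTable);
        if (existing != null) { existing.value = value; return; }
        newTable[indexFor(key, newTable.length)].add(new Entry<K, V>(key, value));
        if (++size > newTable.length && oldTable == null) {  // load factor > 1
            oldTable = newTable;                              // start a new rehash
            newTable = newBuckets(oldTable.length * 2);
        }
    }

    public V get(Object key) {
        rehashStep();
        Entry<K, V> e = find(key, newTable);
        if (e == null && oldTable != null) e = find(key, oldTable);
        return e == null ? null : e.value;
    }

    private Entry<K, V> find(Object key, LinkedList<Entry<K, V>>[] table) {
        LinkedList<Entry<K, V>> bucket = table[indexFor(key, table.length)];
        if (bucket == null) return null;  // an old bucket that has already moved
        for (Entry<K, V> e : bucket) {
            if (e.key.equals(key)) return e;
        }
        return null;
    }
}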

Similar to BDB, memcached implements linear hashing, but it chains object pointers, not memory pages. It uses what the paper calls “load control[led]” splits, meaning that incremental rehashing occurs when the load factor exceeds a certain value. (In this case, 3/2.) Unlike Redis, it does the rehashing in another thread in the background, not as a part of the read or write operations. assoc.c very nicely illustrates the gist of linear hashing with controlled splits; check out assoc_find() and assoc_insert(). Beware, assoc_expand() just sets up some state to signal incremental rehashing. The real guts of the rehashing is in assoc_maintenance_thread(). It is notable how much simpler the code for object-pointer chaining is than the page chaining used in BDB.
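For comparison, here’s a toy sketch of linear hashing with a load-controlled split and object-reference chaining. Again, this only illustrates the algorithm and is not memcached’s code: splits happen eagerly inside put() rather than incrementally in a maintenance thread, and the 3/2 trigger is the only detail borrowed directly.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Toy illustration of linear hashing: the table grows one bucket at a time by
// splitting the bucket pointed to by `next`, using the next round's hash
// function only for buckets that have already been split.
public class LinearHashingSketch<K, V> {
    static final class Entry<K, V> {
        final K key;
        V value;
        Entry(K key, V value) { this.key = key; this.value = value; }
    }

    private static final int INITIAL_BUCKETS = 4;
    private static final double SPLIT_LOAD = 1.5;  // memcached splits at 3/2

    private final List<LinkedList<Entry<K, V>>> buckets =
            new ArrayList<LinkedList<Entry<K, V>>>();
    private int level = 0;  // which doubling round we're in
    private int next = 0;   // next bucket to split in this round
    private int size = 0;

    public LinearHashingSketch() {
        for (int i = 0; i < INITIAL_BUCKETS; i++) {
            buckets.add(new LinkedList<Entry<K, V>>());
        }
    }

    // h mod N*2^level, unless that bucket was already split this round, in which
    // case use the next round's function, h mod N*2^(level+1).
    private int indexFor(Object key) {
        int h = key.hashCode() & 0x7fffffff;
        int index = h % (INITIAL_BUCKETS << level);
        if (index < next) index = h % (INITIAL_BUCKETS << (level + 1));
        return index;
    }

    public void put(K key, V value) {
        for (Entry<K, V> e : buckets.get(indexFor(key))) {
            if (e.key.equals(key)) { e.value = value; return; }
        }
        buckets.get(indexFor(key)).add(new Entry<K, V>(key, value));
        if (++size > SPLIT_LOAD * buckets.size()) split();
    }

    public V get(Object key) {
        for (Entry<K, V> e : buckets.get(indexFor(key))) {
            if (e.key.equals(key)) return e.value;
        }
        return null;
    }

    // Split only bucket `next`: rehash its entries with the next round's hash
    // function, moving some of them into a single newly appended bucket.
    private void split() {
        LinkedList<Entry<K, V>> splitting = buckets.get(next);
        LinkedList<Entry<K, V>> fresh = new LinkedList<Entry<K, V>>();
        buckets.add(fresh);
        int newBucket = buckets.size() - 1;  // == INITIAL_BUCKETS*2^level + next
        for (Iterator<Entry<K, V>> it = splitting.iterator(); it.hasNext(); ) {
            Entry<K, V> e = it.next();
            if ((e.key.hashCode() & 0x7fffffff) % (INITIAL_BUCKETS << (level + 1)) == newBucket) {
                it.remove();
                fresh.add(e);
            }
        }
        if (++next == (INITIAL_BUCKETS << level)) { level++; next = 0; }
    }
}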

Ease of Use

Note: I’m talking specifically about ease of use from a developer perspective. I’m not qualified or interested in commenting on their operational merits here.

Without a doubt, the easiest candidate to set up, use, and analyze was Redis. Between the trivial build from source, the simplicity of the Jedis API, and the visibility provided by the INFO command, using Redis was a walk in the park. The redis.conf file has a lot of knobs but most of them can safely be ignored and the inline documentation is ample.

Kyoto Cabinet came in a close second. I forgot to set $JAVA_HOME before installing the Java bindings, which caused me some grief, but once I figured that bit out everything was right as rain. Instantiating and using it were painless if somewhat sparsely documented.

Memcached was actually a pain to use, not because of the daemon itself, but because of the client libraries available in Java. The fact that the append command required a CAS value in some clients and not in others was the main culprit. One qualm with the daemon itself is that an append command only succeeds if used on an existing key.

Finally, BDB was by far the most frustrating candidate. The configuration is arcane and poorly documented. The errors are undescriptive and often cryptic. Setting the proper combination of permissions for a client is exceedingly difficult unless you peruse the documentation with a very keen eye. The distinction between what configuration should be done on the EnvironmentConfig versus the DatabaseConfig is unclear and poorly documented. Despite specifying an in-memory hash database, a home directory for BDB is still required, even though it never touches it. One has to manually initialize the memory subsystem. Blah! Maybe I’m just uninitiated, but I don’t think I’ve ever been more frustrated with a piece of software. To boot, only the Heap, Queue, and Recno access methods support append puts, leaving me to manually do a get/append/put in the client. Even if BDB is fast enough, there’s absolutely no chance I’ll use it in production due to these limitations.

Results

I’ll briefly note that memcached was so slow that it didn’t complete the test suite in the two days I left it running. As such, I’ve removed it entirely from this comparison. I was probably doing something wrong vis-a-vis configuration of the client and server. Similarly, a simple un-pipelined Redis connection proved to be incredibly slow, at least an order of magnitude slower than BDB. As such, I reran the original Redis test with a pipelined connection, flushing every 10,000 records. Both versions of the test are included in the source for posterity.

These plots come from 30 runs over the data set, preceded by 10 warmup runs. The hash marks are the average value of the number of records processed per second at the particular record count, and the points are the actual observations with 10% alpha.

The first plot includes a baseline processing rate (‘xfer’ in the legend) which indicates how quickly the records can be read and prepared. The second simply excludes the baseline, for a clearer view. You can click through for larger versions of the plots.





You can find the source code used to run these comparisons on GitHub.

Notable bits

  • Despite the drastically different algorithms used by BDB and KC, their results were roughly equivalent. KC’s performance proved to be slightly smoother, and seems to have reached a stable point at around 170 million records while BDB continued to degrade. A concern is that they were the only two packages that were used through JNI. This may have limited performance, but I am disinclined to investigate further as we use the JVM in production, which necessitates this cost when interacting with these services. That said, tr1/hashtable’s underlying algorithm is still quite attractive. KC performed more smoothly despite not being given a hint about the number of unique keys, while BDB was given one.
  • Though Redis’ throughput proved to be about 50% greater than that of KC and BDB, the precipitous drops during (what I assume is) resizing are extremely worrisome. (I’m guessing it’s resizing since the distance between drop-offs roughly doubles each time.) The performance drop-off just doesn’t jibe with the goal of continuously high throughput. Equally worrying is the cost of at least doubling memory use during rehashing. Even though I mentioned this is a secondary concern in this comparison, it is an important operational problem.
  • The performance difference between tr1/hashtable (KC) and Redis is marked, given they both use chaining. I suspect this is either a result of pipelining or JNI overhead. The purpose of adding the pipelined version of the Redis test was to emulate a scenario where issuing commands did not carry network and serialization overhead. Perhaps it optimized Redis access unfairly by instead emulating a bulk command API as opposed to just mitigating protocol overhead.

Conclusions

It seems that the overhead of a general-purpose hash table “service” makes these options unsuitable for my needs. I’m honestly not sure whether it’s the broad feature sets, the JNI bridge, or the serialization/deserialization overhead of having a network server on top of the hash table, and as a practical matter I won’t pursue the question any further. All three problems can be circumvented by using a Java-resident hash table, so that is where we’ll go in the next post. -Xmx128g here we come!

Footnotes

[1] Per the Berkeley DB license I am including a link to Oracle’s site where one can find the full source of the database.

Never trust a profiler

A week or so ago I had mentioned to Timon that for the first time a profiler had actually pointed me in a direction that directly led to a positive increase in performance. Initially Timon just gave me that “you’re just a crotchety old man” look (which, in most cases, is the correct response). I pointed him to Josh Bloch’s Performance Anxiety presentation which dives into why it is so hard (in fact “impossible” in Josh’s words) to benchmark modern applications. It also references the interesting paper “Evaluating the Accuracy of Java Profilers”.

Just last week I was trying to track down a severe performance degradation in my snapshot recovery code. I was under some time pressure so I turned on my profiler to try to point me in the right direction. The result that it gave was clear, repeatable and unambiguous, and pointed me at the linear probing algorithm of the hash table that I am using. Since I had recently moved to a new hash table (one that allowed for rehashing and resizing) it was possible that this was in fact the root of my performance problem, but I had my doubts. (More on this in a future post.) I swapped out my hash table and re-profiled. The profiler again gave me a clear, repeatable and unambiguous result that my performance woes were solved, so I moved on. When we were able to test the snapshot recovery code on production snapshots, we found that the performance problems still existed.

My profiler lied to me. Never trust your profiler.

Big Memory, Part 1

Author’s note: This will be the first of a series of posts about my adventures in building a “large”, in-memory hash table. This first post will focus on a few philosophical notes that inspired this adventure. Research summaries, benchmarks, engineering notes, and so on will follow in future posts.

Memories

A few years ago, I recall being flabbergasted when I was told that Google had deployed a Perforce server with 256GB of RAM. Production machines at my then job had 16GB of RAM and I had certainly heard of 32- and 64-GB boxes, but 256GB struck me as an unthinkable amount. Our whole production database in RAM twice over! Wham! Pow! Smack!

Fast-forward to a month ago when I was told that we had two “leftover” boxes with a dozen cores and 256GB of RAM each. Impressive, yes, but a pleasant surprise at best. How the times have changed!

Brainstorming

The availability of the hardware got Rob and me thinking about novel things we could do in RAM. After some brainstorming, we came up with some basic tenets that should guide our exploration of the space.

We’re not in the business of saving lives.

We track ads online. Lots of them. Not all components of our system require perfect uptime and not all of our data has to be perfectly accurate. I think perhaps this scenario is more common than many are willing to admit or embrace, especially in the analytics community. My main beef with MapReduce is the a priori necessity of examining every last piece of data. Throw out what doesn’t matter! Live a little!

That said, “in-memory” does not mean unstable or lossy.

If your data fits in memory and you can easily reconstruct your data store by replaying the input stream, there’s really no reason to dismiss a volatile design. Hell, the extra speed you’re likely to pick up with an in-memory design can actually make your recoveries quicker than with a persistent solution. By the same token, writing a persistence layer for a data store is arguably the most complicated part. (See these two posts, for instance.) Mitigate the volatility of an in-memory solution by going back to the simplicity, transparency, and composability espoused by the Unix philosophy.

K.I.S.S.!

One thread does all the reading and writing, all in-memory, with only one I/O format: protobuf messages over 0MQ. See how fast that baby can go and how big she can grow before you get any fancier. Sure I could wave my hands about all kinds of fancy things like context switching, but that’s not the justification here. We’re really trying to test the limits of a relatively simple computation model, without working around it: how much can you do with a fast processor and gobs of RAM?
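As a sketch of what that model looks like, assuming a recent JeroMQ-style binding (the socket type, endpoint, and framing below are illustrative, and protobuf decoding is elided):

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

// Minimal sketch of the single-threaded model: one thread, one socket, one I/O
// format. Error handling, shutdown, and the actual store are all elided.
public class SingleThreadedStore {
    public static void main(String[] args) {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket socket = context.createSocket(SocketType.PULL);
            socket.bind("tcp://*:5555");
            while (!Thread.currentThread().isInterrupted()) {
                byte[] frame = socket.recv(0);
                // In the real design each frame is a protobuf message, parsed here
                // with the generated parseFrom(byte[]) and applied to the store.
                apply(frame);
            }
        }
    }

    private static void apply(byte[] message) {
        // single writer: update the in-memory structures, no locks needed
    }
}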

Benchmark with the future in mind.

Test at capacities that significantly exceed current needs. Push the envelope well past what roughly similar projects are doing. Stress it until it genuinely breaks.

Action

Since Rob has already started tackling our aggregation bottlenecks with the Summarizer (which he will surely write more about soon, nudge, nudge), I decided to try my hand at our custom attribution problem. We need a way to store user interaction streams and run attribution models over them quicker than in Hadoop, but not quite “in real-time”.

Practically, the problem amounts to storing a billion or so randomly and uniformly distributed 64-bit integer keys with a Zipfian distribution of values ranging between 16 bytes and 16 kilobytes of structured data. The combination of an extremely heavy write workload, zero key locality, and no durability requirements points to an in-memory hash table.

Next post, I’ll cover the research I did and am doing to familiarize myself with the problem of large, in-memory hash tables.

Custom Input/Output Formats in Hadoop Streaming

Like I’ve mentioned before, working with Hadoop’s documentation is not my favorite thing in the world, so I thought I’d provide a straightforward explanation of one of Hadoop’s coolest features – custom input/output formats in Hadoop streaming jobs.

Use Case

It is common for us to have jobs that get results across a few weeks, or months, and it’s convenient to look at the data by day, week, or month. Sometimes including the date (preferably in a nice standard format) in the output isn’t quite enough, and for whatever reason it’s just more convenient to have each output file correspond to some logical unit.

Suppose we wanted to count unique users in our logs by state, by day. The streaming job probably starts looking something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -input log_data/ \
        -output geo_output/ \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

And the output from the job might look like:

2011-06-20,CA,1512301
2011-06-21,CA,1541111
2011-06-22,CA,1300001
...
2011-06-20,IL,23244
2011-06-21,IL,23357
2011-06-22,IL,12213
...

This is kind of a pain. If we do this for a month of data and all 50 states appear every day, that’s at least 1500 records – not quite so easy to eyeball. So, let’s ask Hadoop to give us a file per day, named YYYY-MM-DD.csv, that contains all the counts for that day. 30 files containing 50 records each is much more manageable.

Write Some Java

The first step is to write some Java. I know, this is a tutorial about writing Input/Output formats for Hadoop streaming jobs. Unfortunately, there is no way to write a custom output format other than in Java.

The good news is that once you’re set up to develop, both input and output formats tend to take minimal effort to write. Hadoop provides a class just for putting output records into different files based on the content of each record. Since we’re looking to split records based on the first field of each record, let’s subclass it.

package net.agkn.example.outputformat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class DateFieldMultipleOutputFormat
    extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // The key looks like "YYYY-MM-DD,STATE"; use the date as the file name.
        String date = key.toString().split(",")[0];
        return date + ".csv";
    }
}

It’s a pretty simple exercise, even if you’ve never written a single line of Java. All the code does is take the first field of the key and use it as the output filename. Honestly, the hardest part is going to be setting up your IDE to work with Hadoop (the second hardest part was finding this blog post).

Use It

The most recent Hadoop documentation I can find still only covers custom input/output formats as of Hadoop 0.14. Very helpful.

It turns out life is still easy. When you look at a less-misleading part of the Hadoop Streaming documentation, all the pieces you need are right there. There are flags, -inputformat and -outputformat that let you specify Java classes as your input and output format. They have to be in the Java classpath when Hadoop runs, which is taken care of by the -libjars generic Hadoop option. There is no need to compile a custom streaming jar or anything crazy (I found worse suggestions on StackOverflow while figuring this out).

Using this newfound wisdom, it’s pretty trivial to add the output format to the existing streaming job. The next version of the job is going to look something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

Pay Attention to the Details

If you write an output format like the one above and try to run a job, you’ll notice that some output records disappear. The overly-simple explanation is that Hadoop ends up opening the file for a specific date once for every reducer that date appears in, clobbering the data that was there before. Fortunately, it’s also easy to tell Hadoop how to send all the data from a date to the same reducer, so each file is opened exactly once. There isn’t even any Java involved.

All it takes is specifying the right partitioner class for the job on the command line. This partitioner is configured just like unix cut, so Data Scientists should have an easy time figuring out how to use it. To keep data from disappearing, tell the partitioner that the first field of the comma-delimited output record is the value to partition on.

With those options included, the final streaming job ends up looking like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -D map.output.key.field.separator=, \
        -D mapred.text.key.partitioner.options=-k1,1 \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -cacheFile ip_geo_mapping.dat#geo.dat  

On the next run all the data should appear again, and the output directory should contain a file per day there is output. It’s not hard to take this example and get a little more complicated – it’d take minimal changes to make this job output to a file per state, for example. Not bad for a dozen lines of code and some command line flags.

My Love/Hate Relationship with Hadoop

Hadoop

A few months ago, the need for some log file analysis popped up. As the junior Data Scientist, I had the genuine pleasure of waking up one morning to an e-mail from Matt and Rob letting me know that I was expected to be playing with terabytes of data as soon as possible. Exciting, to say the least.

The project seemed like a perfect fit for Hadoop, specifically Amazon’s Elastic MapReduce (EMR). So, I grabbed the company card, signed up, and dove right in. It’s been quite a learning experience.

After a few months learning the particulars of Amazon’s flavor of cloud computing and Hadoop’s take on distributed computing, I’ve developed a relationship with Hadoop as complicated as any MapReduce job – I’ve learned to love and loathe it at the same time.

The Good

EMR is incredibly easy to interface with, despite some of Amazon’s tools being less-than-stellar (I’m looking at you, Ruby CLI). The third-party APIs tend to be excellent. We’ve been using boto heavily.

Hadoop Streaming jobs are, like most everyone else on the internet will tell you, awesome for rapid prototyping and development. The rest of the Science team and I are not super concerned with speed for most of what we do in Hadoop, so we’re perfect users for Streaming jobs. We iterate on our models constantly, and Streaming makes it possible to easily test their behavior over whatever data we please.

The ability to include HIVE in an EMR workflow is yet another awesome bonus. It’s incredibly easy to boot up a cluster, install HIVE, and be doing simple SQL analytics in no time flat. External tables even make the data loading step a breeze.

The Bad

While Hadoop and EMR have let us do some very cool things that wouldn’t be possible otherwise, we’ve had some problems too.

I’ve blown up NameNodes, run into the S3 file size limit, and hit what feels like every pain point in-between while formatting and compressing our data. I’ve crashed every JVM that Hadoop has to offer, broken the HIVE query planner, and had Streaming jobs run out of memory both because they were badly designed, and because I didn’t tweak the right settings. In short, after just a few months, with what I would consider some fairly simple, standard use cases, I’ve run into every “standard” Hadoop problem, along with what feels like more than my fair share of non-standard problems.

While it should be no surprise to anyone that a lone data scientist can wreak havoc on any piece of software, there was a certain flavor to an unsettlingly large number of these crises that really started to bother me.

After running into the dfs.datanode.max.xcievers property problem mentioned in the post above, I put my finger on both what makes a problem quintessentially Hadoop-y and why a Hadoop problem isn’t a good one to have.

The Ugly

To fix any problem, you have to know about the problem. To know about a problem, you must have read the documentation or broken something enough times to start to pinpoint it.

Reading the documentation isn’t an option for learning about dfs.datanode.max.xcievers. It’s badly documented, there’s no default anywhere and it’s misspelled (i before e except after c). But once you know what’s going on it’s an easy fix to change a cluster’s configuration.

What’s so bad about a Hadoop problem is that causing enough issues to figure out a cause takes a large amount of time, in what I find to be the most disruptive way possible. It doesn’t take a large number of tries, or any particularly intelligent debugging effort, just a lot of sitting and waiting to see if you missed a configuration property or set one incorrectly. It doesn’t seem so bad at first, but since these problems often manifest only in extremely large data-sets, each iteration can take a significant amount of time, and you can be quite a ways through a job before they appear. Investigative work in such a stop and go pattern, mixed with the worst kind of system administration, is killing me. I don’t want to stop working in the middle of a cool thought because I had to adjust a value in an XML document from 1024 to 4096.

Never mind the hardware requirements Hadoop presents, or issues with HDFS or any of the legitimate, low level complaints people like Dale have. I don’t like working on Hadoop because you have to keep so much about Hadoop in the back of your mind for such little, rare gains. It’s almost as bad as having a small child (perhaps a baby elephant?) on my desk.

What’s Next

The easy solution is to insulate me, the analyst, from the engineering. We could throw cash at the problem and dedicate an engineer or three to keeping a cluster operable. We could build a cluster in our data center. But this isn’t practical for any small company, especially when the projects don’t require you to keep a cluster running 24/7. Not only could the company not afford it, but it would be a waste of time and money.

The hard solution is coming up with something better. The whole team at AK believes that there is a better way, that working with big data can still be agile.

If possible, I should be able to access a data-set quickly and cleanly. The size and complexity of the tools that enable me to work with big data should be minimized. The barrier to entry should be low. While there are projects and companies that are trying to make using Hadoop easier and easier, I think the fundamental problem is with the one-very-large-framework-fits-all approach to big data. While Hadoop, and batch processing in general, has its time and place, there’s no reason I should need an elephantine framework to count anything, or find the mean of a list of numbers.

The rest of AK seems to agree. We all think the solution has to incorporate batch processing, somehow, but still embrace clever ways to navigate a large, constantly flowing data set. The crazy people here even think that our solution can be reliable enough that a Data Scientist can’t be too smart (or just incompetent enough) to break it.

Working with large sets

We do a lot of work with unique user counting and we have developed some techniques for accurate counting in small bounded-size structures. Periodically I like to make sure that all of our assumptions still hold as the world changes around us. I was recently running a number of experiments on large sets to get our science folks some data to analyze. It involved getting the cardinality, union and intersection of large sets of user ids (which are 64bit values) the brute-force way.

Since I spend a good deal of my time writing Java, I figured I would just quickly whip something up.  For set sizes of 1M or less, the “standard techniques” worked well enough — java.util.HashSet will do the trick.  In general, for larger collections of primitives it’s a good idea to use one of the 3rd party libraries that is specifically tailored to primitives such as Trove or Colt to cut down on the time and memory bloat of autoboxing primitives into objects.  (There are a number of postings around what are the “best” collections for a given circumstance such as this one on StackOverflow.)  You can get to around 10M entries in a traditional set before running union and intersection take prohibitively long due to the fact that it works element-by-element.
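For example, with a primitive collection like Trove the brute-force cardinality, union and intersection work stays unboxed the whole way down (a sketch, assuming Trove 3’s package layout):

import gnu.trove.iterator.TLongIterator;
import gnu.trove.set.hash.TLongHashSet;

// Brute-force set operations over primitive 64bit user ids, with no autoboxing.
public class BruteForceSets {
    public static void main(String[] args) {
        TLongHashSet a = new TLongHashSet(1000000);
        TLongHashSet b = new TLongHashSet(1000000);
        // ... load user ids into a and b ...

        int cardinalityOfA = a.size();

        TLongHashSet union = new TLongHashSet(a.size() + b.size());
        union.addAll(a);
        union.addAll(b);

        TLongHashSet intersection = new TLongHashSet();
        for (TLongIterator it = a.iterator(); it.hasNext(); ) {
            long id = it.next();
            if (b.contains(id)) {
                intersection.add(id);
            }
        }
    }
}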

Working with sets with over 10M entries requires different techniques.  The most common approach is to use bit arrays.  Bit arrays are not only compact in size (in that each element only takes one bit of RAM), but they are very fast for doing set operations. The set operations are typically performed by taking chunks of the bit array and performing regular instructions on them. For example, the bit array can be chunked up into longs (64bit ‘words’) and then bitwise or or bitwise and operations are performed pairwise on the two ‘sets’. Java provides java.util.BitSet which does all of the heavy lifting for you.  (There are 3rd party bit arrays available too.)
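Here’s a quick illustration with java.util.BitSet:

import java.util.BitSet;

// Set operations on bit arrays reduce to bitwise instructions over 64bit words;
// java.util.BitSet does the word-chunking for you.
public class BitSetOps {
    public static void main(String[] args) {
        BitSet a = new BitSet();
        BitSet b = new BitSet();
        a.set(3); a.set(17);
        b.set(17); b.set(42);

        BitSet union = (BitSet) a.clone();
        union.or(b);                                     // {3, 17, 42}

        BitSet intersection = (BitSet) a.clone();
        intersection.and(b);                             // {17}

        System.out.println(intersection.cardinality());  // 1
    }
}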

When using a hash-based set, one simply gives it the element that is to be stored (in our case, that’s a 64bit user id). With a bit array the interface is basically setBit(index, value) and getBit(index). The problem comes down to: what is the index? A naive approach would simply use the bit array in the same way as the hash set — pass it the element. Unfortunately, this would require a bit array that is 2^64-1 bits long. If you were a little dangerous with your knowledge, you could exploit some of the RLE (run-length encoding) techniques for compressing your bit array such as Word Aligned Hybrid (WAH — there’s even a Java implementation as well as others found here). Another, call it ‘sane’, approach is to use a map together with the bit array to provide the index. The map is used to map from the element to a sequential index and that index is used in the bit array.

To insert (pseudo code):

index = elementToIndexMap.get(elementId)
if(index does not exist)
    index = sequence++
    elementToIndexMap.put(elementId, index)
endif
bitarray.setBit(index, true)

To retrieve (pseudo code):

index = elementToIndexMap.get(elementId)
if(index does not exist)
    return false/*assume not present means 'not set'*/
endif
return bitarray.getBit(index)

With this approach, you can easily get to 100M or even billions of elements in a set, given that you have enough RAM.
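Putting the pieces together, here’s a small self-contained sketch of the map-plus-bit-array approach. The class is hypothetical; it is backed by whatever Map you hand it (a plain HashMap works, while a primitive long-to-int map such as Trove’s avoids autoboxing the ids), and union/intersection only make sense between sets built against the same shared dictionary.

import java.util.BitSet;
import java.util.Map;

// A 'sane' large set: a shared element-to-index dictionary plus one BitSet per
// set. Sets sharing the same dictionary can be unioned/intersected with
// BitSet.or()/and().
public class IndexedBitSet {
    private final Map<Long, Integer> elementToIndexMap;  // shared across sets
    private final int[] sequence;                        // shared next-index counter
    private final BitSet bits = new BitSet();

    public IndexedBitSet(Map<Long, Integer> sharedDictionary, int[] sharedSequence) {
        this.elementToIndexMap = sharedDictionary;
        this.sequence = sharedSequence;
    }

    public void add(long elementId) {
        Integer index = elementToIndexMap.get(elementId);
        if (index == null) {
            index = sequence[0]++;
            elementToIndexMap.put(elementId, index);
        }
        bits.set(index);
    }

    public boolean contains(long elementId) {
        Integer index = elementToIndexMap.get(elementId);
        return index != null && bits.get(index);  // absent from the map means 'not set'
    }

    public void unionWith(IndexedBitSet other) { bits.or(other.bits); }
    public void intersectWith(IndexedBitSet other) { bits.and(other.bits); }
    public int cardinality() { return bits.cardinality(); }
}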

Going beyond 1B usually requires other, disk-based techniques. Unix join and sort, for example, can get you a long way.

Even though the examples that I gave were Java-based, the techniques presented here are universal. Underlying all of this is the necessity for a little profiling, a bit of understanding of the algorithms involved, and the realization that sometimes the default facilities provided are insufficient for all cases.