
### The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

1. RSyslog handed Netty some bytes.
2. A Netty worker turned those bytes into a `String`.
3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an `IMessage`.
4. The `IMessage`’s payload got turned into an `IInputEvent` (basically a POJO).
5. The `IInputEvent` was mapped to one or many summaries based on its event type, and each of those summaries was then updated with the event.

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten: we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.
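To make the original design concrete, here is a minimal sketch of "one lock guarding every summary". All names (`SingleLockSummarizer`, `route`, `summarize`, the string-keyed counters) are hypothetical stand-ins, not the Summarizer's real classes; the only thing taken from the description above is that every worker serializes on a single shared lock, however many summaries an event touches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the original architecture: summaries are plain counters.
class SingleLockSummarizer {
    // The one lock that every worker must hold to touch any summary.
    private final Object summaryLock = new Object();
    private final Map<String, List<String>> summariesByEventType =
            new HashMap<String, List<String>>();
    private final Map<String, Long> counts = new HashMap<String, Long>();

    // Declare which summaries an event type fans out to (one, or half a dozen).
    void route(String eventType, List<String> summaryNames) {
        synchronized (summaryLock) {
            summariesByEventType.put(eventType, new ArrayList<String>(summaryNames));
        }
    }

    // Called by each worker after parsing; all updates serialize on summaryLock.
    void summarize(String eventType) {
        synchronized (summaryLock) {
            List<String> targets = summariesByEventType.get(eventType);
            if (targets == null) {
                return; // unknown event type: nothing to update
            }
            for (String name : targets) {
                Long old = counts.get(name);
                counts.put(name, old == null ? 1L : old + 1L);
            }
        }
    }

    long count(String summaryName) {
        synchronized (summaryLock) {
            Long c = counts.get(summaryName);
            return c == null ? 0L : c;
        }
    }
}
```

The longer each `summarize` call takes, the longer every other worker waits on `summaryLock`, which is why richer summaries translate directly into lower throughput in this design.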

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput had dipped to 60k/sec for v1 messages, and was only 10k/sec for v2 messages.

Moreover, the new features the v2 messages permitted weren’t simply an open-ended exercise: with them came the customers that demanded those features and their additional traffic. The Summarizer simply wouldn’t stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used and will hopefully provide some insight into the pitfalls of concurrency in Java 6 and 7.

### Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the `IMessage`s to `IInputEvent`s.

### Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing `IMessage`s to `IInputEvent`s and summarizing `IInputEvent`s are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

• `ArrayBlockingQueue` (henceforth ABQ) – bounded, backed by an array (duh), uses a single `ReentrantLock`
• `LinkedBlockingQueue` (henceforth LBQ) – optionally bounded, backed by a linked list (duh), uses two `ReentrantLock`s (one for the head and one for the tail)
• `ConcurrentLinkedQueue` (henceforth CLQ) – unbounded, backed by a linked list, uses no locks, instead relying on a non-blocking algorithm built on CAS

`IMessage` to `IInputEvent` parsing and `IInputEvent` summarization are buffered by `BlockingQueue`s.

We added a message parsing queue to which the Netty workers would dump `IMessage`s. Message parser workers would take those `IMessage`s and turn them into `IInputEvent`s. They would then distribute those `IInputEvent`s to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).
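The ownership scheme above can be sketched as follows. This is a simplified illustration, not the Summarizer's code: `SingleWriterRouter`, `ownerOf`, and `distribute` are hypothetical names, events and summaries are plain strings, and the choice to enqueue an event only once per owning worker (even when that worker owns several of the target summaries) is my assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: N MPSC queues, one per summarization worker.
class SingleWriterRouter {
    private final List<String> summaries;
    private final List<BlockingQueue<String>> workerQueues =
            new ArrayList<BlockingQueue<String>>();

    SingleWriterRouter(List<String> summaries, int workerCount, int queueCapacity) {
        this.summaries = new ArrayList<String>(summaries);
        for (int i = 0; i < workerCount; i++) {
            workerQueues.add(new LinkedBlockingQueue<String>(queueCapacity));
        }
    }

    // Round-robin assignment fixed at startup: summary i belongs to worker i mod N.
    int ownerOf(String summary) {
        int index = summaries.indexOf(summary);
        if (index < 0) {
            throw new IllegalArgumentException("unknown summary: " + summary);
        }
        return index % workerQueues.size();
    }

    // Called by a message parser worker: one blocking put per distinct owning worker.
    void distribute(String event, List<String> targetSummaries) throws InterruptedException {
        Set<Integer> owners = new LinkedHashSet<Integer>();
        for (String summary : targetSummaries) {
            owners.add(ownerOf(summary));
        }
        for (int owner : owners) {
            workerQueues.get(owner).put(event); // blocks when that worker's queue is full
        }
    }

    // Only worker `worker` ever takes from this queue, so it is the sole writer
    // of the summaries it owns and needs no per-summary lock.
    BlockingQueue<String> queueOf(int worker) {
        return workerQueues.get(worker);
    }
}
```

Because each summary has exactly one writer, the only synchronization left is inside the queues themselves, which is where the rest of this post ends up spending its time.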

### The First Bottleneck: Parsing

I slotted in the various queues available and replayed some traffic to get a feel for what was going on.

• The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
• The summarization queues were split into two groups: those that were always full and those that were always empty. The reason was clear: some summarization workers were assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are on average completely full or empty because it is nearly impossible to perfectly balance production/consumption rates. Which leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn’t be used as a producer/consumer queue because “completely full” means “always growing” for an unbounded queue. Naturally, that’s an OK situation when consumption outpaces production, but in that scenario I wouldn’t have needed the queue in the first place. I needed back-pressure and only the blocking (in this case, bounded) queues could give me that.
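A small sketch of what "back-pressure" means here, using only `java.util.concurrent`. With no consumer running, a bounded queue starts refusing work once it is full (or blocks the producer, if `put` is used instead of `offer`), while an unbounded CLQ accepts everything and simply grows. The class name, the capacity of 4, and the 1000 attempts are arbitrary illustration values.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

class BackPressureDemo {
    // Offer `attempts` items with no consumer running; return how many were accepted.
    static int accepted(Queue<Integer> queue, int attempts) {
        int ok = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) { // non-blocking: returns false when a bounded queue is full
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        Queue<Integer> bounded = new ArrayBlockingQueue<Integer>(4);
        Queue<Integer> unbounded = new ConcurrentLinkedQueue<Integer>();
        System.out.println("bounded accepted:   " + accepted(bounded, 1000));
        System.out.println("unbounded accepted: " + accepted(unbounded, 1000));
    }
}
```

The bounded queue accepts 4 items and rejects the rest, so the producer learns immediately that it is outrunning the consumer. The CLQ accepts all 1000, and under sustained overload it keeps accepting until the heap runs out.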

In order to address the parsing bottleneck, I wanted to get a better grasp of the `IMessage` to `IInputEvent` throughput rate under different configurations. I constructed a test in which Netty workers would either:

• do all the parsing work themselves, and then discard the message, or
• would enqueue `IMessage`s to the message parsing queue (either ABQ or LBQ), and parser workers would dequeue, parse, and then discard the `IInputEvent`. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

Message parsing throughput with a `BlockingQueue` between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

• Without a queue, throughput topped out at 130k/sec. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of `IMessage`s. Notably, neither situation provoked anywhere near 100% saturation on (# Netty workers + # parser workers) cores, so I have to believe that it’s simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. Say context switching takes up $r$% of your time; then 5 Netty workers can do at most $w_0 = 5(1-r/100)$ units of work. However, 4 Netty workers and 1 parser worker can do $w_1 = 4(1-r/100) + 1 > w_0$ units. The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, which is a small gain over just 5 Netty workers, suggests this. It might also be a matter of code “locality”: by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
• ABQ, touted as the end-all of high-performance Java queues, gave us “atrocious” throughput compared to LBQ if more than two consumer threads were hitting it. This was surprising until I poked an active VM with `SIGQUIT` a few hundred times, only to find that most of the workers were waiting on the ABQ’s `ReentrantLock`. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
• LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked-list (vs. array) was less than that produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1 worker scenario. ABQ was not so lenient, however, dropping to throughput numbers lower than the Netty-only setup after 2 parser workers.
• Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.
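The context-switching argument in the first bullet can be worked through explicitly. This keeps the same simplifying assumption the bullet makes (the dedicated parser worker pays no context-switching cost, hence the bare $+1$), and the value $r = 20$ below is purely illustrative, not a measurement.

$$
w_1 - w_0 = \left[4\left(1 - \frac{r}{100}\right) + 1\right] - 5\left(1 - \frac{r}{100}\right) = 1 - \left(1 - \frac{r}{100}\right) = \frac{r}{100} > 0 \quad \text{for } r > 0
$$

For example, with $r = 20$ we get $w_0 = 5 \times 0.8 = 4.0$ and $w_1 = 4 \times 0.8 + 1 = 4.2$, a gain of $0.2$ units (5%). Under this model the benefit of one dedicated parser thread is exactly the fraction of time a Netty worker loses to context switching, so a modest gain is what one would expect.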

Progress! At least I knew I could parse the 200k messages per second I needed.

### The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed `IInputEvent`s to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, the number of summarization workers, as well as the fan-out factor of each particular event, and hence on the proportions of different events to each other in a “nominal” data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

• Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
• Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
• The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
• Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.
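For the case where v1 and v2 traffic cannot be routed to separate Summarizers, the "two queues, separately balanced worker pools" idea from the third bullet might look roughly like this. Everything here is a hypothetical sketch: the class and field names are invented, payloads are plain strings, and the worker counts are just the values that happened to work best in the plots above, not recommendations.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: keep v1 and v2 workloads on separate queues so each
// can be given its own worker count.
class WorkloadSplitter {
    final int v1Workers = 2; // best for v1 with LBQ-LBQ in the tests above
    final int v2Workers = 4; // v2 worked best with 3 or 4
    final BlockingQueue<String> v1Queue;
    final BlockingQueue<String> v2Queue;

    WorkloadSplitter(int capacity) {
        this.v1Queue = new LinkedBlockingQueue<String>(capacity);
        this.v2Queue = new LinkedBlockingQueue<String>(capacity);
    }

    // Pick the queue by message format version; reject anything unknown.
    BlockingQueue<String> queueFor(int formatVersion) {
        switch (formatVersion) {
            case 1:
                return v1Queue;
            case 2:
                return v2Queue;
            default:
                throw new IllegalArgumentException("unknown format version " + formatVersion);
        }
    }

    // Blocking put, so each workload applies back-pressure independently.
    void submit(int formatVersion, String payload) throws InterruptedException {
        queueFor(formatVersion).put(payload);
    }
}
```

A caller would construct it with the queue size used in these tests, for example `new WorkloadSplitter(2048)`, and start `v1Workers` threads draining `v1Queue` and `v2Workers` threads draining `v2Queue`. A backlog of slow v2 events then never occupies queue slots or workers tuned for v1.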

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

### Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the `RingBuffer`’s entries, as opposed to just references. No, I can’t do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the “business logic” as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one `Disruptor` per summarization worker. Each summarization worker would have a `RingBuffer<IInputEvent>` that would be fed by the various message parser workers. This fits nicely because it supports an easy MPSC configuration with the `MultiThreadedClaimStrategy`. I considered using it for the message parsing queue, but the hoops I’d have to jump through to stripe the `RingBuffer` to allow an MPMC configuration just seemed like overkill for a preliminary test. I tried out various `WaitStrategy`s, but the results shown below are from the “busy-spin” strategy, which gave the best throughput.

Disruptor as a slot-in replacement for an MPSC `BlockingQueue`. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of `WaitStrategy`. This brings up another interesting point: I had the choice of using `put`/`take` or `offer`/`poll` on our `BlockingQueue`s and ended up choosing `put`/`take` for the same reason. A marginal increase in throughput didn’t seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn’t enough to justify the decreased visibility into the “true” utilization of the CPU resources.
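The difference between the two consumer styles can be sketched as follows. This is a minimal illustration, not the Summarizer's worker loop: the class name, the string events, and the `POISON` shutdown sentinel are all hypothetical. Both methods count events until they see the sentinel; what differs is how they behave while the queue is empty.

```java
import java.util.concurrent.BlockingQueue;

class ConsumerStyles {
    // Hypothetical sentinel used to tell a consumer to stop.
    static final String POISON = "<stop>";

    // put/take style: take() parks the thread while the queue is empty,
    // so an idle worker shows up as idle in CPU utilization.
    static int drainBlocking(BlockingQueue<String> queue) throws InterruptedException {
        int handled = 0;
        while (!queue.take().equals(POISON)) {
            handled++;
        }
        return handled;
    }

    // offer/poll style: poll() returns null immediately when the queue is empty,
    // so this loop busy-spins and pins its core at 100% even with nothing to do.
    static int drainSpinning(BlockingQueue<String> queue) {
        int handled = 0;
        while (true) {
            String event = queue.poll();
            if (event == null) {
                continue; // busy spin: retry straight away
            }
            if (event.equals(POISON)) {
                return handled;
            }
            handled++;
        }
    }
}
```

Both loops process the same events in the same order. The spinning version trades lower wake-up latency for a core that always looks busy, which is exactly the loss of visibility into “true” utilization described above.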

### Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving onto fresh hardware can literally double performance, without a doubling in clock speed.

• Though the Disruptor configurations gave the best results, the “mundane” LBQ-LBQ ones only trailed them by 8%, while using 2-4 fewer threads and nearly a full core less CPU. LBQ-LBQ also handily beat ABQ-ABQ, by about 10% in most configurations.
• The performance benefits of the Java 7 HotSpot JVM over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably, its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

### Major takeaways:

1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.