### The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

1. RSyslog handed Netty some bytes.
2. A Netty worker turned those bytes into a String.
3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an IMessage.
4. The IMessage's payload got turned into an IInputEvent (basically a POJO).
5. The IInputEvent was mapped to one or many summaries, based on its event type; each of those summaries would then be updated with the event.

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten, we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.
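The original single-lock arrangement can be sketched roughly like this (class and method names are hypothetical, and the real summaries were far richer than simple counters):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough sketch of the original design: every Netty worker parses inline and
// then updates summaries under one global lock. Names are hypothetical.
public class SingleLockSummarizer {
    private final Object summarizationLock = new Object();
    private final Map<String, Long> summaries = new HashMap<String, Long>();

    // Called from any Netty worker thread once an IInputEvent is parsed.
    public void summarize(long value, List<String> targetSummaries) {
        synchronized (summarizationLock) { // every worker contends here
            for (String key : targetSummaries) {
                Long current = summaries.get(key);
                summaries.put(key, current == null ? value : current + value);
            }
        }
    }

    public long get(String key) {
        synchronized (summarizationLock) {
            Long v = summaries.get(key);
            return v == null ? 0L : v;
        }
    }

    public static void main(String[] args) {
        SingleLockSummarizer s = new SingleLockSummarizer();
        s.summarize(1, Arrays.asList("daily", "hourly")); // one event, two summaries
        System.out.println(s.get("daily")); // 1
    }
}
```

While updates were cheap relative to parsing, one lock over everything was fine; the trouble below starts when time under that lock grows.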

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput for v1 messages had dipped to 60k/sec, and 10k/sec for the v2 messages.

Moreover, the new features the v2 messages permitted weren't simply an open-ended exercise: with them came the customers that demanded those features, and their additional traffic. The Summarizer simply wouldn't stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used, and hopefully it will provide some insight into the pitfalls of concurrency in Java 6 and 7.

### Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the IMessages to IInputEvents.

### Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing IMessages to IInputEvents and summarizing IInputEvents are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

• ArrayBlockingQueue (henceforth ABQ) – bounded, backed by an array (duh), uses a single ReentrantLock
• LinkedBlockingQueue (henceforth LBQ) – bounded, backed by a linked list (duh), uses two ReentrantLocks (one for the head and one for the tail)
• ConcurrentLinkedQueue (henceforth CLQ) – unbounded, backed by a linked list, uses no locks, instead relying on CAS (a lock-free algorithm based on Michael and Scott's queue)

IMessage to IInputEvent parsing and IInputEvent summarization are buffered by BlockingQueues.
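For concreteness, here is how the three candidates differ at the API level, using a toy capacity of 2 just to show the bounded/unbounded distinction:

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueChoices {
    public static void main(String[] args) {
        // Bounded: offer() fails (or put() blocks) once capacity is reached.
        BlockingQueue<String> abq = new ArrayBlockingQueue<String>(2);
        BlockingQueue<String> lbq = new LinkedBlockingQueue<String>(2);
        // Unbounded: offer() always succeeds, so the queue can grow without limit.
        Queue<String> clq = new ConcurrentLinkedQueue<String>();

        abq.offer("a"); abq.offer("b");
        System.out.println(abq.offer("c")); // false: ABQ is full
        lbq.offer("a"); lbq.offer("b");
        System.out.println(lbq.offer("c")); // false: LBQ is full
        clq.offer("a"); clq.offer("b");
        System.out.println(clq.offer("c")); // true: CLQ never refuses
    }
}
```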

We added a message parsing queue to which the Netty workers would dump IMessages. Message parser workers would take those IMessages and turn them into IInputEvents. They would then distribute those IInputEvents to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).
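A minimal sketch of that layout, with hypothetical names: one bounded MPSC queue per summarization worker and a fixed round-robin ownership function, so no lock on the summary objects themselves is ever needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the single-writer layout: each summary is owned by exactly one
// summarization worker, assigned round-robin at startup. Names are hypothetical.
public class SingleWriterRouter {
    private final List<BlockingQueue<Object>> summarizationQueues;

    public SingleWriterRouter(int workers, int queueCapacity) {
        summarizationQueues = new ArrayList<BlockingQueue<Object>>(workers);
        for (int i = 0; i < workers; i++) {
            // Many parser workers produce into each queue; only worker i consumes.
            summarizationQueues.add(new ArrayBlockingQueue<Object>(queueCapacity));
        }
    }

    // Fixed round-robin ownership: summary k always belongs to worker k mod N.
    public static int owner(int summaryIndex, int workers) {
        return summaryIndex % workers;
    }

    // Parser workers call this once per (event, summary-owner) pair; put()
    // blocks when the owner falls behind, giving back-pressure.
    public void route(int summaryIndex, Object event) throws InterruptedException {
        summarizationQueues.get(owner(summaryIndex, summarizationQueues.size())).put(event);
    }

    BlockingQueue<Object> queueFor(int worker) {
        return summarizationQueues.get(worker);
    }
}
```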

### The First Bottleneck: Parsing

I slotted in the various queues available and replayed some traffic to get a feel for what was going on.

• The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
• The summarization queues were split between two groups: those that were always full and those that were always empty. The reason was clear: some summarization workers had been assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are, on average, either completely full or completely empty, because it is nearly impossible to perfectly balance production and consumption rates. Which leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn't be used as a producer/consumer queue, because "completely full" means "always growing" for an unbounded queue. Naturally, that's an OK situation when consumption outpaces production, but in that scenario I wouldn't have needed the queue in the first place. I needed back-pressure, and only the blocking (in this case, bounded) queues could give me that.
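The back-pressure mechanism is simply that `put()` on a bounded BlockingQueue blocks the producer until a consumer frees a slot, a toy demonstration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// Why bounded queues give back-pressure: put() blocks the producer until a
// consumer frees a slot, instead of letting the queue grow forever.
public class BackPressureDemo {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(1);
        queue.put(0); // queue is now full

        final CountDownLatch produced = new CountDownLatch(1);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put(1); // blocks here until the consumer takes
                    produced.countDown();
                } catch (InterruptedException ignored) { }
            }
        });
        producer.start();

        Thread.sleep(100);                       // give the producer time to block
        System.out.println(produced.getCount()); // 1: still blocked in put()
        queue.take();                            // consumer frees a slot...
        produced.await();                        // ...and the producer proceeds
        producer.join();
        System.out.println(queue.take());        // 1
    }
}
```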

In order to address the parsing bottleneck, I wanted to get a better grasp of the IMessage to IInputEvent throughput rate under different configurations. I constructed a test in which Netty workers would either:

• do all the parsing work themselves and then discard the message, or
• enqueue IMessages to the message parsing queue (either ABQ or LBQ), with parser workers dequeuing, parsing, and then discarding the IInputEvent. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

Message parsing throughput with a BlockingQueue between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

• Without a queue, throughput topped out at 130k/sec. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of IMessages. Notably, neither situation provoked anywhere near 100% saturation on (# Netty workers + # parser workers) cores, so I have to believe it's simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. Say context switching takes up $r$% of your time; then 5 Netty workers can do at most $w_0 = 5(1-r/100)$ units of work, whereas 4 Netty workers and 1 parser worker (which never touches the network) can do $w_1 = 4(1-r/100) + 1 > w_0$ units. The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, a small gain over just 5 Netty workers, suggests this. It might also be a matter of code "locality": by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
• ABQ, touted as the be-all and end-all of high-performance Java queues, gave us "atrocious" throughput compared to LBQ if more than two consumer threads were hitting it. This was surprising until I poked an active VM with SIGQUIT a few hundred times, only to find that most of the workers were waiting on the ABQ's ReentrantLock. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
• LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked-list (vs. array) was less than that produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1 worker scenario. ABQ was not so lenient, however, dropping to throughput numbers lower than the Netty-only setup after 2 parser workers.
• Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.
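To make the context-switch accounting above concrete, here is the same arithmetic with an assumed (entirely made-up) r = 20:

```java
// Worked instance of the context-switch model from the bullets above.
// r is the assumed percentage of a network-reading thread's time lost to
// context switching; parser workers are assumed not to pay that cost.
public class WorkerMath {
    public static double w0(int nettyWorkers, double r) {
        return nettyWorkers * (1 - r / 100);                 // all threads read the network
    }
    public static double w1(int nettyWorkers, int parserWorkers, double r) {
        return nettyWorkers * (1 - r / 100) + parserWorkers; // parsers skip that cost
    }
    public static void main(String[] args) {
        System.out.println(w0(5, 20));    // ~4.0 units of work
        System.out.println(w1(4, 1, 20)); // ~4.2 units: a dedicated parser wins
    }
}
```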

Progress! At least I knew I could parse the 200k messages per second I needed.

### The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed IInputEvents to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, the number of summarization workers, and the fan-out factor of each particular event, and hence by the proportions of different events to each other in a "nominal" data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

• Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
• Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
• The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
• Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

### Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the RingBuffer's entries, as opposed to just references. No, I can't do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the "business logic" as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one Disruptor per summarization worker. Each summarization worker would have a RingBuffer<IInputEvent> fed by the various message parser workers. This fit nicely because it supports an easy MPSC configuration with the MultiThreadedClaimStrategy. I considered using it for the message parsing queue as well, but the hoops I'd have to jump through to stripe the RingBuffer into an MPMC configuration just seemed like overkill for a preliminary test. I tried out various WaitStrategy implementations, but the results shown below are from the busy-spin strategy, which gave the best throughput.

Disruptor as a slot-in replacement for an MPSC BlockingQueue.

Facet labels for both plots are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of WaitStrategy. This brings up another interesting point: I had the choice of using put/take or offer/poll on our BlockingQueues and ended up choosing put/take for the same reason. A marginal increase in throughput didn't seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn't enough to justify the decreased visibility into the "true" utilization of the CPU resources.
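The put/take versus offer/poll distinction in a nutshell: poll() returns immediately (null when empty), so a hot loop around it burns a full core, while take() parks the thread until an element arrives.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// The two consumption styles discussed above, on a plain BlockingQueue.
public class ConsumeStyles {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(16);

        System.out.println(queue.poll()); // null: non-blocking, a loop here busy-spins

        queue.put("event");               // blocking enqueue: back-pressure on producers
        System.out.println(queue.take()); // "event": take() frees the core when empty

        // A middle ground: poll with a timeout, trading a little latency for idle CPU.
        System.out.println(queue.poll(10, TimeUnit.MILLISECONDS)); // null after 10ms
    }
}
```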

### Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving to fresh hardware can literally double performance, without a doubling in clock speed.

• Though the Disruptor configurations gave the best results, the "mundane" LBQ-LBQ ones trailed them by only 8%, while using 2-4 fewer threads and nearly a full core's worth less CPU, at that. LBQ-LBQ also beat ABQ-ABQ handily, by about 10% in most configurations.
• The performance benefits of the Java 7 Hotspot over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

### Major takeaways

1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.
2. Don't bother tuning queue size beyond what's needed to buffer jitter or to keep the queue in L3.
3. Even if you don't know what the black box at the bottom (or even the middle) of the stack is doing, you can still make progress! Experiment, plot, and keep good notes!
4. The Java 7 Hotspot offers a small but consistent performance improvement over Java 6 on newer hardware.

1. Anton Zeef says:

Looking forward to the follow up blogpost in which you detail how you achieved the objectives ;-)

2. Very well written and informative. Thanks.

3. kunthar says:

I think you are an ascetic :)
Why don't you remodel your problem domain
in Erlang?
Java is definitely the wrong tool for your goal.

• timonk says:

Hi kunthar,

For one, none of us know Erlang, but that's more of a practical excuse than a technical one. The real problem with Erlang is that it doesn't really address any part of the Summarizer's problem domain besides sorting out some of the concurrency semantics.

To be clear, the Summarizer isn't primarily engineered for optimal throughput. Instead, its main goal is keeping lots of aggregations in memory in a compact form. As long as we can update those aggregations in a timely fashion, we're set. As far as I can tell there are many reasons Erlang isn't suited to this kind of application.

* Cliff Moon gave a good talk at StrangeLoop this year about the difficulties of applying backpressure in Erlang, given the heartbeat semantics of Erlang processes. This wasn't very reassuring, frankly. It hasn't been posted yet, but if I get him to post the slides, I'll point to them in another reply.
* Our in-memory aggregations are many tens of GBs (when very tightly packed) and are the fundamental shared, mutable state of the application. Some of the individual data structures are 10s or 100s of KBs and the copying overhead from ETS to process heaps would cripple us. The limited interface of ETS isn't particularly helpful, either.
* Binaries are read-only in Erlang. Lots of our probabilistic data structures are written as compact binary structures. Working around this seems problematic.

Hope this helps explain our choice!

• kunthar says:

Hi again,

1. Netty workers pulling data from rsyslog
2. Message parsing queue
3. And finally your message parsers
All of those can easily be spread across multiple cores without hassle on the Erlang VM. Also, I am quite sure you could benefit from helpers like zeromq[1]. Since I am not the software mentor of your project, I can't come up with more ideas, but I assure you, far better (even 20x)
performance is possible.
Concurrency has its own pros and cons, and it is impossible to judge higher-level languages from inside garbage-collected languages such as Java.
Note: ETS is a tool to use carefully within the Erlang world. If you put all the load on it, I bet you can't move one step ahead. Scalaris[2] is a better fit for your in-memory operations. (I guess there will be no such big need in this part if you design the above 3 steps in a distributed fashion.)
Hints:
– Think again; maybe nothing has to be mutable?
– Complex data structures shouldn't be a problem. Yes, we have fundamental problems, but we are trying to deal with them.[3]
– In-memory compaction is not another barrier here, either.
Also, Mr. moonpolysoft should know about gproc and some other tools newly added to the Erlang toolset. We know RPC-based epmd is not a perfect way. Nothing is perfect in this cruel world :) But you can call in more helpers to give you more power[4] anytime. Erlang itself doesn't have to carry *all* your needs :)

Note2: I can understand that if the system can't be changed at all, one can come up with totally strange solutions. Maybe one flash storage[5] box solves all the problems :) A lot of Oracle programmers use these boxes to compensate for bad coding performance nowadays :)

• kunthar says:

I have just seen this. Maybe it can help your existing infrastructure:
http://storm-project.net/
Best,

4. Robert says:

Great post!

Sometimes the quickest, cheapest and most pragmatic thing to do is to throw some new hardware at an issue (a few thousand dollars on a hardware upgrade is much cheaper than hiring 5 Erlang contractors for 6 months to re-write it from scratch!).

Was your hardware upgrade just a new CPU or a totally new system? Often the CPU boost isn't the biggest reason for improvement – faster network, memory access, and disk access can give good end-to-end gains as well. On a recent project the (10 year old) SAN was upgraded and almost everything in the company started working better, including systems where we hadn't realised this was an issue!

• timonk says:

Robert,

Thanks for the kind words!

We haven’t upgraded any hardware yet, mostly just weighing our options at this point. The only real difference between the test and production machine was the mobo and the CPU, so I’m guessing it was that. (The memory capacity and speed was roughly the same, and so are the NICs.) The step up in performance that we’ve seen year-over-year in Intel’s CPUs has been pretty remarkable.

Honestly, the cost of a rewrite is almost always too high, if only because you have to jettison so much institutional knowledge and gut-feel about the system. Throwing away the ability to quickly and accurately guess at which part of a system may be breaking is just madness most of the time. Better to dig a bit and find the fundamental divergences between what the runtime wants to do and what your app is doing than to simply try out a brand new language/runtime.

We’re pretty firm believers that the amount of work we’re doing should “fit on one machine” and as a result we’re pretty intent on figuring out what we’re doing vastly wrong before we start down the path of a whole new language or framework. Are we abusing the caching systems? Are we walking our data inefficiently? Are we using some fundamentally flawed data structure? It’s very difficult for me to believe that a mature, refined system like the JVM can’t be made to go very, very fast if you avoid the really obvious bad decisions, and tune a bit here and there.

Again, I’ll point to Martin Thompson and Michael Barker who’re slowly whittling away at the bad decisions and tuning and refining them into clear, straightforward ideas like the Single Writer Principle, like the fact that memory access patterns are important, like write-combining, like processor affinity/CPU topology, like off-heap data structures, like the Disruptor. The fact that they’re sharing some of these discoveries with the public is even better!

5. Your architecture fits my CES idea: Context, Event and State; the State is for aggregation: http://www.jdon.org/ces.html

It is very expensive to upgrade the hardware infrastructure; if hardware is the problem, then we are talking about not just a CPU/RAM upgrade but at least 2 new servers or more. So the most common path is to try many things in software (as you did) and then decide on new hardware.
I find your approach very useful, including the details.
Great article.

7. Hmm, is anyone else experiencing problems with the images on
this blog loading? I'm trying to figure out if it's a problem on my end or if it's the blog.
Any feedback would be greatly appreciated.

• timonk says:

The images are working for me from my phone and laptop. Sorry you’re having trouble!

8. stefano says:

I'm really late to this post… It is interesting, however! It might also be interesting to repeat all the tests on the new and old hardware, but using Java 7's TransferQueue, and introducing a trie to reuse data in the messages, since they are text