Writing Analytics SQL with Common Table Expressions

Author’s Note: Hello readers! I’m Josh O’Brien. I recently joined the Science team as a junior engineer, and this is my first post for the blog.

Introduction

One of my first tasks with the Science team has been learning to write effective analytics SQL. I came in with a basic knowledge of SQL, but writing complex analytics reports required me to learn tools and strategies for managing complexity that aren’t yet part of the standard introductions to SQL. Luckily, I had the Science team to teach me to work with Common Table Expressions (CTEs). I’ve come to love CTEs for the clarity that they’ve helped bring to my thinking and writing in SQL. The CTE syntax encourages me to reason through a problem as a sequence of simple parts and enables me to directly code a solution in terms of those parts, which I can individually document and test for correctness. Working with CTEs has jump-started my productivity, and helped the team as a whole set a higher standard for our SQL.

In the Science team’s experience, much of the common frustration with SQL comes down to a failure to treat SQL queries as declarative programs that demand the same care as imperative programs. SQL is code, and we should treat it as such. We can better manage the complexity of SQL by using the same basic techniques we do in other languages: we can divide work into composable parts, document our intent, and test for correctness. We use CTEs as a foundation for building queries that are factored, documented, and tested, and we’ve enjoyed excellent results writing and maintaining numerous hundred- and thousand-line reports using this approach.

In this post, I’ll share an example of how the Science team uses CTEs to treat SQL as code. I’ll walk through the process of writing an analytics report with CTEs, and show how CTEs help me think through a problem and implement, document, and test a solution.

* If you’re thinking that CTEs are no better than temporary tables or views for these purposes, read on. CTEs, temporary tables, and views all have their place in our SQL toolkit. We use CTEs because they are best suited for this work. For more on the relative merits of CTEs, temporary tables, and views, please see the appendices to this post.

Common Table Expressions

Before we dive into the example report, let’s take a quick look at the CTE syntax we’ll be using. CTEs are defined inside of a WITH clause attached to a primary statement. Within the scope of the larger query, each CTE can be manipulated like a table. This allows us to chain CTEs together and build sequences of operations. In the following diagram, we’re building up a four-part query, part by part. We start with two parts: a foo CTE attached to a main SELECT statement. Next, we add a bar CTE. In the final step, we add a baz CTE to complete the four-part query.

Examples of two-, three-, and four-part queries with Common Table Expressions. The query grows by one CTE at each stage.

Notice what we did here. In the foo, bar, and baz CTEs, we now have three intermediate result sets that we can test individually and “print” with a SELECT *. Once we know each part is correct, we add another, until we’ve solved our problem. We can use CTEs to break queries into as many simple parts as the problem requires.
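
As a concrete sketch of that progression (foo, bar, and baz are just the placeholder names from the diagram; the trivial expressions are mine), the finished four-part query has this shape. During development, the main statement can be swapped for a SELECT * FROM bar to inspect that intermediate result:

WITH foo AS (
    SELECT 1 AS n              -- first intermediate result
),
bar AS (
    SELECT n + 1 AS n          -- builds on foo
    FROM foo
),
baz AS (
    SELECT n + 1 AS n          -- builds on bar
    FROM bar
)
SELECT n                       -- main statement; swap in SELECT * FROM bar to test that part
FROM baz
;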

We use CTEs rather than temporary tables or views to decompose queries in development because they are simpler to use. There is no need to add the complexity of managing CREATE and DROP statements at this stage in the writing process.

Frequency Report

We’ll use a simplified example report to illustrate how we use CTEs in our everyday work: a frequency report. A frequency report is an online advertising analytics report that helps advertisers determine the number of ads to serve users over a specific time period. Advertisers want to reach out to customers enough times to build awareness of and interest in their offerings, but not so many times that customers become jaded or annoyed. A frequency report breaks down return on advertising investment by the number of ads users have been shown, a classification known as a user’s impression frequency class.

This report produces data that can be graphed as:

An example of a report in our UI, showing impressions for an advertiser by frequency class.

Stripped all the way down, the basic query that generates the report above is:

WITH impression_counts AS (
    SELECT user_id,
           SUM(1) AS impression_count
    FROM impressions
    GROUP BY 1
    )
SELECT impression_count      AS frequency_class,
       SUM(impression_count) AS total_impressions
FROM impression_counts
GROUP BY 1
;

The challenge of writing these reports comes from managing all the additional data we need. Actual reporting queries need to correctly handle the complexity of timestamp, ad campaign, conversion attribution, click, and cost data without becoming tangled messes.

For this simplified example, we’ll start with tables recording impression (ad view), click (ad interaction), and conversion (sale) events, and produce a frequency report tracking the total number of users, impressions, clicks, and conversions for each impression frequency class for each ad campaign in the database for the month of March 2014. We can visualize our task like this:

Our task: use the impressions, clicks, impression_attributed_conversions, and click_attributed_conversions tables to produce a frequency report for the month of March 2014.

Thinking with CTEs

Working with CTEs begins with reasoning about the problem in terms of the stages and parts needed to produce the report. From the above starting point, we can already work out four main stages.

We’ll need to:

  • FILTER the four input tables by record_date,
  • GROUP BY user_id and campaign_id, and SUM to get user-level counts for impressions, clicks, and conversions,
  • JOIN those counts together on user_id and campaign_id, and finally,
  • GROUP BY impression_count (= frequency_class) and campaign_id, and SUM to generate the report totals for users, impressions, clicks, and conversions.

We can express the relationships between these operations visually:

A map of the query to produce the frequency report. Each of the conceptual parts (rectangles) connecting the green input tables to the orange frequency report will be written as a simple CTE.

In one form or another, each of these operations would need to be a part of any query that produces this report. With CTEs, we can preserve the logical clarity of our thought process in the code itself. Each of the main parts of this query will be implemented using simple CTEs that serve only one main purpose. For added clarity, we will name and comment the CTEs to communicate our intent at every stage. This technique yields a query that we can read straight through and maintain with ease, just like our other code.

Writing with CTEs

Let’s take a look at a CTE from each stage right now. The full query with documentation comments can be found here, and in the appendices to this post.

First come the three filter CTEs. Here’s the CTE for filtered_impressions. Its only purpose is to filter the impressions table down to March 2014:

filtered_impressions AS (
    SELECT record_date,
           user_id,
           campaign_id
    FROM impressions
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01'
)

Next, we calculate user-level counts for impressions, clicks, and conversions. Each of the three “counts” CTEs performs only a simple aggregate function: a GROUP BY and a SUM. Here is the impression_counts CTE:

impression_counts AS (
    SELECT user_id,
           campaign_id,
           SUM(1) AS impression_count
    FROM filtered_impressions
    GROUP BY 1, 2
)

After that, we JOIN the three “counts” CTEs together in a single long table. This collated_counts CTE is the longest in the query, but, like the others, it has only one main purpose:

collated_counts AS (
    SELECT imp.user_id           AS user_id,
           imp.campaign_id       AS campaign_id,
           imp.impression_count  AS impression_count,
           cl.click_count        AS click_count,
           conv.conversion_count AS conversion_count
    FROM impression_counts imp
        LEFT OUTER JOIN click_counts cl ON
            imp.user_id      = cl.user_id AND
            imp.campaign_id  = cl.campaign_id
        LEFT OUTER JOIN conversion_counts conv ON
            imp.user_id      = conv.user_id AND
            imp.campaign_id  = conv.campaign_id
    )

Last comes the main SELECT statement. Its only purpose is to group by impression_count (= frequency_class) and campaign_id, and calculate the four SUMs for the report:

SELECT impression_count                   AS frequency_class,
       campaign_id                        AS campaign_id,
       SUM(1)                             AS total_users,
       SUM(impression_count)              AS total_impressions,
       SUM(COALESCE(click_count, 0))      AS total_clicks,
       SUM(COALESCE(conversion_count, 0)) AS total_conversions
FROM collated_counts
GROUP BY 1, 2

 

Testing with CTEs

As we build up the query with CTEs, we leverage the ability to SELECT from each CTE individually to test for correctness as part of the writing process. This basic testing can be as simple as three files in a text editor, which we execute from psql (or equivalent) in a sequence as we write:

  • setup.sql: CREATE tables and INSERT rows of test data
  • test.sql: the query itself
  • teardown.sql: DROP the tables created in setup.sql

We write and comment one CTE at a time in the test file. Each time we add a CTE, we add test rows to exercise that CTE to the setup file, and include comments to indicate what should happen to those rows when we SELECT * from the relevant CTE. When the output matches our expectations, we move to the next part of the query, and repeat the process.

As an example, initial tests for the filtered_impressions CTE could consist of creating an impressions table and inserting five rows to exercise the date range in the WHERE clause. We indicate our expectations for those rows with brief comments:

CREATE TABLE impressions (
    record_date  date   NOT NULL,
    user_id      bigint NOT NULL,
    campaign_id  bigint NOT NULL
);
INSERT INTO impressions (record_date, user_id, campaign_id) VALUES
    /* The following 2 rows should not appear in filtered_impressions: */
    ('2014-02-28', 707, 7),
    ('2014-04-01', 707, 7),
    /* The following 3 rows should appear in filtered_impressions: */
    ('2014-03-01', 101, 1),
    ('2014-03-15', 101, 1),
    ('2014-03-31', 101, 1)
;
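
At this stage, the test file itself can be just the CTE under development with the main statement replaced by a SELECT * (a sketch; the ORDER BY is only there to make the output easy to check against the comments in setup.sql):

WITH filtered_impressions AS (
    SELECT record_date,
           user_id,
           campaign_id
    FROM impressions
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01'
)
SELECT *
FROM filtered_impressions
ORDER BY record_date
;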

This basic testing at the time of writing is not a substitute for a comprehensive test framework, but it is enough to catch many errors that could otherwise sneak through, and it provides a good return on a modest investment of effort. By the time the full query is complete, this process will have generated tests and documentation for each part of the query.

Conclusion

This method of working with CTEs has helped me by bringing clarity and simplicity to complex analytics queries. Thinking, writing, and testing with CTEs helps me treat SQL as part of software engineering practice by writing SQL that’s factored, documented, and tested more like other code.

The Science team thinks of this method as producing a foundation for further refinements. When appropriate, optimizations for performance can and will be made, but we focus on correctness first. Optimizations tend to add complexity, and before we do that, we want to mitigate the complexity of the query as much as possible.

By starting with CTEs, we can more easily write queries that we can quickly read and reuse six months from now. Analysts can return to their models and analyses with confidence and engineers are better able to add new features to reports without introducing new bugs. We’re building upon a foundation of factored, documented, and tested SQL.

Appendices

Code for the Example Report
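
The original post linked out to the full, commented query. The sketch below reconstructs it from the CTEs shown in the body of the post; the filtered_clicks, filtered_conversions, click_counts, and conversion_counts CTEs are assumed to mirror their impression counterparts, with conversions drawn from both attribution tables:

WITH filtered_impressions AS (
    /* Filter impressions to March 2014. */
    SELECT record_date, user_id, campaign_id
    FROM impressions
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01'
),
filtered_clicks AS (
    /* Filter clicks to March 2014. */
    SELECT record_date, user_id, campaign_id
    FROM clicks
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01'
),
filtered_conversions AS (
    /* Filter both conversion tables to March 2014 and combine them. */
    SELECT record_date, user_id, campaign_id
    FROM impression_attributed_conversions
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01'
    UNION ALL
    SELECT record_date, user_id, campaign_id
    FROM click_attributed_conversions
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01'
),
impression_counts AS (
    /* User-level impression counts per campaign. */
    SELECT user_id, campaign_id, SUM(1) AS impression_count
    FROM filtered_impressions
    GROUP BY 1, 2
),
click_counts AS (
    /* User-level click counts per campaign. */
    SELECT user_id, campaign_id, SUM(1) AS click_count
    FROM filtered_clicks
    GROUP BY 1, 2
),
conversion_counts AS (
    /* User-level conversion counts per campaign. */
    SELECT user_id, campaign_id, SUM(1) AS conversion_count
    FROM filtered_conversions
    GROUP BY 1, 2
),
collated_counts AS (
    /* One row per user per campaign, collating the three counts. */
    SELECT imp.user_id           AS user_id,
           imp.campaign_id       AS campaign_id,
           imp.impression_count  AS impression_count,
           cl.click_count        AS click_count,
           conv.conversion_count AS conversion_count
    FROM impression_counts imp
        LEFT OUTER JOIN click_counts cl ON
            imp.user_id     = cl.user_id AND
            imp.campaign_id = cl.campaign_id
        LEFT OUTER JOIN conversion_counts conv ON
            imp.user_id     = conv.user_id AND
            imp.campaign_id = conv.campaign_id
)
/* Report totals per frequency class per campaign. */
SELECT impression_count                   AS frequency_class,
       campaign_id                        AS campaign_id,
       SUM(1)                             AS total_users,
       SUM(impression_count)              AS total_impressions,
       SUM(COALESCE(click_count, 0))      AS total_clicks,
       SUM(COALESCE(conversion_count, 0)) AS total_conversions
FROM collated_counts
GROUP BY 1, 2
;
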
On CTEs, Temporary Tables, and Views

We asked Christophe Pettus of PostgreSQL Experts to help illuminate the tradeoffs between CTEs, views, and temporary tables, and received the following helpful response, which we publish here with his permission and our thanks:

[E]ach have characteristics that can make them better or worse in particular situations:

1. CTEs are optimization fences; the query planner will plan CTEs separately from the rest of the query. This can be good or bad, depending on the way the CTE is used.

2. Views are *not* optimization fences; you can think of them as being textually inserted into the query at the appropriate place, so queries can be rewritten, join clauses moved around, etc.

3. Temporary tables can have indexes; for very large intermediate result sets, this can be essential for good performance.

We agree that the choice between CTEs, temporary tables, and views is a matter of balancing the different trade-offs of the different stages of software development.

As explained in this post, the Science team finds the balance in favor of CTEs as the foundation for query development. We reach for the CTE syntax first for its clarity and ease of use. When we write and test queries part-by-part, we want to keep the code as clear and simple as possible. Juggling extra CREATE and DROP statements for temporary tables or views works against that goal.

Once we have a correct, clear foundation, then we move on to the optimizations I mentioned in the conclusion. At that point, we consider re-writing CTEs as views or materialized tables on a case-by-case basis. Sometimes the balance tips away from CTEs. In our experience, the most common reason for this has been to gain the performance benefits of indexing on intermediate result sets that can contain hundreds of millions to tens of billions of rows.
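
For example, if an intermediate result like filtered_impressions later proved worth sharing across queries or indexing, the rewrite can be as mechanical as lifting the CTE body into a view or a temporary table (a sketch, not our production DDL):

CREATE VIEW filtered_impressions AS
    SELECT record_date, user_id, campaign_id
    FROM impressions
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01';

-- Or, when an indexable intermediate result set is what is needed:
CREATE TEMPORARY TABLE filtered_impressions_tmp AS
    SELECT record_date, user_id, campaign_id
    FROM impressions
    WHERE record_date >= '2014-03-01' AND
          record_date <  '2014-04-01';
CREATE INDEX ON filtered_impressions_tmp (user_id, campaign_id);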

Open Source Release: java-hll

We’re happy to announce our newest open-source project, java-hll, a HyperLogLog implementation in Java that is storage-compatible with the previously released postgresql-hll and js-hll implementations. And as the rule of three dictates, we’ve also extracted the storage specification that makes them interoperable into its own repository. Currently, all three implementations support reading storage specification v1.0.0, while only the PostgreSQL and Java implementations fully support writing v1.0.0. We hope to bring the JS implementation up to speed, with respect to serialization, shortly.

Open Source Release: js-hll

One of the first things that we wanted to do with HyperLogLog when we first started playing with it was to support and expose it natively in the browser. The thought of allowing users to directly interact with these structures — perform arbitrary unions and intersections on effectively unbounded sets all on the client — was exhilarating to us. We knew it could be done but we simply didn’t have the time.

Fast forward a few years to today. We finally had enough in the meager science/research budget to pick up an intern for a few months, and as a side project I tasked him with turning our dream into a reality. Without further ado, we are pleased to announce the open-source release of AK’s HyperLogLog implementation for JavaScript, js-hll. We are releasing this code under the Apache License, Version 2.0, matching our other open source offerings.

We knew that we couldn’t just release a bunch of JavaScript code without allowing you to see it in action — that would be a crime. We passed a few ideas around and the one that kept bubbling to the top was a way to kill two birds with one stone. We wanted something that would showcase what you can do with HLL in the browser and give us a tool for explaining HLLs. It is typical for us to explain how HLL intersections work using a Venn diagram. You draw some overlapping circles with a border that represents the error, and you talk about how if that border is close to or larger than the intersection then you can’t say much about the size of that intersection. This works just OK on a whiteboard, but what you really want is to just build a visualization that allows you to select from some sets and see the overlap. Maybe even play with the precision a little bit to see how that changes the result. Well, we did just that!

Click above to interact with the visualization

Note: There’s more interesting math in the error bounds that we haven’t explored. Presenting error bounds on a measurement that cannot mathematically be less than zero is problematic. For instance, if you have a ruler that can only measure to 1/2″ and you measure an object that truly is 1/8″ long you can say “all I know is this object measures under 0.25 inches”. Your object cannot measure less than 0 inches, so you would never say 0 minus some error bound. That is, you DO NOT say 0.0 ± 0.25 inches.  Similarly with set intersections there is no meaning to a negative intersection. We did some digging and just threw our hands up and tossed in what we feel are best practices. In the js-hll code we a) never show negative values and b) we call “spurious” any calculation that results in an answer within 20% of the error bound. If you have a better answer, we would love to hear it!

Open Source Release: js-murmur3-128

As you can imagine from all of our blog posts about hashing, we hash a lot of things. While the various hashing algorithms may be well-defined, the devil is always in the details, especially when working with multiple languages that have different ways of representing numbers. We’re happy to announce the open-source release of AK’s 128-bit Murmur3 implementation for JavaScript, js-murmur3-128. We are releasing this code under the Apache License, Version 2.0, matching our other open source offerings.

Details

The goal of the implementation is to produce a hash value that is equivalent to the C++ and Java (Guava) versions for the same input and it must be usable in the browser. (Full disclosure: we’re still working through some signed/unsigned issues between the C++ and Java/JavaScript versions. The Java and JavaScript versions match exactly.)

Usage

Java (Guava):

final int seed = 0;
final byte[] bytes = { (byte)0xDE, (byte)0xAD, (byte)0xBE, (byte)0xEF,
                       (byte)0xFE, (byte)0xED, (byte)0xFA, (byte)0xCE };
com.google.common.hash.HashFunction hashFunction = com.google.common.hash.Hashing.murmur3_128(seed);
com.google.common.hash.HashCode hashCode = hashFunction.newHasher()
       .putBytes(bytes)
       .hash();
System.err.println(hashCode.asLong());

JavaScript:

var seed = 0;
var rawKey = new ArrayBuffer(8);
var byteView = new Int8Array(rawKey);
byteView[0] = 0xDE; byteView[1] = 0xAD; byteView[2] = 0xBE; byteView[3] = 0xEF;
byteView[4] = 0xFE; byteView[5] = 0xED; byteView[6] = 0xFA; byteView[7] = 0xCE;
console.log(murmur3.hash128(rawKey, seed));

HyperLogLog Engineering: Choosing The Right Bits

Author’s Note: this is just a quick post about an engineering hiccup we ran into while implementing HyperLogLog features that aren’t mentioned in the original paper. We have an introduction to the algorithm and several other posts on the topic if you’re interested.

Say you had two HyperLogLog data structures with 5-bit-wide registers, one with log_{2}m = 11 and the other with log_{2}m = 15, and wanted to compute their union. You could just follow my colleague Chris’ advice and “fold” the larger one down to the size of the smaller one and then proceed as usual taking the pairwise max of the registers. This turns out to be a more involved process than Chris makes it out to be if you designed your HLL implementation in a particular way. For instance, if you use the 15 least(/most) significant bits of the 64-bit hashed input to determine register index and the next 30 bits to determine the register value, you end up in a tricky situation when you truncate the last 4 bits of the index to get the new 11-bit index.

bit string bad

If you imagine feeding the same element into an HLL of the smaller size, then the 4 bits you truncated from the index would have actually been used in the computation of the register value.

bit string bad after fold

You couldn’t simply take the original register value you computed, you’d have to take into account the new prefix added to the register value bit string. If the prefix has a 1 in it, you would recompute the run of zeroes on just the prefix (because you know it contains a 1 and thus all the information you need), and if not, you’d add the length of the prefix to the original register value computed. Not a ton of work, but having clutter like this in algorithmic code distracts the reader from the true intention. So how do we avoid this?

Well, you could say that it’s very, very unlikely that you’ll ever need more than 30 bits for your register value, so you could assume that the register width would remain constant forever and use the bottom 30 bits for your register value and the next log_{2}m bits for your register index. That way you could just truncate the last 4 bits of the index and know that your register value would still be the same. On the other hand, if you’re Google, that may not be true. In that case, what you should do is use the log_{2}m least (/most) significant bits of your hashed value for the register index and the 30 most (/least) significant bits for the register value.

bit string

Now you can just truncate the register index and use the original register value.

bit string after fold
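
A minimal Java sketch of that layout (my own names and conventions, not the java-hll or postgresql-hll code; counting the run of zeroes from the low end of the value bits and offsetting by one is just one common convention):

public final class HllBitLayout {
    /** Register index: the log2m least significant bits of the hashed value. */
    static int registerIndex(long hash, int log2m) {
        return (int) (hash & ((1L << log2m) - 1));
    }

    /** Register value: run of zeroes (plus one) in the 30 most significant bits. */
    static int registerValue(long hash) {
        long valueBits = (hash >>> 34) | (1L << 30);   // sentinel bit caps the run at 30
        return Long.numberOfTrailingZeros(valueBits) + 1;
    }

    public static void main(String[] args) {
        long hash = 0xDEADBEEFFEEDFACEL;
        // Folding log2m = 15 down to log2m = 11 only shortens the index mask;
        // the register value never overlaps the index bits, so it is unchanged.
        System.out.println(registerIndex(hash, 15) + " -> " + registerIndex(hash, 11));
        System.out.println(registerValue(hash));
    }
}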

If you’re using a good hash function like MurmurHash3 that gives you 128 bits of entropy, you could simply compute the register index from the first 64-bit word in the hash and compute the register value from the second 64-bit word and completely ignore this problem up to a mind-bending log_{2}m = 64 and register width of 6 (aka the heat death of the universe).

I know it’s not always possible to anticipate this problem in the early stages of implementing and vetting an algorithm, but hopefully with a bit of research the next time someone looks to implement HLL they’ll see this and learn from our mistake.

Adventures in Concurrency

 

The Past

The Summarizer, our main piece of aggregation infrastructure, used to have a very simple architecture:

  1. RSyslog handed Netty some bytes.
  2. A Netty worker turned those bytes into a String.
  3. The Netty worker then peeled off the RSyslog envelope to reveal a payload and an event type. We call this combination an IMessage.
  4. The IMessage’s payload got turned into an IInputEvent (basically a POJO).
  5. The IInputEvent was mapped to one or many summaries, based on its event type, each of which would then be updated with the event.
Original Summarizer Architecture with Summarization Lock

Netty workers contend for a central summarization lock before updating summaries.

All of this work was done inside the Netty workers, and the synchronization of the summary objects was handled by a single lock on all of them. Some events were mapped to a single summary, others to a half dozen. Luckily the payloads were simple (CSV-formatted) and the summaries were even simpler. It didn’t really matter if the events hit one summary or ten, we could summarize events much faster than we could parse them. Under these conditions we could handle 200k messages per second, no sweat.

Slowly, new reporting features were added and the summaries became more complex. The number of operations per event increased, and throughput dropped to 100k/sec. Progressively, summarization supplanted parsing as the bottleneck.

Then we introduced a more complex, nested JSON event format (v2 messages) in order to support new product features. Complex, nested events meant ever more complex, nested summaries, which meant ever more time holding the single lock while updating them. Parsing time increased with the new payload format, but was still far faster than updating the summaries. Throughput for v1 messages had dipped to 60k/sec, and 10k/sec for the v2 messages.

Moreover, the new features the v2 messages permitted weren’t simply an open-ended exercise: with them came the customers that demanded those features and their additional traffic.  The Summarizer simply wouldn’t stand up to the predicted traffic without some work. This post is an overview of the multithreaded solution we used and hopefully will provide some insight into the pitfalls of concurrency in Java 6 and 7.

Objective

Get v1 message throughput back to 200k/sec and v2 throughput to 100k/sec, ideally on our production hardware. Simple enough, given that I knew the main bottlenecks were summarization and to a lesser extent the parsing of the IMessages to IInputEvents.

Let’s put in some queues

The basic premise of concurrency is that you find the time-consuming bits of work, throw some queues and workers at them, and out comes performance, right? (Wrong! But I’ve got a whole narrative going here, so bear with me.) The natural places for inserting these queues seemed to be between steps 3 and 4, and steps 4 and 5. If parsing IMessages to IInputEvents and summarizing IInputEvents are the only time-consuming work units, adding concurrency there should open up those bottlenecks. According to the “train book” I had three options for queues:

  • ArrayBlockingQueue (henceforth ABQ) – bounded, backed by an array (duh), uses a single ReentrantLock
  • LinkedBlockingQueue (henceforth LBQ) – bounded, backed by a linked list (duh), uses two ReentrantLocks (one for the head and one for the tail)
  • ConcurrentLinkedQueue (henceforth CLQ) – unbounded, backed by a linked list, uses no locks, relying instead on CAS (a non-blocking algorithm)
Queued Summarizer Architecture with BlockingQueues

IMessage to IInputEvent parsing and IInputEvent summarization are buffered by BlockingQueues.

We added a message parsing queue to which the Netty workers would dump IMessages. Message parser workers would take those IMessages and turn them into IInputEvents. They would then distribute those IInputEvents to a summarization queue I added to each summarization worker. Since I didn’t want to lock each report object, I decided that only a single summarization worker would ever write to a particular summary. (Martin Thompson’s blog posts about the Single Writer Principle were inspiration for this.) That is, each summarization worker would be assigned (by round-robin at startup) one or many summaries to own exclusively. So, in total I added one multiple-producer, multiple-consumer (MPMC) message parsing queue and N multiple-producer, single-consumer (MPSC) summarization queues (one for each summarization worker).
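
Here is a stripped-down sketch of that wiring (written with Java 8 lambdas for brevity; plain Strings and hash-based routing stand in for IMessages, IInputEvents, and the real summary-ownership assignment):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public final class SummarizerWiring {
    static final int QUEUE_SIZE = 2048;
    static final int SUMMARIZATION_WORKERS = 4;

    // MPMC queue: many Netty workers produce raw payloads, many parser workers consume them.
    static final BlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(QUEUE_SIZE);

    // One MPSC queue per summarization worker: many parsers produce, exactly one worker
    // consumes, so each summary has a single writer and needs no lock.
    static final BlockingQueue<String>[] summarizationQueues = newQueues(SUMMARIZATION_WORKERS);

    @SuppressWarnings("unchecked")
    static BlockingQueue<String>[] newQueues(int n) {
        BlockingQueue<String>[] queues = new BlockingQueue[n];
        for (int i = 0; i < n; i++) queues[i] = new ArrayBlockingQueue<String>(QUEUE_SIZE);
        return queues;
    }

    interface InterruptibleTask { void run() throws InterruptedException; }

    static void startDaemon(String name, InterruptibleTask task) {
        Thread t = new Thread(() -> {
            try { task.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }, name);
        t.setDaemon(true);
        t.start();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < SUMMARIZATION_WORKERS; i++) {
            final BlockingQueue<String> ownQueue = summarizationQueues[i];
            startDaemon("summarizer-" + i, () -> {
                while (true) {
                    String event = ownQueue.take();        // sole consumer of this queue
                    System.out.println(Thread.currentThread().getName() + " summarizing " + event);
                }
            });
        }

        startDaemon("parser-0", () -> {
            while (true) {
                String raw = messageQueue.take();          // blocks when empty: no busy spin
                String event = raw.trim();                 // stand-in for IMessage -> IInputEvent parsing
                int owner = (event.hashCode() & 0x7fffffff) % SUMMARIZATION_WORKERS;
                summarizationQueues[owner].put(event);     // blocks when full: back-pressure
            }
        });

        messageQueue.put("rsyslog-envelope payload-1 ");   // what a Netty worker would hand us
        messageQueue.put("rsyslog-envelope payload-2 ");
        Thread.sleep(200);                                 // let the toy pipeline drain, then exit
    }
}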

The First Bottleneck: Parsing

I slotted in the various queues available and replayed some traffic to get a feel for what was going on.

  • The message parsing queue was always full, which confirmed my suspicion that parsing, not Netty, was the first bottleneck.
  • The summarization queues were split into two groups: those that were always full and those that were always empty. The reason was clear: some summarization workers were assigned high-volume summaries and others low-volume summaries.

This was my first lesson in queueing: queues are, on average, either completely full or completely empty, because it is nearly impossible to perfectly balance production and consumption rates. That leads to the second lesson: CLQ (well, any unbounded queue) probably shouldn’t be used as a producer/consumer queue because “completely full” means “always growing” for an unbounded queue. Naturally, that’s an OK situation when consumption outpaces production, but in that scenario I wouldn’t have needed the queue in the first place. I needed back-pressure, and only the blocking (in this case, bounded) queues could give me that.

In order to address the parsing bottleneck, I wanted to get a better grasp of the IMessage to IInputEvent throughput rate under different configurations. I constructed a test in which Netty workers would either:

  • do all the parsing work themselves, and then discard the message, or
  • would enqueue IMessages to the message parsing queue (either ABQ or LBQ), and parser workers would dequeue, parse, and then discard the IInputEvent. CLQ was not included here since it would consistently OOM as the queue grew without bound.

Each Netty worker would be responsible for a single connection and each connection could provide as many as 150k messages per second. Results for v1 and v2 message parsing were nearly identical, as were the results for Java 6/7, so they are presented here without distinction.

Netty-Only Message Parsing

When Netty did all of the parsing work, throughput maxed out at about 130k/sec.

 

Queued Message Parsing Throughput

Message parsing throughput with a BlockingQueue between the Netty workers and the message parser workers. The facet labels are [Queue Implementation, Queue Size].

  • Without a queue, throughput topped out at 130k/s. With a queue and the right parser worker count, each of the four Netty workers could produce 60k/sec worth of IMessages. Notably, neither situation provoked anywhere near 100% saturation on (# netty worker + # parser worker) cores, so I have to believe that it’s simply a matter of having dedicated parsing threads that are not affected by the context switching required to read from the network. Say context switching takes up r% of your time, then 5 netty workers can do at most w_0 = 5(1-r/100) units of work. However, 4 Netty workers and 1 parser worker can do w_1 = 4(1-r/100) + 1 > w_0 units. The fact that 4 Netty workers + 1 parser worker yields about 130-150k/sec, which is a small gain over just 5 Netty workers, suggests this. It might also be a matter of code “locality”: by allowing each thread to focus on a narrower scope of work, better branch prediction or compilation may be possible.
  • ABQ, touted as the end all of high-performance Java queues, gave us “atrocious” throughput, compared to LBQ, if more than two consumer threads were hitting it. This was surprising until I poked an active VM with SIGQUIT a few hundred times only to find that most of the workers were waiting on the ABQ’s ReentrantLock. The difference between two and three consumers hammering that lock amounted to a 50% drop in throughput.
  • LBQ’s split lock seemed to degrade more gracefully in the presence of “extra” producers or consumers. Specifically, the overhead of GC and a linked-list (vs. array) was less than that produced by lock contention on ABQ’s single lock. More remarkably, 2-8 parser workers always produced better results than a single parser worker, so a misconfiguration here couldn’t really do worse than revert to the 1 worker scenario. ABQ was not so lenient, however, dropping to throughput numbers lower than the Netty-only setup after 2 parser workers.
  • Queue size was largely irrelevant compared to the impact of adding or removing even a single producer/consumer. As long as the queue is large enough to buffer jitters, there’s really little point in spending hours tuning it.

Progress! At least I knew I could parse the 200k messages per second I needed.

The Second Bottleneck: Summarization

I knew that I wouldn’t be able to preserve the full parsing throughput simply because the queuing/dequeuing latency of another queue would always be present. The tradeoff was going to be adding more summarization workers at the cost of more time spent by the message parsing workers distributing the newly parsed IInputEvents to all relevant summarization workers. Each event would likely be distributed to more than one summarization worker, which meant a sequential lock acquisition for each summarization worker.

The cost of delivery was affected by the number of message parser workers, the number of summaries, the number of summarization workers, as well as the fan-out factor of each particular event, and hence on the proportions of different events to each other in a “nominal” data stream. This seemed like too many variables to isolate and too brittle of a measurement to be of any use. Instead, I threw out the fine-grained rigor and just plotted as many things as I could. I ran all the queues at one size: 2048.

BlockingQueue V1 Throughput on Production Hardware

At high throughputs, adding more workers simply makes things worse. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

BlockingQueue V2 Throughput on Production Hardware

At lower throughputs, lock overhead becomes less of a factor. Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

  • Again, the touted ABQ is matched or bested by LBQ in many configurations. It’s very, very interesting to me that GC on such fast-moving LBQs isn’t a massive issue. That said, for these tests I was running 30GB heaps, so the new generation was rather large, and the nodes of the linked list are extremely short-lived. Don’t write off LBQ, especially with higher producer/consumer counts!
  • Again, it’s simply stunning how much of a difference a single added or removed producer or consumer can make on the total throughput. Our production machines have enough hardware threads to cover the worker threads, so it’s unlikely that resource starvation is a problem here. It seems that application performance can suffer immensely from simple lock convoys caused by too many workers. Comparing the v1 and v2 plots, it’s clear that the queue lock(s) can’t support any more contention from new workers at high throughputs. Adding more workers at 100k/sec completely guts performance in a way that simply does not occur at 25k/sec. Lock overhead can destroy performance at high throughputs!
  • The “best” worker configurations for v1 are, from a performance perspective, incompatible with v2 workloads and vice versa. It is absolutely crucial to distinguish and separate the different types of workloads. Trying to run them together will lead to misleading and muddled results. Tease them apart and you can fairly easily optimize both. For instance, LBQ-LBQ seems to work best with 2 summarization workers for v1 workloads. However, that configuration sacrifices 50% of peak performance on v2 workloads, which worked best with 3 or 4 summarization workers. The way this separation is implemented in production is via a rule in our event routing layer: all v1 messages are routed to one Summarizer and all v2 messages are routed to another. If that kind of separation isn’t possible, it’s probably worth instantiating two different queues and balancing worker pools separately, instead of trying to lump all the events together.
  • Java 6 to Java 7 bought us nothing on this hardware. You may note that under some configurations, performance appears to dip slightly under Java 7, but that’s slightly misleading because I’ve used averages of throughputs in these plots for visual clarity. The performance “dip” easily falls within the jitter of the raw data.

The problem was that despite these improvements I hadn’t reached my stated goals. It was time to look a bit further afield than java.util.concurrent.

Disruptor

I’d mentioned that Martin Thompson’s Mechanical Sympathy blog had been inspiration for some of our design choices. It was also an excellent introduction to LMAX’s Disruptor, which can be used to simulate a graph of bounded queues. Since it advertised vast improvements in throughput over LBQ and ABQ, I decided to give it a shot.

Side note: Yes, I know the Disruptor is meant to be used when the actual bytes of data are in the RingBuffer’s entries, as opposed to just references. No, I can’t do that easily because we have variable message sizes and using the max size as an upper bound for the entries would make the buffer either too small (in entry count) or too large to fit into L3, as advised. If I get desperate, I might consider re-architecting the application to move to a smaller message representation and move the deserialization into the “business logic” as suggested by the first link. The following results and analysis are NOT an endorsement or condemnation of the Disruptor under any kind of rigorous testing regimen. I wanted to see if I could use it as a slot-in replacement for our queues, nothing more, nothing less.

I tried out the Disruptor in the least invasive way I could think of: one Disruptor per summarization worker. Each summarization worker would have a RingBuffer<IInputEvent> that would be fed off of the various message parser workers. This fits nicely because it supports an easy MPSC configuration with the MultiThreadedClaimStrategy. I considered using it for the message parsing queue, but the hoops I’d have had to jump through to stripe the RingBuffer to allow an MPMC configuration just seemed like overkill for a preliminary test. I tried out various WaitStrategys but the results shown below are from the ‘busy-spin’ strategy, which gave the best throughput.

Summarizer Architecture with Message Parsing Queue and Disruptor Summarization Queues

Disruptor as a slot-in replacement for an MPSC BlockingQueue.

Disruptor V1 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

Disruptor V2 Throughput on Production Hardware

Facet labels are [Message Parser Queue Impl-Summarization Queue Impl, JDK].

The results here were unsurprising: the Disruptor did not magically fix my problems. It performed quite well on our production hardware, matching the best results for both v1 and v2 messages, but ended up utilizing more CPU resources despite being allocated an equal number of threads, regardless of WaitStrategy. This brings up another interesting point: I had the choice of using put/take or offer/poll on our BlockingQueues and ended up choosing put/take for the same reason. A marginal increase in throughput didn’t seem worthwhile if the tradeoff was having every thread in a busy spin consuming 100% of its core. For us, even a 10% performance increase wasn’t enough to justify the decreased visibility into the “true” utilization of the CPU resources.
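
To make that trade-off concrete, here is a toy contrast of the two hand-off styles (a sketch, not the Summarizer’s code):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public final class HandoffStyles {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2048);

        // Blocking hand-off: put/take park the thread when the queue is full/empty,
        // so an idle pipeline leaves the cores (mostly) idle too.
        queue.put("event");
        String taken = queue.take();

        // Non-blocking hand-off: offer/poll return immediately, so a consumer chasing
        // the last bit of latency ends up busy-spinning, pinning its core at 100%
        // even when there is nothing to do.
        while (!queue.offer("event")) { /* busy spin */ }
        String polled;
        while ((polled = queue.poll()) == null) { /* busy spin */ }

        System.out.println(taken + " / " + polled);
    }
}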

Hardware as a crutch

I was left in a pickle. Short of re-architecting around the “ideal” Disruptor workflow or reworking the way the summarization workers shared work (allowing N workers to 1 summary as well as 1 worker to N summaries) I was without a quick software option for fixing this. So, like the lazy clod that I am, I turned to hardware to buy some more time. I happened to have a faster, more modern CPU on hand, so I gave that a spin. The baseline v2 message throughput was 20k/sec, for reference.

V2 Throughput on Modern Hardware

The facet labels are [Message Queue Impl-Summarization Queue Impl, JDK].

Talk about hardware making a difference! Moving on to fresh hardware can literally double performance, without a doubling in clock speed.

  • Though the Disruptor configurations gave the best results, the “mundane” LBQ-LBQ ones only trailed them by 8%, using 2-4 fewer threads and nearly a full core’s less of CPU, at that. LBQ-LBQ also beat ABQ-ABQ out handily by about 10% in most configurations.
  • The performance benefits of the Java 7 Hotspot over Java 6 are clear on this newer hardware. I saw a 10-20% performance boost across the board. Notably its impact on the Disruptor configurations is more pronounced than on the others.

Note also that the optimal worker counts differ based on hardware, which is expected given the differences between Nehalem and Sandy Bridge. Every little bit of configuration seems to make a difference, and a meaningful one at that!

Major takeaways:

  1. Explore your configuration space: worker counts, JVMs, hardware. One step in any direction in any of those spaces can provide a meaningful performance boost.
  2. Separate your workloads! Tune for each workload!
  3. Don’t bother tuning queue size except for the purpose of jitter or keeping it in L3.
  4. Even if you don’t know what the black box at the bottom (or even the middle) of the stack is doing, you can still make progress! Experiment and plot and keep good notes!
  5. The Java 7 Hotspot offers a small but consistent performance improvement over Java 6 on newer hardware.

Efficient Field-Striped, Nested, Disk-backed Record Storage

At AK we deal with a torrent of data every day. We can report on the lifetime of a campaign which may encompass more than a year’s worth of data. To be able to efficiently access our data we are constantly looking at different approaches to storage, retrieval and querying. One approach that we have been interested in involves dissecting data into its individual fields (or “columns” if you’re thinking in database terms) so that we only need to access the fields that are pertinent to a query. This is not a new approach to dealing with large volumes of data – it’s the basis of column-oriented databases like HBase.

Much of our data contains nested structures and this causes things to start to get a little more interesting, since this no longer easily fits within the data-model of traditional column-stores. Our Summarizer uses an in-memory approach to nested, field-striped storage but we wanted to investigate this for our on-disk data. Google published the Dremel paper a few years ago covering this exact topic. As with most papers, it only provides a limited overview of the approach without covering many of the “why”s and trade-offs made. So, we felt that we needed to start from the ground up and investigate how nested, field-striped storage works in order to really understand the problem.

Due to time constraints we have only been able to scratch the surface. Since the community is obviously interested in a Dremel-like project, we want to make the work that we have done available. We apologize in advance for the rough edges.

Without further ado: Efficient Field-Striped, Nested, Disk-backed Record Storage (on GitHub).

Netty’s CodecEmbedder

We love Netty. It’s a great full-featured network framework for Java. One of the features that rounds out the framework is the CodecEmbedder. It allows you to test your encoders and decoders without any fuss using an offer-poll paradigm. For example, to test our Rsyslog decoder, we simply:

ChannelBuffer messageBuffer =
    ChannelBuffers.copiedBuffer("2011-06-30T00:00:03-07:00 some.host.agkn.net EVT_NM column1,column2\n", CharsetUtil.UTF_8);
RsyslogDecoder decoder = new RsyslogDecoder();
DecoderEmbedder<IRsyslogMessage> embedder = new DecoderEmbedder<IRsyslogMessage>(decoder);
embedder.offer(messageBuffer);

IRsyslogMessage message = embedder.poll();
assertNotNull(message, "Decoded message");
assertEquals(message.getTimestamp(), "2011-06-30T00:00:03-07:00", "Timestamp");
assertEquals(message.getHostname(), "some.host.agkn.net", "Host");
assertEquals(message.getProgramname(), "EVT_NM", "Programname");
assertEquals(message.getBody(), "column1,column2", "Body");

One gotcha to watch out for (which always manages to bite me in the butt, and is the impetus for writing this post) is that handlers will only process the type of data that they understand. Data of other types is passed along completely untouched. For example, while the following successfully compiles, it throws a java.lang.ClassCastException: java.lang.String cannot be cast to IRsyslogMessage at embedder.poll():

RsyslogDecoder decoder = new RsyslogDecoder();
DecoderEmbedder<IRsyslogMessage> embedder = new DecoderEmbedder<IRsyslogMessage>(decoder);
    embedder.offer("2011-06-30T00:00:03-07:00 some.host.agkn.net EVT_NM column1,column2\n");

IRsyslogMessage message = embedder.poll();
assertNotNull(message, "Decoded message");

The ChannelPipeline that backs the embedder can handle any type of input object. In the above case the offered object is a String, which is simply passed through the RsyslogDecoder untouched and then fails to pop out of the poll as an IRsyslogMessage. As long as you always make sure that your offered object is understood by one of your handlers, the embedder will work as you expect.

Big Data Ain’t Fat Data: A Case Study

We’ve always had a hunch that our users stick to the same geographic region. Sure, there’s the occasional jet-setter that takes their laptop from New York to Los Angeles (or like Rob, goes Chicago to San Francisco) on a daily or weekly basis, but they’re the exception and not the rule. Knowing how true this is can simplify the way we work with user-centric data across multiple data centers.

When Rob asked me to find this out for sure, my first instinct was to groan and fire up Hive on an Elastic MapReduce cluster, but after a second, I heard Matt’s voice in my head saying, “Big Data isn’t Fat Data”. Why bother with Hadoop?

The Setup

If I was solving this problem on a small data-set, it’d be pretty straight-forward. I could write a Python script in about 10 minutes that would take care of the problem. It would probably look something like:

import sys

users = {}

for line in sys.stdin:
    user, data_center = parse(line)  # parse() pulls the user and data center out of a log line
    try:
        users[user].add(data_center)
    except KeyError:
        users[user] = {data_center}

total_users = len(users)
multiple_dc_users = len([u for u in users if len(users[u]) > 1])

Easy peasy. However, explicitly storing such a large hash table gets a little problematic once you start approaching medium-sized data (1GB+). Your memory needs grow pretty rapidly – with M users and N data centers, storage is O(MN) – and things start to get a little slow in Python. At this point there are two options. You can brute-force the problem by throwing hardware at it, either with a bigger machine or with something like Hadoop. Or, we can put on our Computer Science and Statistics hats and get a little bit clever.

What if we turn the problem sideways? Above, we’re keeping a hash table that holds a set of data centers for each user. Instead, let’s keep a set of users per data center, splitting the problem up into multiple hash tables. This lets us keep a small, fixed number of tables – since I’d hope any company knows exactly how many data centers they have – and spread the load across them, hopefully making the load on each table more tolerable. We can then check how many sets each user falls into, and call it a day.

data_centers = dict([(dc, set()) for dc in AK_DATA_CENTERS])

for line in sys.stdin:
    user, data_center = parse(line)
    data_centers[data_center].add(user)

# Get the total users by taking the union of all of the data center sets
...

# Get all users who are in exactly one set by taking symmetric differences (XOR) of data-center sets
# and count the size of that set.
...

While this approach theoretically has better performance with the same O(MN) space requirements, with big enough data the space requirements of the problem totally dominate whatever improvement this approach would provide. In other words, it doesn’t matter how small each hash table is, you can’t fit 80GB of user IDs into the 8GB of RAM on your laptop.

It’s looking pretty bleak for the Clever Way of doing things, since what we really want is a magic hash table that can store our 80GB of user IDs in the memory on our laptops.

Bloom Filters

Enter Bloom Filters. A bloom filter is a fixed-size set data structure with two minor features/drawbacks:

  1. You can never ask a Bloom Filter for the set of elements it contains.
  2. Membership queries have a small, controllable, false-positive probability. Bloom filters will never return false negatives.

With a little bit of work, it’s pretty easy to substitute Bloom Filters for plain old hash tables in our sideways approach above. There’s a slight tweak we have to make to our algorithm to accommodate the fact that we can’t ever query a bloom filter for the elements it contains, but the idea remains the same.

The Payoff

Suppose now we’re keeping a bloom-filter of users per data center. The only thing we have to work around is the fact that we’ll never be able to recover the list of users we’ve added to each set. So, we’ll just deal with users each time we see them instead of deferring our counting to the end.

With that idea in the bag, there are really only a few things to worry about when a request comes in for a given data center (a short code sketch follows the list).

  • Check the bloom filter for that data center to see if the user has been to that one before
  • Check the other bloom filters to see how many other data-centers that user has been to before
  • Count the number of total data-centers that user has seen before. If the user is new to this data center, and the user has seen exactly one other data center before, increment the multiple data center user counter
  • If the user has never seen any of your data centers before, that user is a completely new user. Increment the total number of users seen.
  • If the user has already seen this data-center, this user is a repeat. Do nothing!
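
A sketch of that per-request logic in Python (plain sets stand in for the Bloom filters here; swapping in any Bloom filter with add and membership checks changes the memory footprint, not the logic):

from collections import Counter

def handle_request(user, data_center, filters, counts):
    """filters: {data_center: set-or-bloom-filter}, counts: Counter of report totals."""
    seen_here = user in filters[data_center]
    other_dcs_seen = sum(1 for dc, f in filters.items()
                         if dc != data_center and user in f)

    if seen_here:
        return                                  # repeat visit to this data center: do nothing
    if other_dcs_seen == 0:
        counts['total_users'] += 1              # completely new user
    elif other_dcs_seen == 1:
        counts['multiple_dc_users'] += 1        # user just crossed into a second data center
    filters[data_center].add(user)

filters = {'dc1': set(), 'dc2': set()}
counts = Counter()
for user, dc in [('u1', 'dc1'), ('u1', 'dc1'), ('u1', 'dc2'), ('u2', 'dc2')]:
    handle_request(user, dc, filters, counts)
print(counts)   # expected: Counter({'total_users': 2, 'multiple_dc_users': 1})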

We ran our version of this overnight. It took us one core, 8GB of RAM, and just under 4 hours to count the number of users who hit multiple data centers in a full week’s worth of logs.

Not bad!

Never trust a profiler

A week or so ago I had mentioned to Timon that, for the first time, a profiler had actually pointed me in a direction that directly led to a positive increase in performance. Initially Timon just gave me that “you’re just a crotchety old man” look (which, in most cases, is the correct response). I pointed him to Josh Bloch’s Performance Anxiety presentation, which dives into why it is so hard (in fact “impossible” in Josh’s words) to benchmark modern applications. It also references the interesting paper “Evaluating the Accuracy of Java Profilers”.

Just last week I was trying to track down a severe performance degradation in my snapshot recovery code. I was under some time pressure so I turned on my profiler to try to point me in the right direction. The result that it gave was clear, repeatable and unambiguous and pointed me into the linear probing algorithm of the hash table that I am using. Since I had recently moved to a new hash table (one that allowed for rehashing and resizing) it was possible that this was in fact the root of my performance problem but I had my doubts. (More on this in a future post.) I swapped out my hash table and re-profiled. The profiler again gave me a clear, repeatable and unambiguous result that my performance woes were solved so I moved on. When we were able to test the snapshot recovery code on production snapshots, we found that the performance problems still existed.

My profiler lied to me. Never trust your profiler.
