There is much buzz about “big data” and “big analytics”, but precious little information exists about the struggle of building an infrastructure to tackle these problems. Some notable exceptions are Facebook, Twitter’s Rainbird and Metamarkets’ Druid. In this post we provide an overview of how we built Aggregate Knowledge’s “big analytics” infrastructure. We cover how we mix rsyslog, 0MQ and our in-house streaming key-value store to route hundreds of thousands of events per second and efficiently answer reporting queries over billions of events per day.
Overview and Goal
Recording and reporting on advertising events (impressions, interactions, conversions) is the core of what we do at Aggregate Knowledge. We capture information about:
- Who: audience, user attributes, browser
- What: impression, interaction, conversion
- Where: placement, ad size, context
- When: global time, user’s time, day part
just to name a few. We call these, or any combination of them, our keys. The types of metrics (or values) that we support include, but aren’t limited to:
- Counts: number of impressions, number of conversions, number of unique users (unique cookies)
- Revenue: total inventory cost, data cost
- Derived: click-through rate (CTR), cost per action (CPA)
Our reports support drill-downs and roll-ups along all of the available dimensions; we support many of the standard OLAP functions.
We are architecting for a sustained event ingest rate of 500k events per second over a 14-hour “internet day”, yielding around 30 billion events per day (or around 1 trillion events a month). Our daily reports, which run over billions of events, should take seconds; our monthly or lifetime reports, which run over hundreds of billions of events, should take at most minutes.
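As a quick sanity check on these targets, the arithmetic fits in a few lines of Python. The 30-day month is an assumption for illustration. The sustained rate alone gives roughly 25 billion events per day, which is the same order of magnitude as the rounded figures above.

```python
# Back-of-the-envelope check of the ingest targets (30-day month is an assumption).
EVENTS_PER_SECOND = 500_000
INTERNET_DAY_HOURS = 14
DAYS_PER_MONTH = 30

events_per_day = EVENTS_PER_SECOND * INTERNET_DAY_HOURS * 3600
events_per_month = events_per_day * DAYS_PER_MONTH

print(f"{events_per_day:,} events per day")      # 25,200,000,000 events per day
print(f"{events_per_month:,} events per month")  # 756,000,000,000 events per month
```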
Over the past few years we have taken a few different paths to produce our reports with varying degrees of success.
First Attempt: Warehouses, Map-Reduce and Batching
When I first started at Aggregate Knowledge, we had a multi-terabyte distributed warehouse that used Map-Reduce to process queries. The events were gathered from the edge, where they were recorded, and batch loaded into the warehouse at regular intervals. It stored hundreds of millions of facts (events) and took hours to generate reports. Some reports on unique users would take longer than a day to run. We had a team dedicated to maintaining and tuning the warehouse.
At the time, our event recorders were placed on many high-volume news sites, and it was quite common for us to see large spikes in the number of recorded events when a hot news story hit the wires. A 5-minute batch of events from a spike often took longer than 5 minutes to transfer, process and load, which caused many headaches. Since the time it took to run a report depended on the number of events being processed, reporting performance suffered whenever a query hit one of these spikes. Because we provided 30-, 60- and 90-day reports, a single spike would cause us grief for a long time.
After suffering this pain for a while, we concluded that the traditional approach of storing and aggregating facts was inappropriate for our use. Because our data is immutable once written, it seemed clear that we needed to pre-compute and store aggregated summaries. Why walk over hundreds of millions of facts, summing along some dimension, more than once if the answer is always a constant? Simply store that constant. The summaries are bounded in size by the cardinality of the set of dimensions rather than by the number of events. Our worries would move from something we could not control (the number of incoming events) to something we could control (the dimensionality and number of our keys).
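A minimal sketch of the idea in Python, assuming events arrive as dictionaries of dimension values (the field names and sample events are hypothetical). Counts are pre-aggregated once per distinct key, and a roll-up onto fewer dimensions is answered from the summary rows rather than from the raw events. Its cost therefore depends on the number of keys, not the number of events.

```python
from collections import Counter

def summarize(events, dimensions):
    """Pre-aggregate events into one count per distinct tuple of dimension values."""
    summary = Counter()
    for event in events:
        summary[tuple(event[d] for d in dimensions)] += 1
    return summary

def roll_up(summary, dimensions, keep):
    """Collapse a summary onto a subset of its dimensions by summing summary rows."""
    positions = [dimensions.index(d) for d in keep]
    rolled = Counter()
    for key, count in summary.items():
        rolled[tuple(key[i] for i in positions)] += count
    return rolled

# Hypothetical events; real events carry many more dimensions.
events = [
    {"placement": "p1", "ad_size": "300x250", "type": "impression"},
    {"placement": "p1", "ad_size": "300x250", "type": "impression"},
    {"placement": "p2", "ad_size": "728x90", "type": "impression"},
    {"placement": "p1", "ad_size": "300x250", "type": "click"},
]
dims = ("placement", "ad_size", "type")
summary = summarize(events, dims)  # 3 rows for 4 events
by_type = roll_up(summary, dims, ("type",))
print(by_type)  # Counter({('impression',): 3, ('click',): 1})
```

Once an event has been folded into the summary it never needs to be read again, which is exactly the property that immutable data allows.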
Second Attempt: Streaming Databases and Better Batching
Having previously worked on a financial trading platform, I had learned much about streaming databases and Complex Event Processing (e.g. Coral8, StreamBase, Truviso). Our second approach would compute our daily summaries in much the same way that a financial exchange keeps track and tally of trades. The event ingest of the streaming database would be the only part of our infrastructure affected by spikes in the number of events since everything downstream worked against the summaries. Our reporting times went from hours to seconds or sub-seconds. If we were a retail shop that had well-known dimensionality then we would likely still be using a streaming database today. It allowed us to focus on immediate insights and actionable reports rather than the warehouse falling over or an M-R query taking 12 hours.
Once worrying about individual events was a thing of the past, we started to look at the dimensionality of our data. We knew from our old warehouse data that the hypercube of dimensional data was very sparse but we didn’t know much else. The initial analysis of the distribution of keys yielded interesting results:
Keys are seen with frequencies that tend to follow Zipf’s Law:
the frequency of any key is inversely proportional to its rank in the frequency table
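Written as a formula, with f(r) the frequency of the key of rank r among N distinct keys, T the total number of observations, and C a normalizing constant chosen so the frequencies sum to T, the idealized law quoted above reads as follows. Empirical data is often fit with a more general exponent, f(r) ∝ 1/r^s with s near 1.

$$
f(r) = \frac{C}{r}, \qquad r = 1, \dots, N, \qquad C = \frac{T}{H_N}, \qquad H_N = \sum_{k=1}^{N} \frac{1}{k}
$$

Because H_N grows only logarithmically in N, the single most frequent key accounts for a fraction 1/H_N of all observations, while the long tail of low-ranked keys each receives only a handful.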
Put simply: there are a large number of things that we see very infrequently and a small number of things that we see very often. Decile plots (where the black line marks 50%) and CDF plots of the key frequency provide additional insight:
60% of our keys have been seen hundreds of times or fewer, and around 15% of our keys have been seen only once. (The graph covers only one set of our dimensions; as we add more dimensions, the CDF curve gets steeper.) This told us that not only is the hypercube very sparse, but the values also tend to be quite small and are updated infrequently. If these facts could be exploited, then the hypercube could be highly compressed and stored very efficiently, even for many dimensions with high cardinality.
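The CDF reading used above (“X% of keys have been seen at most t times”) can be computed directly from per-key counts. This Python sketch uses made-up counts purely for illustration. It does not reproduce the data behind the graphs.

```python
from bisect import bisect_right

def frequency_cdf(key_counts, thresholds):
    """For each threshold t, return the fraction of keys seen at most t times."""
    counts = sorted(key_counts.values())
    n = len(counts)
    # bisect_right gives the number of counts <= t in the sorted list.
    return {t: bisect_right(counts, t) / n for t in thresholds}

# Hypothetical per-key counts (illustrative only).
key_counts = {"a": 1, "b": 1, "c": 2, "d": 3, "e": 40, "f": 250, "g": 9_000, "h": 120_000}
cdf = frequency_cdf(key_counts, [1, 100, 1000])
print(cdf)  # {1: 0.25, 100: 0.625, 1000: 0.75}
```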
We improved our transfer, transform and loading batch processes to better cope with event volume spikes, which resulted in fewer headaches, but it still felt wrong. The phrase “batching into a streaming database” reveals the oxymoron. We also made little progress in computing unique user counts. Some of the streaming databases provided custom support for unique user counting, but not at the volume and rate that we required. Another solution was needed.
Third Attempt: Custom Key-Value Store and Streaming Events
From our work with streaming databases we knew a few things:
- Out-of-order data was annoying (this is something that I will cover in future blog posts);
- Counting unique sets (unique users, unique keys) was hard;
- There was much efficiency to be gained in our distribution of keys and key-counts;
- Structured (or semi-structured) data suited us well;
- Batching data to a streaming database is silly.
Unfortunately none of the existing NoSQL solutions covered all of our cases. We built a Redis prototype and found that the majority of our code was in ingesting our events from our event routers, doing key management and exporting the summaries to our reporting tier. Building the storage in-house provided us the opportunity to create custom types for aggregation and sketches for the cardinality of large sets (e.g. unique user counting). Once we had these custom types it was a small leap to go from the Redis prototype to a full-featured in-house key-value store. We call this beast “The Summarizer”. (“The Aggregator” was already taken by our Ops team for event aggregation and routing.)
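The post doesn’t say which cardinality sketch the summarizer uses, so as a stand-in here is a K-Minimum Values (KMV) sketch, one well-known mergeable distinct-count sketch. It hashes each item to [0, 1), keeps only the k smallest distinct hash values, and estimates the number of distinct items as (k − 1) divided by the k-th smallest hash. The class name and the choice of SHA-1 truncated to 64 bits are assumptions for illustration.

```python
import hashlib
import heapq

class KMVSketch:
    """K-Minimum Values distinct-count sketch (illustrative stand-in)."""

    def __init__(self, k=256):
        self.k = k
        self._heap = []        # negated hashes, so the heap top is the largest kept hash
        self._members = set()  # hashes currently kept, used to ignore duplicates

    @staticmethod
    def _hash(item):
        digest = hashlib.sha1(str(item).encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") / 2**64  # roughly uniform in [0, 1)

    def add(self, item):
        h = self._hash(item)
        if h in self._members:
            return  # already counted
        if len(self._heap) < self.k:
            heapq.heappush(self._heap, -h)
            self._members.add(h)
        elif h < -self._heap[0]:
            # Replace the largest kept hash with the new, smaller one.
            evicted = -heapq.heapreplace(self._heap, -h)
            self._members.discard(evicted)
            self._members.add(h)

    def estimate(self):
        if len(self._heap) < self.k:
            return float(len(self._heap))  # fewer than k distinct items: exact
        return (self.k - 1) / -self._heap[0]

    def merge(self, other):
        """Union: keep the k smallest hashes seen by either sketch."""
        merged = KMVSketch(min(self.k, other.k))
        for h in heapq.nsmallest(merged.k, self._members | other._members):
            heapq.heappush(merged._heap, -h)
            merged._members.add(h)
        return merged
```

Because a union only needs the k smallest hashes across both inputs, per-day sketches like this can be merged to estimate unique users over multi-day ranges without revisiting events. The relative error shrinks roughly as 1/√k.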
The summarizer simply maps events into summaries by key and updates the aggregates. Streaming algorithms are a natural fit for this type of data store. Many O(n^m) algorithms have streaming O(n) counterparts that provide sufficiently accurate results. (We’ll be covering streaming algorithms in future posts.) The summarizer provides us with a succinct (unsampled) summary that can be accessed in O(1). Currently we can aggregate more than 200,000 events per second per core (saturating just shy of 1 million events per second), and some events are aggregated into more than ten summaries.
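To make “maps events into summaries by key” concrete, here is a toy in-memory summarizer in Python. The summary definitions, field names and metrics are hypothetical, and a real store would also keep sketches (such as a distinct-count sketch per row) alongside the plain aggregates. Each event is folded into every summary whose key dimensions it carries, so ingest cost grows with the number of summaries, and a report lookup is a hash-table access.

```python
from collections import defaultdict

# Hypothetical summary definitions: summary name -> dimensions forming its key.
SUMMARY_DEFINITIONS = {
    "by_placement": ("placement",),
    "by_placement_size": ("placement", "ad_size"),
    "by_day_part": ("day_part",),
}

class Summarizer:
    def __init__(self, definitions):
        self.definitions = definitions
        # summary name -> key tuple -> metric name -> running aggregate
        self.summaries = {
            name: defaultdict(lambda: defaultdict(int)) for name in definitions
        }

    def ingest(self, event):
        """Fold one event into every summary whose key dimensions it carries."""
        for name, dims in self.definitions.items():
            if not all(d in event for d in dims):
                continue
            row = self.summaries[name][tuple(event[d] for d in dims)]
            row[event["type"]] += 1                # count per event type
            row["cost"] += event.get("cost", 0.0)  # running cost aggregate

    def lookup(self, name, key):
        """Hash lookup of a precomputed row; no scan over past events."""
        return dict(self.summaries[name].get(key, {}))

summarizer = Summarizer(SUMMARY_DEFINITIONS)
summarizer.ingest({"placement": "p1", "ad_size": "300x250", "day_part": "morning",
                   "type": "impression", "cost": 0.002})
summarizer.ingest({"placement": "p1", "ad_size": "300x250", "day_part": "evening",
                   "type": "click"})
print(summarizer.lookup("by_placement", ("p1",)))  # {'impression': 1, 'cost': 0.002, 'click': 1}
```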
Our summaries are computed per day. (Future blog posts will provide more information about how we treat time.) They are designed so that they rarely contain more than 10M rows, and they are stored in CSV format. Initially we used CSV simply because we already had all of the code written for ingesting third-party CSV data. We quickly found other uses for them: our analysts gobbled them up in Excel, our data scientists use them directly in R, and even our engineers use them for back-of-the-envelope calculations. Having manageable summaries and/or sketches enabled agile analytics.
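Writing a daily summary out as CSV needs nothing beyond the Python standard library. This sketch assumes the summary is held as a mapping from key tuples to metric dictionaries (a hypothetical layout); metrics missing from a row are written as 0.

```python
import csv
import io

def summary_to_csv(rows, dimensions, metrics, out):
    """Write one header line, then one line per key with its metric values."""
    writer = csv.writer(out)  # for a real file, open it with newline=""
    writer.writerow([*dimensions, *metrics])
    for key in sorted(rows):
        values = rows[key]
        writer.writerow([*key, *(values.get(m, 0) for m in metrics)])

rows = {
    ("p2", "728x90"): {"impressions": 1},
    ("p1", "300x250"): {"impressions": 2, "clicks": 1},
}
buffer = io.StringIO()
summary_to_csv(rows, ["placement", "ad_size"], ["impressions", "clicks"], buffer)
print(buffer.getvalue())
```

A file in this shape opens directly in Excel or with `read.csv` in R, which is part of what made the format convenient.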
To get the events into our summarizer we completely rethought how events move through our infrastructure. Instead of batching events, we wanted to stream them. To deal with spikes and to simplify maintenance, we wanted to allow the events to be queued if downstream components became unavailable or unable to meet the current demand. We needed to be able to handle our desired ingest rate of 500k events per second. The answer was right under our noses: rsyslog and 0MQ. (See the “Real-Time Streaming for Data Analytics” and “Real-Time Streaming with Rsyslog and ZeroMQ” posts for more information.)
Our challenge was to produce reports on demand over billions of events in seconds, and over hundreds of billions in minutes, while ingesting up to 500,000 events per second. Choosing the de facto technology du jour caused us to focus on fixing and maintaining the technology rather than solving business problems and providing value to our customers. We could have stayed with our first approach, immediately scaling to hundreds of nodes and taking on the challenges that solution presents. Instead, we looked at the types of answers we wanted and worked backwards until we could provide them easily with minimal hardware and little maintenance cost. Opting for the small, agile solution allowed us to solve business problems and provide value to our customers much more quickly.
Astute readers may have noticed the point on the far left of the CDF graph and wondered how a key could have been seen zero times, or why we would store keys that have no counts associated with them. We only summarize what is recorded in an event. The graph shows the frequency of keys as defined by impressions; it doesn’t include the contribution of any clicks or conversions. In other words, these “zero count keys” mean that for a given day there are clicks and/or conversions but no impressions. (This is common when a campaign ends.) In hindsight, we should have summed the count of impressions, clicks and conversions and used that total in the graph, but this provided the opportunity to show a feature of the summarizer: we can easily find days on which clicks and conversions have no impressions without running a nasty join.
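Because impressions, clicks and conversions for a key land in the same summary row, finding these days is a single scan over the rows rather than a join between separate tables. A minimal Python sketch, assuming a hypothetical layout where each row is keyed by (day, key) and holds per-type counts:

```python
def days_without_impressions(daily_rows):
    """Return sorted (day, key) pairs with clicks or conversions but zero impressions."""
    return [
        (day, key)
        for (day, key), row in sorted(daily_rows.items())
        if row.get("impressions", 0) == 0
        and (row.get("clicks", 0) > 0 or row.get("conversions", 0) > 0)
    ]

# Hypothetical summary rows.
daily_rows = {
    ("day-1", "campaign-7"): {"impressions": 1200, "clicks": 9, "conversions": 1},
    ("day-2", "campaign-7"): {"clicks": 2, "conversions": 1},
    ("day-2", "campaign-8"): {"impressions": 300},
}
print(days_without_impressions(daily_rows))  # [('day-2', 'campaign-7')]
```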