HLL Intersections

Why?

The intersection of two streams (of user ids) is a particularly important business need in the advertising industry. For instance, if you want to reach suburban moms but the cost of targeting those women on a particular inventory provider is too high, you might want to know about a cheaper inventory provider whose audience overlaps with the first provider. You may also want to know how heavily your car-purchaser audience overlaps with a certain metro area or a particular income range. These types of operations are critical for understanding where and how well you’re spending your advertising budget.

As we’ve seen before, HyperLogLog provides a time- and memory-efficient algorithm for estimating the number of distinct values in a stream. For the past two years, we’ve been using HLL at AK to do just that: count the number of unique users in a stream of ad impressions. Conveniently, HLL also supports the union operator ( \cup ), allowing us to trivially estimate the distinct value count of any composition of streams without sacrificing precision or accuracy. This piqued our interest because if we can “losslessly” compute the union of two streams and produce low-error cardinality estimates, then there’s a chance we can use that estimate along with the inclusion-exclusion principle to produce “directionally correct” cardinality estimates of the intersection of two streams. (To be clear, when I say “directionally correct” my criterion is “can an advertiser make a decision off of this number?”, or “can it point them in the right direction for further research?”. This often means that we can tolerate relative errors of up to 50%.)

The goals were:

  1. Get a grasp on the theoretical error bounds of intersections done with HLLs, and
  2. Come up with heuristic bounds around m, overlap, and the set cardinalities that could inform our usage of HLL intersections in the AK product.

Quick terminology review:

  • If I have a set of integers A, I’m going to call the HLL representing it H_{A}.
  • If I have HLLs H_{A}, H_{B} and their union H_{A \cup B}, then I’m going to call the intersection cardinality estimate produced |H_{A \cap B}|.
  • Define the overlap between two sets as overlap(A, B) := \frac{|A \cap B|}{min(|A|, |B|)}.
  • Define the cardinality ratio \frac{max(|A|, |B|)}{min(|A|, |B|)} as a shorthand for the relative cardinality of the two sets.
  • We’ll represent the absolute error of an observation |H_{A}| as \Delta |H_{A}|.

That should be enough for those following our recent posts, but for those just jumping in, check out Appendices A and B at the bottom of the post for a more thorough review of the set terminology and error terminology.

Experiment

We fixed 16 overlap values (0.0001, 0.001, 0.01, 0.02, 0.05, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0) and 12 set cardinalities (100M, 50M, 10M, 5M, 1M, 500K, 100K, 50K, 10K, 5K, 1K, 300) and did 100 runs of each permutation of (overlap, |A|, |B|). A random stream of 64-bit integers hashed with Murmur3 was used to create the two sets such that they shared exactly min(|A|,|B|) \cdot overlap = |A \cap B|  elements. We then built the corresponding HLLs H_{A} and H_{B} for those sets and calculated the intersection cardinality estimate |H_{A} \cap H_{B}| and computed its relative error.
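
For those who want to reproduce the setup, here is a minimal sketch in Python of how two such sets can be built. It uses plain pseudo-random 64-bit integers as a stand-in for the Murmur3-hashed stream, and build_pair and rand64 are hypothetical helpers, not our actual test harness:

```python
import random

def rand64(n):
    """n distinct pseudo-random 64-bit integers (stand-in for Murmur3 hashes)."""
    out = set()
    while len(out) < n:
        out.add(random.getrandbits(64))
    return list(out)

def build_pair(card_a, card_b, overlap):
    """Two sets sharing exactly min(|A|, |B|) * overlap elements."""
    shared_count = int(min(card_a, card_b) * overlap)
    pool = rand64(card_a + card_b - shared_count)
    shared = pool[:shared_count]
    a = set(shared) | set(pool[shared_count:card_a])
    b = set(shared) | set(pool[card_a:])
    assert len(a & b) == shared_count
    return a, b

A, B = build_pair(100_000, 10_000, 0.05)   # |A| = 100K, |B| = 10K, overlap = 0.05
```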

Given that we could only generate and insert about 2M elements/second per core, doing runs with set cardinalities greater than 100M was quickly ruled out for this blog post. However, I can assure you the results hold for much larger sets (up to the multiple billion-element range) as long as you heed the advice below.

Results

This first group of plots has a lot going on, so I’ll preface it by saying that it’s just here to give you a general feeling for what’s going on. First note that within each group of boxplots overlap increases from left to right (orange to green to pink), and within each plot cardinality ratio increases from left to right. Also note that the y-axis (the relative error of the observation) is log-base-10 scale. You can clearly see that as the set sizes diverge, the error skyrockets for all but the most similar (in both cardinality and composition) sets. I’ve drawn in a horizontal line at 50% relative error to make it easier to see what falls under the “directionally correct” criteria. You can (and should) click for a full-sized image.

Note: the error bars narrow as we progress further to the right because there are fewer observations with very large cardinality ratios. This is an artifact of the experimental design.

[Figure: relative error vs. cardinality ratio for all (overlap, register count) combinations (interAre_vs_cardF_all_800)]

A few things jump out immediately:

  • For cardinality ratio > 500, the majority of observations have many thousands of percent error.
  • When cardinality ratio is smaller than that and overlap > 0.4, register count has little effect on error, which stays very low.
  • When overlap \le 0.01, register count has little effect on error, which stays very high.

Just eyeballing this, the lesson I get is that computing intersection cardinality with small error (relative to the true value) is difficult and only works within certain constraints. Specifically,

  1. \frac{|A|}{|B|} < 100, and
  2. overlap(A, B) = \frac{|A \cap B|}{min(|A|, |B|)} \ge 0.05.

The intuition behind this is very simple: if the error of any one of the terms in your calculation is roughly as large as the true value of the result then you’re not going to estimate that result well. Let’s look back at the intersection cardinality formula. The left-hand side (that we are trying to estimate) is a “small” value, usually. The three terms on the right tend to be “large” (or at least “larger”) values. If any of the “large” terms has error as large as the left-hand side we’re out of luck.

Overlap Examples

So, let’s say you can compute the cardinality of an HLL with relative error of a few percent. If |H_{A}| is two orders of magnitude smaller than |H_{B}|, then the error alone of |H_{B}| is roughly as large as |A|.

|A \cap B| \le |A| by definition, so

|A \cap B| \le |A| \approx |H_{A}| \approx \Delta |H_{B}|.

In the best scenario, where A \cap B = A, the errors of |H_{B}| and |H_{A \cup B}| \approx |H_{B}| are both roughly the same size as what you’re trying to measure. Furthermore, even if |A| \approx |B| but the overlap is very small, then |A \cap B|  will be roughly as large as the error of all three right-hand terms.
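
To put illustrative (made-up) numbers on that: with \sigma \approx 1\%, |A| = 1M and |B| = 100M, we get \Delta |H_{B}| \approx 0.01 \cdot 100M = 1M \approx |A| \ge |A \cap B|, so the noise in |H_{B}| alone is already as large as the largest intersection we could possibly be measuring.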

On the bubble

Let’s throw out the permutations whose error bounds clearly don’t support “directionally correct” answers (overlap < 0.01 and \frac{|A|}{|B|} > 500 ) and those that trivially do (overlap > 0.4 ) so we can focus more closely on the observations that are “on the bubble”. Sadly, these plots exhibit a good deal of variance inherent in their smaller sample size. Ideally we’d have tens of thousands of runs of each combination, but for now this rough sketch will hopefully be useful. (Apologies for the inconsistent colors between the two plots. It’s a real bear to coordinate these things in R.) Again, please click through for a larger, clearer image.

[Figure: relative error vs. cardinality ratio for the “on the bubble” observations (interAre_vs_cardF_good_800)]

Doubling the number of registers reduces the variance of the relative error by about a quarter and moves the median relative error down (closer to zero) by 10-20 points.

In general, we’ve seen that the following cutoffs perform pretty well, practically. Note that some of these aren’t reflected too clearly in the plots because of the smaller sample sizes.

Register Count    Data Structure Size    Overlap Cutoff    Cardinality Ratio Cutoff
8192              5kB                    0.05              10
16384             10kB                   0.05              20
32768             20kB                   0.05              30
65536             41kB                   0.05              100

Error Estimation

To get a theoretical formulation of the error envelope for intersection, in terms of the two set sizes and their overlap, I tried the first and simplest error propagation technique I learned. For variables Y, Z, ..., and X a linear combination of those (independent) variables, we have

\Delta X = \sqrt{ (\Delta Y)^2 + (\Delta Z)^2 + ...}

Applied to the inclusion-exclusion formula:

\begin{array}{ll} \displaystyle \Delta |H_{A \cap B}| &= \sqrt{ (\Delta |H_{A}|)^2 + (\Delta |H_{B}|)^2 + (\Delta |H_{A \cup B}|)^2} \\ &= \sqrt{ (\sigma\cdot |A|)^2 + (\sigma\cdot |B|)^2 + (\sigma\cdot |A \cup B|)^2} \end{array}

where

\sigma = \frac{1.04}{\sqrt{m}} as in section 4 (“Discussion”) of the HLL paper.

Aside: Clearly |H_{A \cup B}| is not independent of |H_{A}| + |H_{B}|, though |H_{A}| is likely independent of |H_{B}|. However, I do not know how to a priori calculate the covariance in order to use an error propagation model for dependent variables. If you do, please pipe up in the comments!
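
As a concrete (if crude) sketch, here is the envelope computed in Python under that independence assumption. The function name intersection_error_envelope is just an illustrative label, and the example numbers are made up; in practice you would plug in the HLL estimates rather than the true cardinalities:

```python
from math import sqrt

def intersection_error_envelope(card_a, card_b, card_union, m):
    """Propagated absolute error of the inclusion-exclusion estimate,
    treating the three per-HLL errors as independent."""
    sigma = 1.04 / sqrt(m)                      # relative error of a single HLL
    return sqrt((sigma * card_a) ** 2 +
                (sigma * card_b) ** 2 +
                (sigma * card_union) ** 2)

# Illustrative numbers: |A| = 1M, |B| = 5M, overlap(A, B) = 0.1, m = 8192.
card_a, card_b = 1_000_000, 5_000_000
card_inter = int(min(card_a, card_b) * 0.1)
card_union = card_a + card_b - card_inter
print(intersection_error_envelope(card_a, card_b, card_union, 8192))
```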

I’ve plotted this error envelope against the relative error of the observations from HLLs with 8192 registers (approximately 5kB data structure).

[Figure: observed relative error and the propagated error envelope for HLLs with 8192 registers (err_bounds_good_800)]

Despite the crudeness of the method, it provided a 95% error envelope for the data without significant differences across cardinality ratio or overlap. Specifically, at least 95% of observations satisfied

\left| |H_{A \cap B}| - |A \cap B| \right| < \Delta |H_{A \cap B}|

However, it’s really only useful in the ranges shown in the table above. Past those cutoffs the bound becomes too loose and isn’t very useful.

This is particularly convenient because you can tune the number of registers you need to allocate based on the most common intersection sizes/overlaps you see in your application. Obviously, I’d recommend everyone run these tests and do this analysis on their business data, and not on some contrived setup for a blog post. We’ve definitely seen that we can get away with far less memory usage than expected to successfully power our features, simply because we tuned and experimented with our most common use cases.
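
As a rough illustration of that tuning exercise, the envelope can be inverted to pick a register count: keep doubling m until the propagated error falls below your tolerance for the intersection sizes you typically see. This sketch builds on the hypothetical intersection_error_envelope function above and, like the envelope itself, is only meaningful inside the overlap and cardinality-ratio cutoffs from the table:

```python
def registers_for_target(card_a, card_b, overlap, target_rel_error):
    """Smallest power-of-two register count whose propagated envelope stays
    under target_rel_error times the expected intersection size."""
    card_inter = min(card_a, card_b) * overlap
    card_union = card_a + card_b - card_inter
    m = 1024
    while intersection_error_envelope(card_a, card_b, card_union, m) \
            > target_rel_error * card_inter:
        m *= 2
    return m

# e.g. aiming for the 50% "directionally correct" criterion:
print(registers_for_target(1_000_000, 5_000_000, 0.1, 0.5))   # -> 32768
```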

Next Steps

We hope to improve the intersection cardinality result by finding alternatives to the inclusion-exclusion formula. We’ve tried a few different approaches, mostly centered around the idea of treating the register collections themselves as sets, and in my next post we’ll dive into those and other failed attempts!


Appendix A: A Review Of Sets

Let’s say we have two streams of user ids, S_{A} and S_{B}. Take a snapshot of the unique elements in those streams as sets and call them A and B. In the standard notation, we’ll represent the cardinality, or number of elements, of each set as |A| and |B|.

Example: If A = \{1,2,10\} then |A| = 3.

If I wanted to represent the unique elements in both of those sets combined as another set I would be performing the union, which is represented by A \cup B.

Example: If A = \{1,2,3\}, B=\{2,3,4\} then A \cup B = \{1,2,3,4\}.

If I wanted to represent the unique elements that appear in both A and B I would be performing the intersection, which is represented by A \cap B.

Example: With A, B as above, A \cap B = \{2,3\}.

The relationship between the union’s cardinality and the intersection’s cardinality is given by the inclusion-exclusion principle. (We’ll only be looking at the two-set version in this post.) For reference, the two-way inclusion-exclusion formula is |A \cap B| = |A| + |B| - |A \cup B| .

Example: With A, B as above, we see that |A \cap B| = 2 and |A| + |B| - |A \cup B| = 3 + 3 - 4 = 2.

For convenience we’ll define the overlap between two sets as overlap(A, B) := \frac{|A \cap B|}{min(|A|, |B|)}.

Example: With A, B as above, overlap(A,B) = \frac{|A \cap B|}{min(|A|, |B|)} = \frac{2}{min(3,3)} = \frac{2}{3}.

Similarly, for convenience, we’ll define the cardinality ratio \frac{max(|A|, |B|)}{min(|A|, |B|)} as a shorthand for the relative cardinality of the two sets.
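
A quick sanity check of these definitions with Python’s built-in sets:

```python
A = {1, 2, 3}
B = {2, 3, 4}

union = A | B                                   # {1, 2, 3, 4}
inter = A & B                                   # {2, 3}

# Inclusion-exclusion: |A ∩ B| = |A| + |B| - |A ∪ B|
assert len(inter) == len(A) + len(B) - len(union)

overlap = len(inter) / min(len(A), len(B))                     # 2/3
cardinality_ratio = max(len(A), len(B)) / min(len(A), len(B))  # 1.0
```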

The examples and operators shown above are all relevant for exact, true values. However, HLLs do not provide exact answers to the set cardinality question. They offer estimates of the cardinality along with certain error guarantees about those estimates. In order to differentiate between the two, we introduce HLL-specific operators.

Consider a set A. Call the HLL constructed from this set’s elements H_{A}. The cardinality estimate given by the HLL algorithm for H_{A} is |H_{A}|.

Define the union of two HLLs H_{A} \cup H_{B} := H_{A \cup B}, which is also the same as the HLL created by taking the pairwise max of H_{A}‘s and H_{B}‘s registers.

Finally, define the intersection cardinality of two HLLs in the obvious way: |H_{A} \cap H_{B}| := |H_{A}| + |H_{B}| - |H_{A \cup B}|. (This is simply the inclusion-exclusion formula for two sets with the cardinality estimates instead of the true values.)
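
To make the HLL-specific operators concrete, here is a toy HyperLogLog in Python. To be clear, this is not our production implementation: it uses SHA-1 instead of Murmur3, keeps only the raw estimator from the paper (no small- or large-range corrections), and exists purely to show the pairwise-max union and the inclusion-exclusion intersection estimate defined above:

```python
import hashlib

class HLL:
    """Toy HyperLogLog: raw estimator only, no range corrections."""

    def __init__(self, p=14):
        self.p = p                      # log2 of the register count
        self.m = 1 << p                 # number of registers
        self.registers = [0] * self.m

    def _hash64(self, value):
        # Any well-mixed 64-bit hash works; the post used Murmur3.
        digest = hashlib.sha1(str(value).encode()).digest()
        return int.from_bytes(digest[:8], "big")

    def add(self, value):
        x = self._hash64(value)
        idx = x >> (64 - self.p)                 # first p bits pick a register
        rest = x & ((1 << (64 - self.p)) - 1)    # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1   # leading zeros + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def cardinality(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        z = sum(2.0 ** -r for r in self.registers)
        return alpha * self.m * self.m / z

    def union(self, other):
        """Union H_A ∪ H_B: pairwise max of the two register arrays."""
        result = HLL(self.p)
        result.registers = [max(a, b)
                            for a, b in zip(self.registers, other.registers)]
        return result

def intersection_cardinality(h_a, h_b):
    """Inclusion-exclusion: |H_A ∩ H_B| = |H_A| + |H_B| - |H_A ∪ B|."""
    return h_a.cardinality() + h_b.cardinality() - h_a.union(h_b).cardinality()

# Usage against the A, B generated in the experiment sketch:
# h_a, h_b = HLL(p=13), HLL(p=13)      # p = 13 -> 8192 registers
# for v in A: h_a.add(v)
# for v in B: h_b.add(v)
# print(intersection_cardinality(h_a, h_b))
```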

Appendix B: A (Very Brief) Review of Error

The simplest way of understanding the error of an estimate is to ask “how far is it from the truth?”. That is, the difference in value between the estimate and the exact value, also known as the absolute error.

However, that’s only useful if you’re measuring a single thing over and over again. The primary criterion for judging the utility of HLL intersections is relative error, because we are trying to measure intersections of many different sizes. In order to get an apples-to-apples comparison of the efficacy of our method, we normalize the absolute error by the true size of the intersection. So, for some observation \hat{x} whose exact, non-zero value is x, we say that the relative error of the observation is \frac{x-\hat{x}}{x}. That is, by what percentage is the observation off from the true value?

Example: If |A| = 100, |H_{A}| = 90 then the relative error is \frac{100 - 90}{100} = \frac{10}{100} = 10\%.

On Accuracy and Precision

A joint post from Matt and Ben

Believe it or not, we’ve been getting inspired by MP3s lately, and not by turning on music in the office. Instead, we drew a little bit of inspiration from the way MP3 encoding works. From Wikipedia:

“The compression works by reducing accuracy of certain parts of sound that are considered to be beyond the auditory resolution ability of most people. This method is commonly referred to as perceptual coding. It uses psychoacoustic models to discard or reduce precision of components less audible to human hearing, and then records the remaining information in an efficient manner.”

Very similarly, in online advertising there are signals that go “beyond the resolution of advertisers to action”. Rather than tackling the problem of clickstream analysis in the standard way, we’ve employed an MP3-like philosophy to storage. Instead of storing absolutely everything and counting it, we’ve employed a probabilistic, streaming approach to measurement. This lets us give clients real-time measurements of how many users and impressions a campaign has seen at excruciating levels of detail. The downside is that our reports tend to include numbers like “301M unique users last month” as opposed to “301,123,098 unique users last month”, but we believe that the benefits of this approach far outweigh the cost of limiting precision.

Give a little, get a lot

The precision of our approach does not depend on the size of the thing we’re counting. When we set our precision to +/-1%, we can tell the difference between 1000 and 990 as easily as we can tell the difference between 30 billion and 29.7 billion users. For example, when we count the number of users a campaign reached in Wernersville, PA (Matt’s hometown), we can guarantee that we saw 1000 +/- 10 unique cookies, as well as saying the campaign reached 1 billion +/- 10M unique cookies overall.

Our storage size is fixed once we choose our level of precision. This means that we can accurately predict the amount of storage needed and our system has no problem coping with increases in data volume and scales preposterously well. Just to reiterate, it takes exactly as much space to count the number of users you reach in Wernersville as it does to count the total number of users you reach in North America. Contrast this with sampling, where to maintain a fixed precision when capturing long-tail features (things that don’t show up a lot relative to the rest of the data-set, like Wernersville) you need to drastically increase the size of your storage.

The benefits of not having unexpected storage spikes and of scaling well are pretty obvious: fewer technical limits, fewer surprises, and lower costs for us, which directly translate to better value for our users and a more reliable product. A little bit of precision seems like a fair trade here.

The technique we chose supports set-operations. This lets us ask questions like, “how many unique users did I see from small towns in Pennsylvania today” and get an answer instantaneously by composing multiple data structures. Traditionally, the answers to questions like this have to be pre-computed, leaving you waiting for a long job to run every time you ask a question you haven’t prepared for. Fortunately, we can do these computations nearly instantaneously, so you can focus on digging into your data. You can try that small-town PA query again, but this time including Newton, MA (Ben’s hometown), and not worry that no one has prepared an answer.

Unfortunately, not all of these operations are subject to the same “nice” error bounds. However, we’ve put the time in to detect these errors, and make sure that the functionality our clients see degrades gracefully. And since our precision is tunable, we can always dial the precision up as necessary.

Getting insight from data

Combined with our awesome streaming architecture, this allows us to stop thinking about storage infrastructure as the limiting factor in analytics, similar to the way MP3 compression allows you to fit more and more music on your phone or MP3 player. When you throw in the ability to have ad-hoc queries execute nearly instantly, we have no regrets about getting a little bit lossy. We’ve already had our fair share of internal revelations, and enabled clients to have quite a few of their own, simply because it’s now so easy to work with our data.

Building a Big Analytics Infrastructure

There is much buzz about “big data” and “big analytics”, but precious little information exists about the struggle of building an infrastructure to tackle these problems. Some notable exceptions are Facebook, Twitter’s Rainbird and Metamarkets’ Druid. In this post we provide an overview of how we built Aggregate Knowledge’s “big analytics” infrastructure. It will cover how we mix rsyslog, 0MQ and our in-house streaming key-value store to route hundreds of thousands of events per second and efficiently answer reporting queries over billions of events per day.

Overview and Goal

Recording and reporting on advertising events (impressions, interactions, conversions) is the core of what we do at Aggregate Knowledge. We capture information about:

  • Who: audience, user attributes, browser
  • What: impression, interaction, conversion
  • Where: placement, ad size, context
  • When: global time, user’s time, day part

just to name a few. We call these, or any combination of these, our keys. The types of metrics (or values) that we support include (but aren’t limited to):

  • Counts: number of impressions, number of conversions, number of unique users (unique cookies)
  • Revenue: total inventory cost, data cost
  • Derived: click-through rate (CTR), cost per action (CPA)

Our reports support drill-downs and roll-ups across all of the available dimensions — we support many of the standard OLAP functions.

We are architecting for a sustained event ingest rate of 500k events per second over a 14-hour “internet day”, yielding around 30 billion events per day (or around 1 trillion events a month). Our daily reports, which run over billions of events, should take seconds to run, and our monthly or lifetime reports, which run over hundreds of billions of events, should take at most minutes.

Over the past few years we have taken a few different paths to produce our reports with varying degrees of success.

First Attempt: Warehouses, Map-Reduce and Batching

When I first started at Aggregate Knowledge we had a multi-terabyte distributed warehouse that used Map-Reduce to process queries. The events were gathered from the edge where they were recorded and batch loaded into the warehouse on regular intervals. It stored hundreds of millions of facts (events) and took hours to generate reports. Some reports on unique users would take longer than a day to run. We had a team dedicated to maintaining and tuning the warehouse.

At the time our event recorders were placed on many high-volume news sites and it was quite common for us to see large spikes in the number of recorded events when a hot news story hit the wires. It was common for a 5 minute batch of events from a spike to take longer than 5 minutes to transfer, process and load which caused many headaches. Since the time it took to run a report was dependent on the number of events being processed, whenever a query would hit one of these spikes, reporting performance would suffer. Because we provided 30-, 60- and 90-day reports, a spike would cause us grief for a long time.

After suffering this pain for a while, this traditional approach of storing and aggregating facts seemed inappropriate for our use. Because our data is immutable once written, it seemed clear that we needed to pre-compute and store aggregated summaries. Why walk over hundreds of millions of facts summing along some dimension more than once if the answer is always a constant? Simply store that constant. The summaries are bounded in size by the cardinality of the set of dimensions rather than the number of events. Our worries would move from something we could not control — the number of incoming events — to something that we could control — the dimensionality and number of our keys.
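
As a cartoon of the difference, here is the pre-aggregation idea in Python; the dimension names and numbers are made up, but the point is that report queries hit the (bounded) summary map rather than re-scanning the (unbounded) facts:

```python
from collections import defaultdict

# One fact/event; (campaign, placement, day) are keys, the rest are values.
events = [
    {"campaign": 7, "placement": "news-300x250", "day": "2013-01-15",
     "impressions": 1, "cost": 0.0021},
    # ...hundreds of millions more...
]

summaries = defaultdict(lambda: {"impressions": 0, "cost": 0.0})

for e in events:                                # done once, as events arrive
    key = (e["campaign"], e["placement"], e["day"])
    summaries[key]["impressions"] += e["impressions"]
    summaries[key]["cost"] += e["cost"]

# Reports read the bounded summary map instead of re-scanning the facts.
print(summaries[(7, "news-300x250", "2013-01-15")])
```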

Second Attempt: Streaming Databases and Better Batching

Having previously worked on a financial trading platform, I had learned much about streaming databases and Complex Event Processing (e.g. Coral8, StreamBase, Truviso). Our second approach would compute our daily summaries in much the same way that a financial exchange keeps track and tally of trades. The event ingest of the streaming database would be the only part of our infrastructure affected by spikes in the number of events since everything downstream worked against the summaries. Our reporting times went from hours to seconds or sub-seconds. If we were a retail shop that had well-known dimensionality then we would likely still be using a streaming database today. It allowed us to focus on immediate insights and actionable reports rather than the warehouse falling over or an M-R query taking 12 hours.

Once worrying about individual events was a thing of the past, we started to look at the dimensionality of our data. We knew from our old warehouse data that the hypercube of dimensional data was very sparse but we didn’t know much else. The initial analysis of the distribution of keys yielded interesting results:

[Figure: key frequency distribution (Zipf)]

Keys are seen with frequencies that tend to follow Zipf’s Law:

the frequency of any key is inversely proportional to its rank in the frequency table
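
In symbols: if r is a key’s rank in the frequency table, its frequency satisfies f(r) \propto \frac{1}{r}, so the second most common key shows up roughly half as often as the most common one, the tenth roughly a tenth as often, and so on.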

Put simply: there are a large number of things that we see very infrequently and a small number of things that we see very often. Decile (where the black line is 50%) and CDF plots of the key frequency provide additional insights:

[Figures: key count distribution decile plot; key frequency CDF]

60% of our keys have been seen hundreds of times or less, and around 15% of our keys have been seen only once. (The graph only covers one set of our dimensions. As we add more dimensions to the graph the CDF curve gets steeper.) This told us that not only is the hypercube very sparse but the values tend to be quite small and are updated infrequently. If these facts could be exploited then the hypercube could be highly compressed and stored very efficiently, even for many dimensions with high cardinality.

We improved our transfer, transform and loading batch processes to better cope with event volume spikes, which resulted in fewer headaches, but it still felt wrong. The phrase “batching into a streaming database” reveals the oxymoron. We didn’t progress much in computing unique user counts. Some of the streaming databases provided custom support for unique user counting, but not at the volume and rate that we required. Another solution was needed.

Third Attempt: Custom Key-Value Store and Streaming Events

From our work with streaming databases we knew a few things:

  • Out-of-order data was annoying (this is something that I will cover in future blog posts);
  • Counting unique sets (unique users, unique keys) was hard;
  • There was much efficiency to be gained in our distribution of keys and key-counts;
  • Structured (or semi-structured) data suited us well;
  • Batching data to a streaming database is silly.

Unfortunately none of the existing NoSQL solutions covered all of our cases. We built a Redis prototype and found that the majority of our code was in ingesting our events from our event routers, doing key management and exporting the summaries to our reporting tier. Building the storage in-house provided us the opportunity to create custom types for aggregation and sketches for the cardinality of large sets (e.g. unique user counting). Once we had these custom types it was a small leap to go from the Redis prototype to a full-featured in-house key-value store. We call this beast “The Summarizer”. (“The Aggregator” was already taken by our Ops team for event aggregation and routing.)

The summarizer simply maps events into summaries by key and updates the aggregates. Streaming algorithms are a natural fit in this type of data store. Many O(n^m) algorithms have streaming O(n) counterparts that provide sufficiently accurate results. (We’ll be covering streaming algorithms in future posts.) It provides us with a succinct (unsampled) summary that can be accessed in O(1). Currently we can aggregate more than 200,000 events per second per core (saturating just shy of 1 million events per second) where some events are aggregated into more than ten summaries.
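
Here is a toy sketch (again in Python, and again not the real Summarizer) of that mapping: each event updates one or more summary rows keyed by a tuple of dimensions, and each row carries plain aggregates plus a cardinality sketch for unique users. The HLL class is the toy one from the intersection sketch earlier in this post; field names and key choices are illustrative:

```python
from collections import defaultdict

class Summary:
    """One summary row: plain aggregates plus a sketch for unique users."""
    def __init__(self):
        self.impressions = 0
        self.unique_users = HLL()       # toy HLL sketch defined earlier

    def update(self, event):
        self.impressions += 1
        self.unique_users.add(event["user_id"])

summaries = defaultdict(Summary)

def summarize(event):
    # One event feeds several summaries, each keyed by a tuple of dimensions.
    for key in ((event["campaign"],),
                (event["campaign"], event["placement"]),
                (event["campaign"], event["day"])):
        summaries[key].update(event)

summarize({"campaign": 7, "placement": "news-300x250",
           "day": "2013-01-15", "user_id": 123456789})

# Reporting is an O(1) lookup; unique_users.cardinality() answers the
# unique-user question (meaningfully so once the sketch has seen many users).
print(summaries[(7,)].impressions)
```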

Our summaries are computed per day. (Future blog posts will provide more information about how we treat time.) They are designed such that they rarely contain more than 10M rows and are stored in CSV format. Initially we used CSV simply because we already had all of the code written for ingesting 3rd party CSV data. We quickly found other uses for them: our analysts gobbled them up and use them in Excel, our data scientists use them directly in R, and even our engineers use them for back-of-the-envelope calculations. Having manageable summaries and/or sketches enabled agile analytics.

To get the events into our summarizer we completely rethought how events move through our infrastructure. Instead of batching events, we wanted to stream them. To deal with spikes and to simplify maintenance, we wanted to allow the events to be queued if downstream components became unavailable or unable to meet the current demand. We needed to be able to handle our desired ingest rate of 500k events per second. The answer was right under our noses: rsyslog and 0MQ. (See the “Real-Time Streaming for Data Analytics” and “Real-Time Streaming with Rsyslog and ZeroMQ” posts for more information.)
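
The queuing behavior is easy to see with a minimal pyzmq PUSH/PULL sketch. This illustrates the pattern only, not our actual rsyslog/0MQ configuration; the port and message shape are made up:

```python
import zmq

ctx = zmq.Context()

# Downstream collector (e.g. feeding the summarizer) binds and pulls.
consumer = ctx.socket(zmq.PULL)
consumer.bind("tcp://*:5557")            # hypothetical port

# Edge event recorders connect and push; if the collector falls behind,
# events queue on the socket up to its high-water mark instead of being lost.
producer = ctx.socket(zmq.PUSH)
producer.connect("tcp://localhost:5557")
producer.send_json({"campaign": 7, "user_id": 123456789, "type": "impression"})

event = consumer.recv_json()             # stream straight into the summarizer
```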

Wrap Up

Our challenge was to be able to produce reports on demand over billions of events in seconds, and over hundreds of billions in minutes, while ingesting at most 500,000 events per second. Choosing the de facto technology du jour caused us to focus on fixing and maintaining the technology rather than solving business problems and providing value to our customers. We could have stayed with our first approach, immediately scaling to hundreds of nodes and taking on the challenges that solution presents. Instead, we looked at the types of answers we wanted and worked backwards until we could provide them easily on minimal hardware and at little maintenance cost. Opting for the small, agile solution allowed us to solve business problems and provide value to our customers much more quickly.

Footnote

Astute readers may have noticed the point on the far-left of the CDF graph and wondered how it was possible for a key to have been seen zero times or wondered why we would store keys that have no counts associated with them. We only summarize what is recorded in an event. The graph shows the frequency of keys as defined by impressions and it doesn’t include the contribution of any clicks or conversions. In other words, these “zero count keys” mean that for a given day there are clicks and/or conversions but no impressions. (This is common when a campaign ends.) In hindsight we should have summed the count of impressions, clicks and conversions and used that total in the graph but this provided the opportunity to show a feature of the summarizer — we can easily find days for which clicks and conversions have no impressions without running a nasty join.
