Summarizer

Since I get a lot of questions about our summarizer, I thought I would provide some additional details.

Below is an example summary definition that we use:

summary CampaignDetails {
    key {
        int64   campaign_id = 1;
        int64   inventory_id = 2;
        int64   creative_id = 3;
        int64   audience_attribute_id = 4;
    }
    value {
        sumint32       impression_count = 1;
        multiset64     impression_uu = 2;

        sumcurrency    media_cost = 3;
        sumcurrency    data_cost = 4;

        summary Interaction {
            key {
                int64   action_id = 1;
            } 
            value {
                sumint32     count = 1;
                multiset64   uu_count = 2;
                sumcurrency  revenue = 3;
            }
        }
        repeated Interaction interactions = 5;

        summary Conversion {
            key {
                int64   model_id = 1;
                int64   type_id = 2;
            } 
            value {
                sumint32     count = 1;
                multiset64   uu_count = 2;
                sumcurrency  revenue = 3;
            }
        }
        repeated Conversion conversions = 6;
    }
}

The grammar was inspired by protobuf, which is the IDL we use to define our events. (The events themselves are currently a mix of CSV and JSON.) This schema definition is used to define both the in-memory storage format and the serialization format. The values are all aggregates of some kind: sumint32 and sumfloat aggregate (by summing) integers and floating-point numbers, while multiset64 maintains a sketch of the set of unique users from which we can compute the cardinality. Streaming algorithms are a natural fit in this type of data store. Many O(n^m) algorithms have streaming O(n) counterparts that provide sufficiently accurate results. We’ll be covering streaming algorithms in future posts.
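To make those aggregate types concrete, here is a minimal Python sketch of what the value fields do as events are folded in. This is purely illustrative (the class names are mine, and the real multiset64 keeps a fixed-size sketch rather than an exact set):

class SumInt:
    # Plays the role of sumint32/sumcurrency: aggregate by summing.
    def __init__(self):
        self.value = 0

    def add(self, x):
        self.value += x

class Multiset:
    # Stands in for multiset64: tracks unique users. The real version keeps a
    # fixed-size sketch from which the cardinality can be computed.
    def __init__(self):
        self.users = set()

    def add(self, user_id):
        self.users.add(user_id)

    def cardinality(self):
        return len(self.users)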

Ingested events are mapped to summaries via a mapping definition. Multiple events can be mapped to the same summary (by key). This feature has provided us with unexpected benefits. Diving slightly into the nuances of ad serving: impressions, clicks and conversions occur at three distinct times and are recorded as three separate events. When reporting on these entities one wants to see them as a single unit joined by their key. In other words, one wants to see clicks and conversions associated with impressions. In a relational database or map-reduce one would need to group the impressions together by key, get their count, and then join (by key) the group-by and count for the clicks and conversions. This is a bear of a join, and when combined with counting the number of unique users it can bring even the largest cluster to its knees. The summarizer simply maps different events into the same summary by key and aggregates as necessary. This provides us with a succinct (unsampled) summary of all impressions, clicks and conversions by key that can be accessed in O(1).
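A toy version of that mapping in Python might look like the following. It ignores the nested Interaction and Conversion summaries, and the event field names are hypothetical; the point is only that different event types update the same keyed row:

from collections import defaultdict

# One row of CampaignDetails per key, holding just two of the counters.
summaries = defaultdict(lambda: {"impression_count": 0, "impression_uu": set()})

def ingest(event):
    # Impressions, clicks and conversions all carry the same key fields,
    # so they land in (and update) the same summary row.
    key = (event["campaign_id"], event["inventory_id"],
           event["creative_id"], event["audience_attribute_id"])
    row = summaries[key]
    if event["type"] == "impression":
        row["impression_count"] += 1
        row["impression_uu"].add(event["user_id"])
    # clicks and conversions would update their own counters here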

Currently we can aggregate more than 200,000 events per second per thread. Some events are aggregated into more than a dozen summaries. The key look-up and aggregation process parallelizes very well: we can ingest just shy of 1 million events per second per host. Even if we couldn’t reach our desired ingest rate, we could run multiple summarizers in parallel, giving each a fraction of the event stream. A simple post-process (which could be performed by another summarizer) would bring all of the results together. We have around 50 summaries with an average of 10 fields (counters) each. We’re currently tracking about 20M keys, which results in aggregating more than 200M individual counters.
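That post-process is conceptually just a merge of partial summaries, key by key: counters are summed and the unique-user structures are unioned. A rough sketch, reusing the toy row format from above:

def merge(partial_a, partial_b):
    # Combine two partial summaries produced by parallel summarizers.
    merged = {}
    for partial in (partial_a, partial_b):
        for key, row in partial.items():
            if key not in merged:
                merged[key] = {"impression_count": 0, "impression_uu": set()}
            merged[key]["impression_count"] += row["impression_count"]
            merged[key]["impression_uu"] |= row["impression_uu"]
    return merged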

Our summaries are designed such that they rarely contain more than 10M rows and are stored in CSV format. Initially we used CSV simply because we already had all of the code written for ingesting 3rd-party CSV data. We quickly found other uses for them: our analysts gobble them up and use them in Excel, our data scientists use them directly in R, and even our engineers use them for back-of-the-envelope calculations. Having manageable summaries and/or sketches has enabled agile analytics throughout our organization.

That’s just a quick overview of the summarizer. Please post comments for any questions that you may have!

Bring it on!

Recently I did a bit of show and tell on some of the volumes that we’re dealing with on our new infrastructure. It’s less than a month later and we’re handling over 3.5 billion events per day.

Nov 15, 2011 -- Events per second

(The three spikes that you see are due to the fact that we’re still trying to figure out the balance between event archive compression speed and size. We should have this figured out over the next few weeks. Hopefully I can convince our Ops folks to do a post on what they’ve learned.)

On Accuracy and Precision

A joint post from Matt and Ben

Believe it or not, we’ve been getting inspired by MP3s lately, and not by turning on music in the office. Instead, we drew a little bit of inspiration from the way MP3 encoding works. From Wikipedia:

“The compression works by reducing accuracy of certain parts of sound that are considered to be beyond the auditory resolution ability of most people. This method is commonly referred to as perceptual coding. It uses psychoacoustic models to discard or reduce precision of components less audible to human hearing, and then records the remaining information in an efficient manner.”

Very similarly, in online advertising there are signals that go “beyond the resolution of advertisers to action”. Rather than tackling the problem of clickstream analysis in the standard way, we’ve employed an MP3-like philosophy to storage. Instead of storing absolutely everything and counting it, we’ve taken a probabilistic, streaming approach to measurement. This lets us give clients real-time measurements of how many users and impressions a campaign has seen at excruciating levels of detail. The downside is that our reports tend to include numbers like “301M unique users last month” as opposed to “301,123,098 unique users last month”, but we believe that the benefits of this approach far outweigh the cost of limiting precision.

Give a little, get a lot

The precision of our approach does not depend on the size of the thing we’re counting. When we set our precision to +/-1%, we can tell the difference between 1000 and 990 as easily as we can tell the difference between 30 billion and 29.7 billion users. For example, when we count the number of users a campaign reached in Wernersville, PA (Matt’s hometown), we can guarantee that we saw 1000 +/- 10 unique cookies, just as we can say the campaign reached 1 billion +/- 10M unique cookies overall.

Our storage size is fixed once we choose our level of precision. This means that we can accurately predict the amount of storage needed and our system has no problem coping with increases in data volume and scales preposterously well. Just to reiterate, it takes exactly as much space to count the number of users you reach in Wernersville as it does to count the total number of users you reach in North America. Contrast this with sampling, where to maintain a fixed precision when capturing long-tail features (things that don’t show up a lot relative to the rest of the data-set, like Wernersville) you need to drastically increase the size of your storage.

The benefits of not having unexpected storage spikes and of scaling well are pretty obvious – fewer technical limits, fewer surprises, and lower costs for us, which directly translates to better value for our users and a more reliable product. Giving up a little bit of precision seems like a fair trade here.

The technique we chose supports set-operations. This lets us ask questions like, “how many unique users did I see from small towns in Pennsylvania today” and get an answer instantaneously by composing multiple data structures. Traditionally, the answers to questions like this have to be pre-computed, leaving you waiting for a long job to run every time you ask a question you haven’t prepared for. Fortunately, we can do these computations nearly instantaneously, so you can focus on digging into your data. You can try that small-town PA query again, but this time including Newton, MA (Ben’s hometown), and not worry that no one has prepared an answer.
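As an illustration of a fixed-size, unionable structure, here is a K-Minimum-Values sketch in Python. This is not the sketch we run in production, just one of the simplest with the properties described above: it keeps only the k smallest normalized hash values, two sketches are combined by merging those lists, and the cardinality is estimated from the k-th smallest value:

import hashlib

class KMVSketch:
    # K-Minimum-Values: fixed size, supports union, approximate distinct counts.
    def __init__(self, k=1024):
        self.k = k
        self.mins = []  # sorted list of (at most) the k smallest hashes in (0, 1)

    def _hash(self, item):
        h = int(hashlib.md5(str(item).encode()).hexdigest(), 16)
        return h / float(2 ** 128)  # normalize the 128-bit hash to (0, 1)

    def add(self, item):
        x = self._hash(item)
        if x not in self.mins and (len(self.mins) < self.k or x < self.mins[-1]):
            self.mins.append(x)
            self.mins.sort()
            del self.mins[self.k:]

    def union(self, other):
        result = KMVSketch(min(self.k, other.k))
        result.mins = sorted(set(self.mins) | set(other.mins))[:result.k]
        return result

    def cardinality(self):
        if len(self.mins) < self.k:
            return len(self.mins)                 # saw fewer than k distinct items
        return int((self.k - 1) / self.mins[-1])  # standard KMV estimator

The small-town query is then just a union of the per-town sketches followed by a single cardinality() call.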

Unfortunately, not all of these operations are subject to the same “nice” error bounds. However, we’ve put the time in to detect these errors, and make sure that the functionality our clients see degrades gracefully. And since our precision is tunable, we can always dial the precision up as necessary.

Getting insight from data

Combined with our awesome streaming architecture, this allows us to stop thinking about storage infrastructure as the limiting factor in analytics, similar to the way MP3 compression allows you to fit more and more music on your phone or MP3 player. When you throw the ability to have ad-hoc queries execute nearly instantly into the mix, we have no regrets about getting a little bit lossy. We’ve already had our fair share of internal revelations, and enabled clients to have quite a few of their own, just because it’s now so easy to work with our data.

A bit of Show and Tell

The show:

Oct 21, 2011 -- Events per second

Nov 7, 2011 -- Events per second

The tell:

We are in the process of ramping up the traffic on the new architecture that we’ve been blogging about. We recently reached a new milestone: 2 billion events in a single day. The system poked its head up, looked around a little, shrugged and went back to sleep. It performed very well, yawning the whole time.

Streaming Algorithms and Sketches

Here at Aggregate Knowledge we spend a lot of time thinking about how to do analytics on a massive amount of data. Rob recently posted about building our streaming datastore and the architecture that helps us deal with “big data”. Given a streaming architecture, the obvious question for the data scientist is “How do we fit in?”. Clearly we need to look towards streaming algorithms to match the speed and performance of our datastore.

A streaming algorithm is generally defined as one that has finite memory – significantly smaller than the data presented to it – and must process its input in one pass. Streaming algorithms start pretty simple, for instance counting the number of elements in the stream:

counter = 0
for event in stream:
    counter += 1

While counter will eventually overflow (and you can be somewhat clever about avoiding that), this is way better than the non-streaming alternative:

elements = list(stream)
counter = len(elements)

Pretty simple stuff. Even a novice programmer can tell you why the second method is way worse than the first. You can get more complicated and keep the same basic approach – computing the mean of a floating point number stream is almost as simple: keep around counter as above, and add a new variable, total_sum += value_new. Now that we’re feeling smart, what about the quantiles of the stream? Ah! Now that is harder.
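Spelled out in the same style as the counter above, the streaming mean looks like this:

counter = 0
total_sum = 0.0
for value_new in stream:
    counter += 1
    total_sum += value_new
mean = total_sum / counter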

While it may not be immediately obvious, you can prove (as Munro and Paterson did in 1980) that computing exact quantiles of a stream requires memory at least linear in the size of the stream. So we’re left approximating a solution to the quantiles problem. A first stab might be sampling, where you keep every 1000th element. While this isn’t horrible, it has its downsides – if your stream is infinite, you’ll still run out of space. It’s a good thing there are much better solutions. One of the first and most elegant was proposed by Cormode and Muthukrishnan in 2003, where they introduced the Count-Min sketch data structure. (A nice reference for sketching data structures can be found here.)

The Count-Min sketch works much like a Bloom filter. You compose k empty tables and k hash functions. For each incoming element we simply hash it through each function and increment the appropriate cell in the corresponding table. To find out how many times we have historically seen a particular element we simply hash our query and take the MINIMUM value that we find in the tables. In this way we limit the effects of hash collisions, and clearly we balance the size of the Count-Min sketch against the accuracy we require for the final answer. Here’s how it works:
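The following minimal Python sketch shows the update and query paths. It salts a single hash to fake k independent hash functions; a real implementation would use proper pairwise-independent hashes and pick the width and depth from the desired error bounds:

import hashlib

class CountMinSketch:
    def __init__(self, k=4, width=2048):
        self.k = k                       # number of tables / hash functions
        self.width = width               # cells per table
        self.tables = [[0] * width for _ in range(k)]

    def _index(self, item, i):
        # Salt the hash with the table number to get k "different" hash functions.
        h = hashlib.md5(("%d:%s" % (i, item)).encode()).hexdigest()
        return int(h, 16) % self.width

    def add(self, item, count=1):
        for i in range(self.k):
            self.tables[i][self._index(item, i)] += count

    def estimate(self, item):
        # Collisions can only inflate a cell, so the minimum is the best guess.
        return min(self.tables[i][self._index(item, i)] for i in range(self.k))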

The Count-Min sketch is an approximation to the histogram of the incoming data; in fact, it’s really only approximate when hashes collide. In order to compute quantiles we want to find the “mass” of the histogram above/below a certain point. Luckily, Count-Min sketches support range queries of the type “select count(*) where val between 1 and x;“. Now it is just a matter of finding the quantile of choice.

Actually finding the quantiles is slightly tricky, but not that hard. You basically have to perform a binary search with the range queries. So, to find the first decile value, and supposing you kept around the number of elements you have seen in the stream, you would binary search through values of x until the count returned by the range query is 1/10 of the total count.
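A sketch of that search, assuming a range_count(lo, hi) helper built on top of the structure above (e.g. via dyadic ranges) and integer values in a known range:

def quantile(range_count, total_count, q, low=0, high=2 ** 32):
    # Binary search for the smallest x with count(values <= x) >= q * total_count.
    target = q * total_count
    while low < high:
        mid = (low + high) // 2
        if range_count(0, mid) < target:
            low = mid + 1
        else:
            high = mid
    return low

# First decile: quantile(range_count, total_count, 0.1)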

Pretty neat, huh?

My Love/Hate Relationship with Hadoop

A few months ago, the need for some log file analysis popped up. As the junior Data Scientist, I had the genuine pleasure of waking up one morning to an e-mail from Matt and Rob letting me know that I was expected to be playing with terabytes of data as soon as possible. Exciting, to say the least.

The project seemed like a perfect fit for Hadoop, specifically Amazon’s Elastic MapReduce (EMR). So, I grabbed the company card, signed up, and dove right in. It’s been quite a learning experience.

After a few months learning the particulars of Amazon’s flavor of cloud computing and Hadoop’s take on distributed computing, I’ve developed a relationship with Hadoop as complicated as any MapReduce job – I’ve learned to love and loathe it at the same time.

The Good

EMR is incredibly easy to interface with, despite some of Amazon’s tools being less than stellar (I’m looking at you, Ruby CLI). The third-party APIs tend to be excellent. We’ve been using boto heavily.

Hadoop Streaming jobs are, like most everyone else on the internet will tell you, awesome for rapid prototyping and development. The rest of the Science team and I are not super concerned with speed for most of what we do in Hadoop, so we’re perfect users for Streaming jobs. We iterate on our models constantly, and Streaming makes it possible to easily test their behavior over whatever data we please.
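For anyone who hasn’t used Streaming, a job is just a pair of programs that read and write tab-separated lines on stdin/stdout, which is why iterating is so painless. A hypothetical (and deliberately boring) pair for counting events by key:

# mapper.py: emit "key<TAB>1" for every input record
import sys
for line in sys.stdin:
    key = line.rstrip("\n").split(",")[0]   # assumes CSV input with the key first
    print("%s\t1" % key)

# reducer.py: Hadoop groups and sorts by key before handing lines to us
import sys
current_key, count = None, 0
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t")
    if key != current_key:
        if current_key is not None:
            print("%s\t%d" % (current_key, count))
        current_key, count = key, 0
    count += int(value)
if current_key is not None:
    print("%s\t%d" % (current_key, count))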

The ability to include HIVE in an EMR workflow is yet another awesome bonus. It’s incredibly easy to boot up a cluster, install HIVE, and be doing simple SQL analytics in no time flat. External tables even make the data loading step a breeze.

The Bad

While Hadoop and EMR have let us do some very cool things that wouldn’t be possible otherwise, we’ve had some problems too.

I’ve blown up NameNodes, run into the S3 file size limit, and hit what feels like every pain point in-between while formatting and compressing our data. I’ve crashed every JVM that Hadoop has to offer, broken the HIVE query planner, and had Streaming jobs run out of memory both because they were badly designed, and because I didn’t tweak the right settings. In short, after just a few months, with what I would consider some fairly simple, standard use cases, I’ve run into every “standard” Hadoop problem, along with what feels like more than my fair share of non-standard problems.

While it should be no surprise to anyone that a lone data scientist can wreak havoc on any piece of software, there was a certain flavor to an unsettlingly large number of these crises that really started to bother me.

After running into the dfs.datanode.max.xcievers property problem mentioned in the post above, I put my finger on both what makes a problem quintessentially Hadoop-y and why a Hadoop problem isn’t a good one to have.

The Ugly

To fix any problem, you have to know about the problem. To know about a problem, you must have read the documentation or broken something enough times to start to pinpoint it.

Reading the documentation isn’t an option for learning about dfs.datanode.max.xcievers. It’s badly documented, there’s no default anywhere, and it’s misspelled (i before e except after c). But once you know what’s going on, it’s an easy fix to change a cluster’s configuration.

What’s so bad about a Hadoop problem is that causing enough issues to figure out a cause takes a large amount of time, in what I find to be the most disruptive way possible. It doesn’t take a large number of tries, or any particularly intelligent debugging effort, just a lot of sitting and waiting to see if you missed a configuration property or set one incorrectly. It doesn’t seem so bad at first, but since these problems often manifest only in extremely large data sets, each iteration can take a significant amount of time, and you can be quite a ways through a job before they appear. Investigative work in such a stop-and-go pattern, mixed with the worst kind of system administration, is killing me. I don’t want to stop working in the middle of a cool thought because I had to adjust a value in an XML document from 1024 to 4096.

Never mind the hardware requirements Hadoop presents, or issues with HDFS or any of the legitimate, low level complaints people like Dale have. I don’t like working on Hadoop because you have to keep so much about Hadoop in the back of your mind for such little, rare gains. It’s almost as bad as having a small child (perhaps a baby elephant?) on my desk.

What’s Next

The easy solution is to insulate me, the analyst, from the engineering. We could throw cash at the problem and dedicate an engineer or three to keeping a cluster operable. We could build a cluster in our data center. But this isn’t practical for any small company, especially when the projects don’t require you to keep a cluster running 24/7. Not only could the company not afford it, but it would be a waste of time and money.

The hard solution is coming up with something better. The whole team at AK believes that there is a better way, that working with big data can still be agile.

If possible, I should be able to access a data set quickly and cleanly. The size and complexity of the tools that enable me to work with big data should be minimized. The barrier to entry should be low. While there are projects and companies that are trying to make using Hadoop easier and easier, I think the fundamental problem is the one-very-large-framework-fits-all approach to big data. While Hadoop, and batch processing in general, has its time and place, there’s no reason I should need an elephantine framework to count anything, or to find the mean of a list of numbers.

The rest of AK seems to agree. We all think the solution has to incorporate batch processing, somehow, but still embrace clever ways to navigate a large, constantly flowing data set. The crazy people here even think that our solution can be reliable enough that no Data Scientist is clever enough (or incompetent enough) to break it.

2eight7

When I was writing my post yesterday I was reflecting on how much time we were spending making our code int-friendly — specifically, dealing with the problems that arise when you’re working with values around Integer.MAX_VALUE, or 2147483647 (2^31 - 1). I likened it to i18n (internationalization) or l10n (localization). Much in that same vein, I’d like to coin the term “2eight7” to represent the problems one runs into when working with a signed integer and any code that depends (implicitly or explicitly) on it.

Let the debates begin!

Billions of anything

In most programming languages an int is 32 bits wide, providing for values up to 4294967295 (2^32 - 1), or 2147483647 (2^31 - 1) if signed. In the case of Java, which we use for a number of components in our infrastructure, many of the fundamental components use ints: array indexes, NIO, IO, most collections (as they are commonly based on arrays), etc. When you’re working with billions of anything, it’s easy to run into these bounds, which results in subtle bugs that are hard to track down due to exceptions that aren’t what they seem. The most common cases that we run into are due to the roll-over that occurs when you add any positive value to 2147483647 — the value becomes negative (since Java’s ints are signed). Sometimes this will result in an ArrayIndexOutOfBounds exception and sometimes it will result in a seemingly impossible call path from deep inside of some java.* class.
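Java itself won’t warn you; the addition just wraps. A little Python that simulates Java’s 32-bit signed (two’s-complement) arithmetic makes the roll-over easy to see. This is purely illustrative, since Python’s own integers are arbitrary precision:

INT32_MAX = 2147483647

def to_int32(x):
    # Interpret the low 32 bits of x as a signed (two's-complement) Java int.
    x &= 0xFFFFFFFF
    return x - (1 << 32) if x >= (1 << 31) else x

print(to_int32(INT32_MAX + 1))    # -2147483648: hello, ArrayIndexOutOfBounds
print(to_int32(INT32_MAX + 100))  # -2147483549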

I remember working on my first few i18n (internationalization) and l10n (localization) projects where I learned the do’s and don’ts of how to write code that worked seamlessly (or at least was easy to work with) in multiple locales. Working with “big data” feels exactly like that — you have to slowly build up a set of techniques: instead of a single array, you need to keep around arrays of arrays (since each dimension is limited to 2147483647 elements); you have to know how to shard your collections so that they do not exceed the maximum allowed capacity (e.g. HashMap is limited to 1073741824 (2^30) buckets); if(value > Integer.MAX_VALUE) doesn’t do what you think it does (and most of the time it’s hard to tell that that’s the code that you wrote). The list goes on.

One interesting development was “announced” at EclipseCon: there’s talk about “big data” support in Java 9 (see Reinhold Already Talking About Java 9, for example). This is something that I will keep my eye on. Unfortunately, it won’t help us for the next few years.