## Doubling the Size of an HLL Dynamically – Recovery

Author’s Note: This post is related to a few previous posts dealing with the HyperLogLog algorithm. See Matt’s overview of HLL, and see this post for an overview of “folding” or shrinking HLLs in order to perform set operations. It is also the first in a series of three posts on doubling the size of HLLs – the next two will be about set operations and utilizing additional bits of data, respectively.

### Overview

In this post, we explore the error of the cardinality estimate of an HLL whose size has been doubled using several different fill techniques. Specifically, we’re looking at how the error changes as additional keys are seen by the HLL.

#### A Quick Reminder – Terminology and Fill Strategies

If we have an HLL of size $2^n$ and we double it to be an HLL of size $2^{n+1}$, we call two bins “partners” if their bin numbers differ by $2^n$.  For example, in an HLL doubled to be size $8$, the bins $1$ and $5$ are partners, as are $2$ and $6$, etc. The Zeroes doubling strategy fills in the newly created bins with zeroes. The Concatenate strategy fills in the newly created bins with the values of their partner bins. MinusTwo fills in each bin with two less than its partner bin’s value. RE fills in the newly created bins according to the empirical distribution of each bin.
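To make the strategies concrete, here is a small sketch in Python (the flat register array and the function names are just for illustration, not our production code; note the clamp at zero for MinusTwo, since register values can’t go negative):

```python
def double_hll(bins, strategy):
    """Double a register array of size 2^n to size 2^(n+1).

    Bin i in the old HLL and bin i + 2^n in the new HLL are "partners".
    """
    m = len(bins)
    if strategy == "zeroes":
        new_half = [0] * m
    elif strategy == "concatenate":
        new_half = list(bins)
    elif strategy == "minus_two":
        # Clamp at zero: register values cannot go negative (an assumption).
        new_half = [max(0, v - 2) for v in bins]
    else:
        raise ValueError(strategy)
    return bins + new_half

small = [3, 5, 2, 4]                      # an HLL of size 2^2
print(double_hll(small, "concatenate"))   # [3, 5, 2, 4, 3, 5, 2, 4]
```

In the size-8 example above, index 1 and index 5 of the returned array are partners, matching the definition in the text.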

### Some Sample Recovery Paths

Below, we ran four experiments to check recovery time. Each experiment consisted of running an HLL of size $2^{10}$ on 500,000 unique hashed keys (modeled here using a random number generator), doubling the HLL to be size $2^{11}$, and then running 500,000 more hashed keys through the HLL. Below, we have graphs showing how the error decreases as more keys are added.  Both graphs show the same data (the only difference being the scale on the y-axis). We have also graphed “Large,” an HLL of size $2^{11}$, and “Small,” an HLL of size $2^{10}$, which are shown only for comparison and are never doubled.

One thing to note about the graphs is that the error is relative.

Notice that Concatenate and Zeroes perform particularly poorly. Even after 500,000 extra keys have been added, they still don’t come within 5% of the true value! For Zeroes, this isn’t too surprising. Clearly the initial error of Zeroes, that is the error immediately after doubling, should be high.  A quick look at the harmonic mean shows why this occurs. If a single bin has a zero as its value, the harmonic mean of the values in the bins will be zero. Essentially, the harmonic mean of a list always tends towards the lowest elements of the list. Hence, even after all the zeroes have been replaced with positive values, the cardinality estimate will be very low.
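To see the scale of this effect, here is a quick sketch using the raw estimate formula with a fixed $\alpha$ (a simplification for illustration only):

```python
def raw_estimate(bins, alpha=0.72):
    """Raw HLL estimate: alpha * m^2 / sum(2^-M_i)."""
    m = len(bins)
    return alpha * m * m / sum(2.0 ** -v for v in bins)

m = 1024
full = [10] * (2 * m)              # every bin holds real data
half_zeroed = [10] * m + [0] * m   # a Zeroes-doubled HLL

print(raw_estimate(full))          # ~1.5 million
print(raw_estimate(half_zeroed))   # ~2946: the zero bins crush the estimate
```

Each zero bin contributes a full $2^0 = 1$ to the denominator sum, swamping the tiny $2^{-10}$ terms from the real bins, so the estimate stays depressed until those bins are overwritten.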

On the other hand, a more surprising result is that Concatenate gives such a poor guess. To see this we need to look at the formula for the estimate again.  The formula for the cardinality estimate is $\frac{\alpha_m m^2}{\sum_{i=1}^{m} 2^{-M_i}}$ where $M_i$ is the value in the $i^{th}$ bin, $m$ is the number of bins, and $\alpha_m$ is a constant approaching about $.72$. For Concatenate, the value $M_{i + m}$ is equal to $M_i$.  Hence we have that the cardinality estimate for Concatenate is:

$\begin{array}{ll}\displaystyle\frac{\alpha_{2m} (2m)^2}{\left(\displaystyle\sum_{i=1}^{2m} 2^{-M_i}\right)} \vspace{10pt}&\approx \displaystyle\frac{.72\cdot 4\cdot m^2}{\left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right) + \left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right) }\vspace{10pt} \\&\displaystyle= \displaystyle 4\cdot \frac{.72 \cdot m^2}{2\cdot \left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right)}\vspace{10pt}\\&= \displaystyle 2 \cdot \frac{.72 \cdot m^2}{\left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right)}\vspace{10pt}\\&\approx \displaystyle 2 \cdot \frac{ \alpha_m \cdot m^2}{\left(\displaystyle\sum_{i=1}^m 2^{-M_i}\right)}\end{array}$

Notice that this last term is about equal to 2 times the cardinality estimate of the HLL before doubling. One quick thing that we can take away from this is that it is unlikely for two “partner” bins to have the same value in them (since if this happens frequently, we get an estimate close to that given by Concatenate – which is very inaccurate!).
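A quick numeric sanity check of that factor of two (random register values, and the same $\alpha$ for both sizes, purely for illustration):

```python
import random

def raw_estimate(bins, alpha=0.72):
    """Raw HLL estimate: alpha * m^2 / sum(2^-M_i)."""
    m = len(bins)
    return alpha * m * m / sum(2.0 ** -v for v in bins)

random.seed(7)
bins = [random.randint(1, 12) for _ in range(1024)]
before = raw_estimate(bins)
after = raw_estimate(bins + bins)   # Concatenate: partners share values
print(after / before)               # ~2.0 (exactly 2 with a shared alpha)
```

The ratio is exactly two when the same $\alpha$ is used for both sizes; in practice $\alpha_{2m}$ differs slightly from $\alpha_m$, which is the only source of slack in the derivation above.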

As for MinusTwo and RE, these have small initial error and the error only falls afterwards. The initial error is small since the rules for these give guesses approximately equal to the guess of the original HLL before doubling. From here, the error should continue to shrink, and eventually, it should match that of the large HLL.

One thing we noticed was that the error for Concatenate in the graph above suggested that the absolute error wasn’t decreasing at all. To check this we looked at the trials and, sure enough, the absolute error stays pretty flat. Essentially, Concatenate overestimates pretty badly, and puts the HLL in a state where it thinks it has seen twice as many keys as it actually has. In the short term, it will continue to make estimates as if it has seen 500,000 extra keys. We can see this clearly in the graphs below.

### Recovery Time Data

I also ran 100 experiments where we doubled the HLLs after adding 500,000 keys, then continued to add keys until the cardinality estimate fell within 5% of the true cardinality.  The HLLs were set up to stop running at 2,000,000 keys if they hadn’t arrived at the error bound.

Notice how badly Concatenate did! In no trials did it make it under 5% error. Zeroes did poorly as well, though it did recover eventually. My guess here is that the harmonic mean had a bit to do with this – any bin with a low value, call it $k$, in it would pull the estimate down to be about $m^2 \cdot 2^k$. As a result, the estimate produced by the Zeroes HLL will remain depressed until every bin is hit with a(n unlikely) high value. Zeroes and Concatenate should not do well since essentially the initial estimate (after doubling) of each HLL is off by a very large fixed amount. The graph of absolute errors, above, shows this.

On the other hand, RE and MinusTwo performed fairly well. Certainly, RE looks better in terms of median and middle 50%, though its variance is much higher than MinusTwo’s. This should make sense, as we are injecting a lot of randomness into RE when we fill in the values, whereas MinusTwo’s bins are filled in deterministically.

### Recovery Time As A Function of Size

One might wonder whether the recovery time of MinusTwo and RE depends on the size of the HLL before the doubling process. To get a quick view of whether or not this is true, we did 1,000 trials like those above, but adding 200K, 400K, 600K, 800K, and 1M keys, and with a cutoff of 3% this time. Below, we have the box plots of the data for each of these. The heading of each graph gives the size of the HLL before doubling, and the y-axis gives the fractional recovery time (the true recovery time divided by the size of the HLL before doubling).

Notice that, for each doubling rule, there is almost no variation between each of the plots. This suggests that the size of the HLL before doubling doesn’t change the fractional recovery time. As a side note, one thing that we found really surprising is that RE is no longer king – MinusTwo has a slightly better average case. We think that this is just an artifact of the higher variation of RE and the change in cutoff.

### Summary

Of the four rules, MinusTwo and RE are clearly the best. Both take about 50 – 75% more keys after doubling to get within 3% error, and both recover extremely quickly if you only ask them to get within 5% error.

To leave you with one last little brainteaser, an HLL of size $2^{10}$, which is then doubled, will eventually have the same values in its bins as an HLL of size $2^{11}$ which ran on the same data. About how long will it take for these HLLs to converge? One (weak) requirement for this to happen is to have the value in every bin of both HLLs be changed. To get an upper bound on how long this should take, one should read about the coupon collector problem.
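As a hint for that upper bound, the coupon collector calculation is a one-liner: with $m$ bins hit uniformly at random, the expected number of keys before every bin has been touched at least once is $m \cdot H_m \approx m \ln m$ (and remember, touching every bin is necessary but not sufficient for convergence). A quick sketch:

```python
def expected_keys_to_touch_all(m):
    # Coupon collector: m * H_m, where H_m is the m-th harmonic number
    return m * sum(1.0 / i for i in range(1, m + 1))

for n in (10, 11):
    m = 2 ** n
    print(m, round(expected_keys_to_touch_all(m)))
```

For $m = 2^{10}$ this comes out to roughly 7,700 keys, so the bottleneck for convergence is not touching the bins but waiting for both HLLs’ registers to agree.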

## Doubling the Size of an HLL Dynamically

### Introduction

In my last post, I explained how to halve the number of bins used in an HLL as a way to allow set operations between that HLL and smaller HLLs.  Unfortunately, the accuracy of an HLL is tied to the number of bins used, so one major drawback with this “folding” method is that each time you halve the number of bins, you reduce that accuracy by a factor of $\sqrt{2}$.

In this series of posts I’ll focus on the opposite line of thinking: given an HLL, can one double the number of bins, assigning the new bins values according to some strategy, and recover some of the accuracy that a larger HLL would have had?  Certainly, one shouldn’t be able to do this (short of creating a new algorithm for counting distinct values) since once we use the HLL on a dataset the extra information that a larger HLL would have gleaned is gone.  We can’t recover it and so we can’t expect to magically pull a better estimate out of thin air (assuming Flajolet et al. have done their homework properly and the algorithm makes the best possible guess with the given information – which is a pretty good bet!).  Instead, in this series of posts, I’ll focus on how doubling plays with recovery time and set operations.  By this, I mean the following:  Suppose we have an HLL of size $2^n$ and while it’s running, we double it to be an HLL of size $2^{n+1}$. Initially, this may have huge error, but if we allow it to continue running, how long will it take for its error to be relatively small?  I’ll also discuss some ways of modifying the algorithm to carry slightly more information.

### The Candidates

Before we begin, a quick piece of terminology.  Suppose we have an HLL of size $2^n$ and we double it to be an HLL of size $2^{n+1}$.  We consider two bins to be partners if their bin numbers differ by $2^n$.  To see why this is important – check the post on HLL folding.

Colin and I did some thinking and came up with a few naive strategies to fill in the newly created bins after the doubling. I’ve provided a basic outline of the strategies below.

• Zeroes – Fill in with zeroes.
• Concatenate – Fill in each bin with the value of its partner.
• MinusTwo – Fill in each bin with the value of its partner minus two. Two may seem like an arbitrary amount, but a quick look at the formulas involved in the algorithm shows that this leaves the cardinality estimate approximately unchanged.
• RandomEstimate (RE) – Fill in each bin according to its probability distribution. I’ll describe more about this later.
• ProportionDouble (PD) – This strategy is only for use with set operations. We estimate the number of bins in the two HLLs which should have the same value, filling in the second half so that that proportion holds and the rest are filled in according to RE.

#### Nitty Gritty of RE

The first three strategies given above are pretty self-explanatory, but the last two are a bit more complicated. To understand these, one needs to understand the distribution of values in a given bin.  In the original paper, Flajolet et al. calculate the probability that a given bin takes the value $k$ to be given by $(1 - 0.5^k)^v - (1 - 0.5^{k-1})^v$ where $v$ is the number of keys that the bin has seen so far. Of course, we don’t know this value ($v$) exactly, but we can easily estimate it by dividing the cardinality estimate by the number of bins. However, we have even more information than this. When choosing a value for our doubled HLL, we know that that value cannot exceed its partner’s value. To understand why this is so, look back at my post on folding, and notice how the value in the partner bins in a larger HLL correspond to the value in the related bin in the smaller HLL.

Hence, to get the distribution for the value in a given bin, we take the original distribution, chop it off at the relevant value, and rescale it to have total area 1. This may seem kind of hokey but let’s quickly look at a toy example. Suppose I pick a number between 1 and 10, and you try to guess which number I picked. At this moment, assuming I’m a reasonable random number generator, there is a $1/10$ chance that I chose the number one, a $1/10$ chance that I chose the number two, etc. However, if I tell you that my number is no larger than two, you can now say that there is a $1/2$ chance that my number is a one, a $1/2$ chance that my number is a two, and there is no chance that it is larger. So what happened here? We took the original probability distribution, used our knowledge to cut off and ignore the values above the maximum possible value, and then rescaled the rest so that the sum of the remaining probabilities is equal to one.

RE consists simply of finding this distribution, picking a value according to it, and placing that value in the relevant bin.
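A sketch of RE along those lines (the function names, and the choice to only consider values from 1 up to the partner’s value, are simplifying assumptions):

```python
import random

def bin_value_pmf(v, max_k=64):
    """P(bin == k) = (1 - 0.5**k)**v - (1 - 0.5**(k-1))**v  for k >= 1,
    where v is the (estimated) number of keys the bin has seen."""
    return [(1 - 0.5 ** k) ** v - (1 - 0.5 ** (k - 1)) ** v
            for k in range(1, max_k + 1)]

def re_fill(partner_value, est_keys_per_bin):
    """Pick a value for a new bin: truncate the distribution at the
    partner's value and rescale so the probabilities sum to one."""
    if partner_value == 0:
        return 0
    pmf = bin_value_pmf(est_keys_per_bin)[:partner_value]  # k = 1..partner
    r = random.random() * sum(pmf)   # rescale by sampling within the mass
    for k, p in enumerate(pmf, start=1):
        r -= p
        if r <= 0:
            return k
    return partner_value

random.seed(1)
print(re_fill(partner_value=6, est_keys_per_bin=500))
```

Here `est_keys_per_bin` would come from dividing the cardinality estimate by the number of bins, as described above.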

#### Nitty Gritty of PD

Recall that we only use PD for set operations. One thing we found was that the accuracy of doubling with set operations according to RE is highly dependent on the intersection size of the two HLLs. To account for this, we examine the fraction of bins in the two HLLs which contain the same value, and then we force the doubled HLL to preserve this fraction.

So how do we do this? Let’s say we have two HLLs: $H$ and $G$. We wish to double $H$ before taking its union with $G$. To estimate the proportion of their intersection, make a copy of $G$ and fold it to be the same size as $H$. Then count the number of bins where $G$ and $H$ agree, call this number $a$. Then if $m$ is the number of bins in $H$, we can estimate that $H$ and $G$ should overlap in about $a/m$ bins. Then for each bin, with probability $a/m$ we fill in the bin with the minimum of the relevant bin from $G$ and that bin’s partner in $G$. With probability $1 - a/m$ we fill in the bin according to the rules of RE.
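Here is a rough sketch of PD (the helper names are hypothetical, the fold takes the max of partner bins as in the folding post, and the RE step is passed in as a stub):

```python
import random

def pd_fill(h_bins, g_bins, re_fill):
    """Double h_bins ahead of a union with G (g_bins, already twice H's size).

    With probability a/m we presume the bin overlaps with G and copy the
    min of the partner pair in G; otherwise we defer to an RE-style fill.
    """
    m = len(h_bins)
    # Fold a copy of G down to H's size (max of partner bins)
    g_folded = [max(g_bins[i], g_bins[i + m]) for i in range(m)]
    a = sum(1 for i in range(m) if g_folded[i] == h_bins[i])
    new_half = []
    for i in range(m):
        if random.random() < a / m:
            new_half.append(min(g_bins[i], g_bins[i + m]))
        else:
            new_half.append(re_fill(h_bins[i]))
    return h_bins + new_half

random.seed(3)
doubled = pd_fill([4, 6, 3, 5], [5, 6, 2, 4, 3, 1, 3, 5],
                  re_fill=lambda v: max(0, v - 2))   # MinusTwo as a stand-in
print(len(doubled))   # 8
```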

### The Posts

(Links will be added as the posts are published. Keep checking back for updates!)

## Sketching the last year

Sketching is an area of big-data science that has been getting a lot of attention lately. I personally am very excited about this.  Sketching analytics has been a primary focus of our platform and one of my personal interests for quite a while now. Sketching as an area of big-data science has been slow to unfold, (thanks Strata for declining our last two proposals on sketching talks!), but clearly the tide is turning. In fact, our summarizer technology, which relies heavily on our implementation of Distinct Value (DV) sketches, has been in the wild for almost a year now (and, obviously we were working on it for many months before that).

#### Fast, But Fickle

The R&D of the summarizer was fun but, as with most technical implementations, it’s never as easy as reading the papers and writing some code. The majority of the work we have done to make our DV sketches perform in production has nothing to do with the actual implementation.  We spend a lot of time focused on how we tune them, how we feed them, and how we make them play well with the rest of our stack.

Likewise, setting proper bounds on our sketches is an ongoing area of work for us and has led down some very interesting paths.  We have gained insights that are not just high level business problems, but very low level watchmaker type stuff.  Hash function behaviors and stream entropy alongside the skewness of data-sets themselves are areas we are constantly looking into to improve our implementations. This work has helped us refine and find optimizations around storage that aren’t limited to sketches themselves, but the architecture of the system as a whole.

#### Human Time Analytics

Leveraging DV sketches as more than just counters has proven unbelievably useful for us. The DV sketches we use provide arbitrary set operations. This comes in amazingly handy when our customers ask “How many users did we see on Facebook and on AOL this month that purchased something?” You can imagine how far these types of questions go in a real analytics platform. We have found that DV counts alongside set operation queries satisfy a large portion of our analytics platform’s needs.

Using sketches for internal analytics has been a blast as well. Writing implementations and libraries in scripting languages enables our data-science team to perform very cool ad-hoc analyses faster and in “human-time”. Integrating DV sketches as custom data-types into existing databases has proven to be a boon for analysts and engineers alike.

#### Reap The Rewards

Over the course of the year that we’ve been using DV sketches to power analytics, the key takeaways we’ve found are: be VERY careful when choosing and implementing sketches; and leverage as many of their properties as possible.  When you get the formula right, these are powerful little structures. Enabling in-memory DV counting and set operations is pretty amazing when you think of the amount of data and analysis we support. Sketching as an area of big-data science seems to have (finally!) arrived and I, for one, welcome our new sketching overlords.

## Big Data Ain’t Fat Data: A Case Study

We’ve always had a hunch that our users stick to the same geographic region. Sure, there’s the occasional jet-setter that takes their laptop from New York to Los Angeles (or like Rob, goes Chicago to San Francisco) on a daily or weekly basis, but they’re the exception and not the rule. Knowing how true this is can simplify the way we work with user-centric data across multiple data centers.

When Rob asked me to find this out for sure, my first instinct was to groan and fire up Hive on an Elastic MapReduce cluster, but after a second, I heard Matt’s voice in my head saying, “Big Data isn’t Fat Data”. Why bother with Hadoop?

#### The Setup

If I was solving this problem on a small data-set, it’d be pretty straight-forward. I could write a Python script in about 10 minutes that would take care of the problem. It would probably look something like:

import sys

users = {}

for line in sys.stdin:
    user, data_center = parse(line)
    try:
        # Track the set of distinct data centers each user has visited
        # (a set, so repeat visits to the same data center don't count twice)
        users[user].add(data_center)
    except KeyError:
        users[user] = {data_center}

total_users = len(users)
multiple_dc_users = len([u for u in users if len(users[u]) > 1])


Easy peasy. However, explicitly storing such a large hash-table gets a little problematic once you start approaching medium-sized data (1GB+). Your memory needs grow pretty rapidly – with M users and N data centers, storage is O(MN) – and things start to get a little slow in Python. At this point there are two options. You can brute force the problem by throwing hardware at it, either with a bigger machine or with something like Hadoop. Or, we can put on our Computer Science and Statistics hats and get a little bit clever.

What if we turn the problem sideways? Above, we’re keeping a hash table that holds a set of data-centers for each user. Instead, let’s keep a set of users per data-center, splitting the problem up into multiple hash tables. This lets us keep a small, fixed number of tables – since I’d hope any company knows exactly how many data centers they have – and spread the load across them, hopefully making the load on each table more tolerable. We can then check how many sets each user falls into, and call it a day.

data_centers = dict([(dc, set()) for dc in AK_DATA_CENTERS])

for line in sys.stdin:
    user, data_center = parse(line)
    data_centers[data_center].add(user)

# Get the total users by taking the union of all of the data center sets
...

# Get all users who are in exactly one data-center set (for two data
# centers this is the symmetric difference, i.e. XOR, of the sets)
# and count the size of that set.
...


While this approach theoretically has better performance with the same O(MN) space requirements, with big enough data the space requirements of the problem totally dominate whatever improvement this approach would provide. In other words, it doesn’t matter how small each hash table is, you can’t fit 80GB of user IDs into the 8GB of RAM on your laptop.

It’s looking pretty bleak for the Clever Way of doing things, since what we really want is a magic hash table that can store our 80GB of user IDs in the memory on our laptops.

#### Bloom Filters

Enter Bloom Filters. A bloom filter is a fixed-size set data structure with two minor features/drawbacks:

1. You can never ask a Bloom Filter for the set of elements it contains.
2. Membership queries have a small, controllable, false-positive probability. Bloom filters will never return false negatives.

With a little bit of work, it’s pretty easy to substitute Bloom Filters for plain old hash tables in our sideways approach above. There’s a slight tweak we have to make to our algorithm to accommodate the fact that we can’t ever query a bloom filter for the elements it contains, but the idea remains the same.

#### The Payoff

Suppose now we’re keeping a bloom-filter of users per data center. The only thing we have to work around is the fact that we’ll never be able to recover the list of users we’ve added to each set. So, we’ll just deal with users each time we see them instead of deferring our counting to the end.

With that idea in the bag, there are really only a few things to worry about when a request comes in for a given data center.

• Check the bloom filter for that data center to see if the user has been to that one before
• Check the other bloom filters to see how many other data-centers that user has been to before
• Count the number of total data-centers that user has seen before. If the user is new to this data center, and the user has seen exactly one other data center before, increment the multiple data center user counter
• If the user has never seen any of your data centers before, that user is a completely new user. Increment the total number of users seen.
• If the user has already seen this data-center, this user is a repeat. Do nothing!
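The steps above can be sketched as follows, with a toy Bloom filter and a made-up request loop (a real implementation would size the bit array and hash count from the desired false-positive rate):

```python
import hashlib

class BloomFilter:
    def __init__(self, num_bits=2 ** 20, num_hashes=5):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, item):
        # Derive k hash positions by salting a single hash function
        for i in range(self.num_hashes):
            h = hashlib.md5(f"{i}:{item}".encode()).hexdigest()
            yield int(h, 16) % self.num_bits

    def add(self, item):
        for p in self._positions(item):
            self.bits[p // 8] |= 1 << (p % 8)

    def __contains__(self, item):
        return all(self.bits[p // 8] >> (p % 8) & 1
                   for p in self._positions(item))

filters = {dc: BloomFilter() for dc in ("east", "west")}
total_users = multi_dc_users = 0

def handle(user, dc):
    global total_users, multi_dc_users
    seen_here = user in filters[dc]
    seen_elsewhere = sum(user in f for d, f in filters.items() if d != dc)
    if not seen_here and seen_elsewhere == 1:
        multi_dc_users += 1          # second distinct data center
    if not seen_here and seen_elsewhere == 0:
        total_users += 1             # brand new user
    filters[dc].add(user)            # repeat visits are a no-op

for user, dc in [("alice", "east"), ("alice", "east"), ("alice", "west"),
                 ("bob", "west")]:
    handle(user, dc)
print(total_users, multi_dc_users)   # expect 2 and 1
```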

We ran our version of this overnight. It took us one core, 8GB of RAM, and just under 4 hours to count the number of users who hit multiple data centers in a full week’s worth of logs.

## On Accuracy and Precision

A joint post from Matt and Ben

Believe it or not, we’ve been getting inspired by MP3’s lately, and not by turning on music in the office. Instead, we drew a little bit of inspiration from the way MP3 encoding works. From wikipedia:

“The compression works by reducing accuracy of certain parts of sound that are considered to be beyond the auditory resolution ability of most people. This method is commonly referred to as perceptual coding. It uses psychoacoustic models to discard or reduce precision of components less audible to human hearing, and then records the remaining information in an efficient manner.”

Very similarly, in online advertising there are signals that go “beyond the resolution of advertisers to action”. Rather than tackling the problem of clickstream analysis in the standard way, we’ve employed an MP3-like philosophy to storage. Instead of storing absolutely everything and counting it, we’ve employed a probabilistic, streaming approach to measurement. This lets us give clients real-time measurements of how many users and impressions a campaign has seen at excruciating levels of detail. The downside is that our reports tend to include numbers like “301M unique users last month” as opposed to “301,123,098 unique users last month”, but we believe that the benefits of this approach far outweigh the cost of limiting precision.

### Give a little, get a lot

The precision of our approach does not depend on the size of the thing we’re counting. When we set our precision to +/-1%, we can tell the difference between 1000 and 990 as easily as we can tell the difference between 30 billion and 29.7 billion users. For example, when we count the number of users a campaign reached in Wernersville, PA (Matt’s hometown), we can guarantee that we saw 1000 +/- 10 unique cookies, as well as saying the campaign reached 1 Billion +/- 10M unique cookies overall.

Our storage size is fixed once we choose our level of precision. This means that we can accurately predict the amount of storage needed and our system has no problem coping with increases in data volume and scales preposterously well. Just to reiterate, it takes exactly as much space to count the number of users you reach in Wernersville as it does to count the total number of users you reach in North America. Contrast this with sampling, where to maintain a fixed precision when capturing long-tail features (things that don’t show up a lot relative to the rest of the data-set, like Wernersville) you need to drastically increase the size of your storage.

The benefits of not having unexpected storage spikes, and scaling well are pretty obvious – fewer technical limits, fewer surprises, and lower costs for us, which directly translates to better value for our users and a more reliable product. A little bit of precision seems like a fair trade here.

The technique we chose supports set-operations. This lets us ask questions like, “how many unique users did I see from small towns in Pennsylvania today” and get an answer instantaneously by composing multiple data structures. Traditionally, the answers to questions like this have to be pre-computed, leaving you waiting for a long job to run every time you ask a question you haven’t prepared for. Fortunately, we can do these computations nearly instantaneously, so you can focus on digging into your data. You can try that small-town PA query again, but this time including Newton, MA (Ben’s hometown), and not worry that no one has prepared an answer.

Unfortunately, not all of these operations are subject to the same “nice” error bounds. However, we’ve put the time in to detect these errors, and make sure that the functionality our clients see degrades gracefully. And since our precision is tunable, we can always dial the precision up as necessary.

### Getting insight from data

Combined with our awesome streaming architecture this allows us to stop thinking about storage infrastructure as the limiting factor in analytics, similar to the way MP3 compression allows you to fit more and more music on your phone or MP3-player. When you throw the ability to have ad-hoc queries execute nearly instantly into the mix, we have no regrets about getting a little bit lossy. We’ve already had our fair share of internal revelations, and enabled clients to have quite a few of their own, just because it’s now just so easy to work with our data.

## Streaming Algorithms and Sketches

Here at Aggregate Knowledge we spend a lot of time thinking about how to do analytics on a massive amount of data. Rob recently posted about building our streaming datastore and the architecture that helps us deal with “big data”. Given a streaming architecture, the obvious question for the data scientist is “How do we fit in?”. Clearly we need to look towards streaming algorithms to match the speed and performance of our datastore.

A streaming algorithm is defined generally as having finite memory – significantly smaller than the data presented to it – and must process the input in one pass. Streaming algorithms start pretty simple, for instance counting the number of elements in the stream:

counter = 0
for event in stream:
    counter += 1


While eventually counter will overflow (and you can be somewhat clever about avoiding that) this is way better than the non-streaming alternative.

elements = list(stream)
counter = len(elements)


Pretty simple stuff. Even a novice programmer can tell you why the second method is way worse than the first. You can get more complicated and keep the same basic approach – computing the mean of a floating point number stream is almost as simple: keep around counter as above, and add a running total, total_sum += new_value. Now that we’re feeling smart, what about the quantiles of the stream? Ah! Now that is harder.
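For completeness, that running-mean bookkeeping looks like this (with a small list standing in for the stream):

```python
counter = 0
total_sum = 0.0

for value in [1.5, 2.5, 3.0, 3.0]:   # stand-in for the stream
    counter += 1
    total_sum += value

mean = total_sum / counter
print(mean)   # 2.5
```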

While it may not be immediately obvious, you can prove (as Munro and Paterson did in 1980) that computing exact quantiles of a stream requires memory that is at least linear with respect to the size of the stream. So, we’re left approximating a solution to the quantiles problem. A first stab might be sampling where you keep every 1000th element. While this isn’t horrible, it has its downsides – if your stream is infinite, you’ll still run out of space. It’s a good thing there are much better solutions. One of the first and most elegant was proposed by Cormode and Muthukrishnan in 2003 where they introduce the Count-Min sketch data structure. (A nice reference for sketching data structures can be found here.)

Count-Min sketch works much like a bloom filter. You compose k empty tables and k hash functions. For each incoming element we simply hash it through each function and increment the appropriate element in the corresponding table. To find out how many times we have historically seen a particular element we simply hash our query and take the MINIMUM value that we find in the tables. In this way we limit the effects of hash collision, and clearly we balance the size of the Count-Min sketch with the accuracy we require for the final answer. Here’s how it works:
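A minimal version of that update/query loop (the table width, depth, and md5-based hashing are illustrative assumptions):

```python
import hashlib

class CountMinSketch:
    def __init__(self, width=2048, depth=5):
        self.width, self.depth = width, depth
        self.tables = [[0] * width for _ in range(depth)]

    def _index(self, row, item):
        # One salted hash per row stands in for k independent hash functions
        digest = hashlib.md5(f"{row}:{item}".encode()).hexdigest()
        return int(digest, 16) % self.width

    def add(self, item, count=1):
        for row in range(self.depth):
            self.tables[row][self._index(row, item)] += count

    def count(self, item):
        # Take the MINIMUM over rows to limit hash-collision inflation
        return min(self.tables[row][self._index(row, item)]
                   for row in range(self.depth))

cms = CountMinSketch()
for x in ["a", "b", "a", "c", "a"]:
    cms.add(x)
print(cms.count("a"))   # 3 here; Count-Min can only ever overestimate
```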

The Count-Min sketch is an approximation to the histogram of the incoming data, in fact it’s really only probabilistic when hashes collide. In order to compute quantiles we want to find the “mass” of the histogram above/below a certain point. Luckily Count-Min sketches support range queries of the type “select count(*) where val between 1 and x;”. Now it is just a matter of finding the quantile of choice.

To actually find the quantiles is slightly tricky, but not that hard. You basically have to perform a binary search with the range queries. So to find the first decile value, and supposing you kept around the number of elements you have seen in the stream, you would binary search through values of x until the return count of the range query is 1/10 of the total count.
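The binary search itself can be sketched independently of the sketch; here an exact counter stands in for the Count-Min range query, purely for illustration (with a real Count-Min you would answer the range queries with dyadic range sums):

```python
def quantile(range_count, total, q, max_value):
    """Smallest x with count(val <= x) >= q * total, found by binary search
    over the range-query oracle range_count(x)."""
    lo, hi = 0, max_value
    while lo < hi:
        mid = (lo + hi) // 2
        if range_count(mid) >= q * total:
            hi = mid
        else:
            lo = mid + 1
    return lo

data = list(range(1, 101))                 # stand-in stream, values 1..100
range_count = lambda x: sum(v <= x for v in data)
print(quantile(range_count, len(data), 0.1, 100))   # first decile -> 10
```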

Pretty neat, huh?

## Custom Input/Output Formats in Hadoop Streaming

Like I’ve mentioned before, working with Hadoop’s documentation is not my favorite thing in the world, so I thought I’d provide a straightforward explanation of one of Hadoop’s coolest features – custom input/output formats in Hadoop streaming jobs.

### Use Case

It is common for us to have jobs that get results across a few weeks, or months, and it’s convenient to look at the data by day, week, or month. Sometimes including the date (preferably in a nice standard format) in the output isn’t quite enough, and for whatever reason it’s just more convenient to have each output file correspond to some logical unit.

Suppose we wanted to count unique users in our logs by state, by day. The streaming job probably starts looking something like:

hadoop jar /path/to/hadoop-streaming.jar \
-input log_data/ \
-output geo_output/ \
-mapper geo_mapper.py \
-reducer user_geo_count_reducer.py \
-cacheFile ip_geo_mapping.dat#geo.dat


And the output from the job might look like:

2011-06-20,CA,1512301
2011-06-21,CA,1541111
2011-06-22,CA,1300001
...
2011-06-20,IL,23244
2011-06-21,IL,23357
2011-06-22,IL,12213
...


This is kind of a pain. If we do this for a month of data and all 50 states appear every day, that’s at least 1500 records – not quite so easy to eyeball. So, let’s ask Hadoop to give us a file per day, named YYYY-MM-DD.csv, that contains all the counts for that day. 30 files containing 50 records each is much more manageable.

### Write Some Java

The first step is to write some Java. I know, this is a tutorial about writing Input/Output formats for Hadoop streaming jobs. Unfortunately, there is no way to write a custom output format other than in Java.

The good news is that once you’re set up to develop, both input and output formats tend to take minimal effort to write. Hadoop provides a class just for putting output records into different files based on the content of each record. Since we’re looking to split records based on the first field of each record, let’s subclass it.

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class DateFieldMultipleOutputFormat
        extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // The first comma-delimited field of the key is the date; use it as the filename.
        String date = key.toString().split(",")[0];
        return date + ".csv";
    }
}
```


It’s a pretty simple exercise, even if you’ve never written a single line of Java. All the code does is take the first field of the key and use it as the output filename. Honestly, the hardest part is going to be setting up your IDE to work with Hadoop (the second hardest part was finding this blog post).

### Use It

The most recent Hadoop documentation I can find still covers using custom Input/Output formats as of Hadoop 0.14. Very helpful.

It turns out life is still easy. When you look at a less-misleading part of the Hadoop Streaming documentation, all the pieces you need are right there. There are two flags, -inputformat and -outputformat, that let you specify Java classes as your input and output format. They have to be on the Java classpath when Hadoop runs, which is taken care of by the -libjars generic Hadoop option. There is no need to compile a custom streaming jar or anything crazy (I found worse suggestions on StackOverflow while figuring this out).

Using this newfound wisdom, it’s pretty trivial to add the output format to the existing streaming job. The next version of the job is going to look something like:

```shell
hadoop jar /path/to/hadoop-streaming.jar \
    -libjars /path/to/custom-formats.jar \
    -input log_data/ \
    -output geo_output/ \
    -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
    -mapper geo_mapper.py \
    -reducer user_geo_count_reducer.py \
    -cacheFile ip_geo_mapping.dat#geo.dat
```


### Pay Attention to the Details

If you write an output format like the one above and try to run a job, you’ll notice that some output records disappear. The overly-simple explanation is that Hadoop ends up opening the file for a specific date once for every reducer that date appears in, clobbering the data that was there before. Fortunately, it’s also easy to tell Hadoop how to send all the data from a date to the same reducer, so each file is opened exactly once. There isn’t even any Java involved.

All it takes is specifying the right partitioner class (KeyFieldBasedPartitioner) for the job on the command line. This partitioner is configured with the same -k field syntax as unix sort, so Data Scientists should have an easy time figuring out how to use it. To keep data from disappearing, tell the partitioner that the first field of the comma-delimited output record is the value to partition on.

With those options included, the final streaming job ends up looking like:

```shell
hadoop jar /path/to/hadoop-streaming.jar \
    -libjars /path/to/custom-formats.jar \
    -D map.output.key.field.separator=, \
    -D mapred.text.key.partitioner.options=-k1,1 \
    -input log_data/ \
    -output geo_output/ \
    -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -mapper geo_mapper.py \
    -reducer user_geo_count_reducer.py \
    -cacheFile ip_geo_mapping.dat#geo.dat
```


On the next run all the data should appear again, and the output directory should contain a file for each day that has output. It's not hard to take this example and get a little more complicated – it'd take minimal changes to make this job output to a file per state, for example. Not bad for a dozen lines of code and some command line flags.

## Using Tools You Already Have

I was reading FlowingData (a great data science blog!) a little while ago and came across a post on why programming is a good skill for data scientists to have. My first reaction was, "well, duh" – while I don't expect everyone in the data science business to be a machine learning whiz or re-writing the linux kernel for fun, I would have thought that most people reading the blog had some kind of computer science background. Judging from the comments on that post, it looks like my assumption was quite wrong – Nathan definitely has a fair number of readers who really appreciated that post.

Nathan is good enough to provide aspiring readers with a list of essential tools, and a list of good starting points. Both lists are quite good, covering everything from Python to Processing, but there’s a glaring omission: shell scripting. So, in the spirit of teaching, I thought I’d share a little bit about why every data scientist should know at least a bit about bash scripting.

### They’re Everywhere

The tools built in to every flavor of *nix (check out this IEEE standard) cover most of what you need to do to manipulate, manhandle, and munge data-sets. There are tools for selecting columns, sorting, de-duping, counting, pattern matching and text manipulation, joins, and more. In order, that translates into:

• cut
• sort
• uniq
• wc
• grep
• sed and awk
• join

I use all these nearly every day. The best part is, once you know they exist, these tools are available on every unix machine you will ever use. Nothing else (except maybe perl) is as universal – you don’t have to worry about versions or anything. Being comfortable with these tools means you can get work done anywhere – any EC2 instance you boot up will have them, as will any unix server you ssh into.
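To make that concrete, here are a few of the tools above in action on a hypothetical comma-delimited log.csv (one-liner sketches, not part of any real pipeline):

```shell
cut -d, -f2 log.csv        # select the second column
sort log.csv | uniq -c     # de-dupe and count occurrences of each distinct line
grep -c 'CA' log.csv       # count lines matching a pattern
wc -l log.csv              # count lines in the file
```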

### They’re Efficient

One of the first lessons I learned as a programmer is that there is absolutely no way I can sort data using a script faster than I could do it with sort. With a small data-set, it’ll take you longer to write print statements than it will for sort to finish, and with large data sets, I’m just glad someone else wrote N-way external merge-sort for me.

Similarly, the other commands are highly optimized, and the code has been around for years, touched by many great hands (it’s fun reading a man page and seeing “Written by Richard Stallman” at the bottom), and used by thousands and thousands of people. So, there probably aren’t that many obvious bugs left.

If you want to be a geek about it (and we do), they’re also all, with the obvious exception of sort, one-pass algorithms and O(N) or better with low memory usage. What’s not to love?

### They’re easy to work with

Getting used to having things already done for you also makes a data-centric workflow more efficient. The first step of almost any data project is figuring out the idiosyncrasies of a data set. Knowing shell utilities lets you very easily get a first impression of a data set, and often gets you most of the way through the process of cleaning it up.

As an example, I can quickly get a sense of how frequently users appear in our logs – let's say the top 10 users – by just using a few commands chained together.

```shell
cut -d, -f1 data.csv | sort | uniq -c | sort -rn | head
```

Running the same command again with tail instead of head gives the bottom 10 users by frequency, and with another round of cut I can get the whole frequency distribution of users in the log, all without writing a single line of code.
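For the curious, that frequency-distribution trick might look like the sketch below (using awk instead of a second cut, since uniq -c pads its counts with spaces):

```shell
# For each frequency, how many users appear in the log that many times
cut -d, -f1 data.csv | sort | uniq -c | awk '{print $1}' | sort -n | uniq -c
```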

Once you end up doing this more than once, it’s easy to save a nice little script that you can easily re-run.

```shell
#!/bin/bash

if [ -z "$1" ]; then
    echo "usage: top_users.sh input_file"
    exit 1
fi

cut -d, -f1 "$1" | sort | uniq -c | sort -rn
```


### EOF and Disclaimer

I’ve barely scratched the surface here, and already you can get a good sense of how easy and powerful a lot of these tools are. If you’re interested in picking them up, open a terminal and use the man pages, wikipedia, or your favorite search engine to find out more. There are good bash scripting guides scattered around the web, once you get interested in putting all of this together. The sky is the limit, really.

## My Love/Hate Relationship with Hadoop

A few months ago, the need for some log file analysis popped up. As the junior Data Scientist, I had the genuine pleasure of waking up one morning to an e-mail from Matt and Rob letting me know that I was expected to be playing with terabytes of data as soon as possible. Exciting, to say the least.

The project seemed like a perfect fit for Hadoop, specifically Amazon’s Elastic MapReduce (EMR). So, I grabbed the company card, signed up, and dove right in. It’s been quite a learning experience.

After a few months learning the particulars of Amazon’s flavor of cloud computing and Hadoop’s take on distributed computing, I’ve developed a relationship with Hadoop as complicated as any MapReduce job – I’ve learned to love and loathe it at the same time.

### The Good

EMR is incredibly easy to interface with, despite some of Amazon’s tools being less-than-stellar (I’m looking at you, Ruby CLI). The third-party APIs tend to be excellent. We’ve been using boto heavily.

Hadoop Streaming jobs are, like most everyone else on the internet will tell you, awesome for rapid prototyping and development. The rest of the Science team and I are not super concerned with speed for most of what we do in Hadoop, so we’re perfect users for Streaming jobs. We iterate on our models constantly, and Streaming makes it possible to easily test their behavior over whatever data we please.

The ability to include HIVE in an EMR workflow is yet another awesome bonus. It’s incredibly easy to boot up a cluster, install HIVE, and be doing simple SQL analytics in no time flat. External tables even make the data loading step a breeze.

While Hadoop and EMR have let us do some very cool things that wouldn’t be possible otherwise, we’ve had some problems too.

I’ve blown up NameNodes, run into the S3 file size limit, and hit what feels like every pain point in-between while formatting and compressing our data. I’ve crashed every JVM that Hadoop has to offer, broken the HIVE query planner, and had Streaming jobs run out of memory both because they were badly designed, and because I didn’t tweak the right settings. In short, after just a few months, with what I would consider some fairly simple, standard use cases, I’ve run into every “standard” Hadoop problem, along with what feels like more than my fair share of non-standard problems.

While it should be no surprise to anyone that a lone data-scientist can wreak havoc on any piece of software, there was a certain flavor to an unsettlingly large number of these crises that really started to bother me.

After running into the dfs.datanode.max.xcievers property problem mentioned in the post above, I put my finger on both what makes a problem quintessentially Hadoop-y and why a Hadoop problem isn’t a good one to have.

### The Ugly

To fix any problem, you have to know about the problem. To know about a problem, you must have read the documentation or broken something enough times to start to pinpoint it.

Reading the documentation isn’t an option for learning about dfs.datanode.max.xcievers. It’s badly documented, there’s no default listed anywhere, and it’s misspelled (i before e except after c). But once you know what’s going on, it’s an easy fix to change a cluster’s configuration.
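For the record, the change itself is just a property in hdfs-site.xml on each datanode (4096 here is a commonly suggested value, not an official default):

```xml
<property>
  <name>dfs.datanode.max.xcievers</name>
  <value>4096</value>
</property>
```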

What’s so bad about a Hadoop problem is that causing enough issues to figure out a cause takes a large amount of time, in what I find to be the most disruptive way possible. It doesn’t take a large number of tries, or any particularly intelligent debugging effort, just a lot of sitting and waiting to see if you missed a configuration property or set one incorrectly. It doesn’t seem so bad at first, but since these problems often manifest only in extremely large data-sets, each iteration can take a significant amount of time, and you can be quite a ways through a job before they appear. Investigative work in such a stop and go pattern, mixed with the worst kind of system administration, is killing me. I don’t want to stop working in the middle of a cool thought because I had to adjust a value in an XML document from 1024 to 4096.

Never mind the hardware requirements Hadoop presents, or issues with HDFS or any of the legitimate, low level complaints people like Dale have. I don’t like working on Hadoop because you have to keep so much about Hadoop in the back of your mind for such little, rare gains. It’s almost as bad as having a small child (perhaps a baby elephant?) on my desk.

### What’s Next

The easy solution is to insulate me, the analyst, from the engineering. We could throw cash at the problem and dedicate an engineer or three to keeping a cluster operable. We could build a cluster in our data center. But this isn’t practical for any small company, especially when the projects don’t require you to keep a cluster running 24/7. Not only could the company not afford it, but it would be a waste of time and money.

The hard solution is coming up with something better. The whole team at AK believes that there is a better way, that working with big data can still be agile.

If possible, I should be able to access a data-set quickly and cleanly. The size and complexity of the tools that enable me to work with big data should be minimized. The barrier to entry should be low. While there are projects and companies that are trying to make using Hadoop easier and easier, I think the fundamental problem is with the one-very-large-framework-fits-all approach to big data. While Hadoop, and batch processing in general, has its time and place, there’s no reason I should need an elephantine framework to count anything, or find the mean of a list of numbers.

The rest of AK seems to agree. We all think the solution has to incorporate batch processing, somehow, but still embrace clever ways to navigate a large, constantly flowing data set. The crazy people here even think that our solution can be reliable enough that no Data Scientist is clever enough (or incompetent enough) to break it.

## Hacking is Indeed a Data Science Skill

Recently I ran into a task that required me to manipulate a bunch of disparate log level data. Ahh, the tedium of data mining! There were a few particularly annoying things about this task.

1. The data was split into 3 distinct files: impression logs, activity logs, and user-attribute logs.
2. The user-attribute logs were stored sequentially. Each record looked like:

   request_id, date, user_id, attribute_id

   where there were a varying number of lines for each request_id, representing how many attributes we may have been handed at that time for that user.
3. The data was reasonably large (~500GB).

What I wanted after this was a flattened, normalized data set to use for various modeling tasks. The output format needed to be:

request_id, date, user_id, activity_id, { attribute_ids }

The first approach I thought of was to get the entire set of unique attributes from the file using something like “cat | cut | sort -u” to create a database table and generate a bunch of inserts. This was dumb and obviously gets annoying very quickly. Not to mention that my final data set would be a few hundred GB, and my research instance of Postgres would get real annoyed.

How about Hadoop?  While this isn’t a terrible answer, there are a few problems. Mainly, I’m under a deadline and getting 500GB to the cluster would take too long. What I really want is some Unix-foo that I can kick off and forget about. It feels like there is some “cut | join | awk” solution. These are times when I wish I had better Unix skills.  Maybe emacs has a function that does this and brings you lunch (C-x-lunch)?
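For what it’s worth, the kind of “cut | join | awk” solution I was imagining looks something like this sketch – the filenames, field positions, and ;-delimited attribute list are all hypothetical, and both files have to be sorted on request_id first:

```shell
# 1) Collapse the attribute log to one line per request_id: request_id,attr1;attr2;...
sort -t, -k1,1 attributes.csv | awk -F, '
    $1 != prev { if (prev != "") print prev "," attrs; prev = $1; attrs = $4; next }
    { attrs = attrs ";" $4 }
    END { if (prev != "") print prev "," attrs }' > attrs_by_request.csv

# 2) Join the flattened attributes onto the activity log on request_id
sort -t, -k1,1 activities.csv > activities_sorted.csv
join -t, activities_sorted.csv attrs_by_request.csv
```

Of course, at ~500GB, a single-machine sort is exactly the step that blows the deadline, which is part of why this stayed a daydream.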

So, what did I do?  Well, many definitions of data science include the technical skill of “hacking” as a necessary ingredient. One of the finer points of “hacking” has to be social engineering. It’s way easier to get the president of the bank drunk and have him tell you the combination to his safe than it is to crack the safe. So, along these lines I came up with a plan. Most engineers pride themselves on being extremely smart (and most are) and love challenges. This can also get them into trouble though. Next time you walk into an engineering meeting, ask an engineer what sorting algorithm Java uses and if it’s the right choice. One hour wasted!

Our CTO, Rob G., happens to be a brilliant engineer, so I called him up and casually brought up this annoying formatting problem I was having. He immediately started brainstorming solutions and he ended up talking himself into Java as the fastest way (wall clock) that he could get this done.  Fortunately, I’m not really a Java guy. So after Rob convinced himself that his solution was best, he also ended up talking himself into writing all the code. Awesome! Now, my annoying data task was “executing” and I could go back to work on more important things. This entire conversation took about 10 minutes. Much faster than Googling around for Unix foo. The next morning, my data set was all organized and sitting on one of our servers.  Hacking is indeed a useful data science skill!

I guess the moral here is twofold. 1) Sometimes asking for help (and figuring out ways to get it!) really is the best solution, and 2) distributing workloads across your team makes everybody work faster.

P.S. Obviously wasting the CTO’s time is never a good idea. Luckily, Rob is a champion of scheduling and apparently he had a few extra cycles, so no harm done to the greater good.