Big Data Ain’t Fat Data: A Case Study

We’ve always had a hunch that our users stick to the same geographic region. Sure, there’s the occasional jet-setter who takes their laptop from New York to Los Angeles (or, like Rob, goes from Chicago to San Francisco) on a daily or weekly basis, but they’re the exception and not the rule. Knowing just how true this is can simplify the way we work with user-centric data across multiple data centers.

When Rob asked me to find this out for sure, my first instinct was to groan and fire up Hive on an Elastic MapReduce cluster, but after a second, I heard Matt’s voice in my head saying, “Big Data isn’t Fat Data”. Why bother with Hadoop?

The Setup

If I were solving this problem on a small data-set, it’d be pretty straightforward. I could write a Python script in about 10 minutes that would take care of the problem. It would probably look something like:

import sys

users = {}

# parse() is assumed to pull a (user, data_center) pair out of each log line
for line in sys.stdin:
    user, data_center = parse(line)
    try:
        users[user].add(data_center)   # a set per user avoids double-counting repeat visits
    except KeyError:
        users[user] = {data_center}

total_users = len(users)
multiple_dc_users = len([u for u in users if len(users[u]) > 1])

Easy peasy. However, explicitly storing such a large hash table gets a little problematic once you start approaching medium-sized data (1GB+). Your memory needs grow pretty rapidly – with M users and N data centers, worst-case storage is O(MN) – and things start to get a little slow in Python. At this point there are two options. We can brute-force the problem by throwing hardware at it, either with a bigger machine or with something like Hadoop. Or we can put on our Computer Science and Statistics hats and get a little bit clever.

What if we turn the problem sideways? Above, we’re keeping a hash table that holds a set of data-centers for each user. Instead, let’s keep a set of users per data-center, splitting the problem up into multiple hash tables. This lets us keep a small, fixed number of tables – since I’d hope any company knows exactly how many data centers they have – and spread the users across them, hopefully making the load on each table more tolerable. We can then check how many sets each user falls into, and call it a day.

import sys

# AK_DATA_CENTERS is the (small, fixed) list of our data centers; parse() is the same helper as before
data_centers = dict([(dc, set()) for dc in AK_DATA_CENTERS])

for line in sys.stdin:
    user, data_center = parse(line)
    data_centers[data_center].add(user)

# Get the total number of users by taking the union of all of the data-center sets
...

# Count the users who fall into exactly one data-center set; everyone else has hit
# multiple data centers.
...
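
For completeness, one way to fill in those last two steps might look like this (a sketch, not the exact code we ran, using the data_centers dict built above):

data_center_sets = list(data_centers.values())

# Total users: anyone who appears in at least one data-center set
all_users = set().union(*data_center_sets)
total_users = len(all_users)

# Multiple-data-center users: anyone who appears in two or more sets
multiple_dc_users = sum(
    1 for user in all_users
    if sum(user in dc_set for dc_set in data_center_sets) > 1
)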

While this approach theoretically performs better, it has the same O(MN) worst-case space requirements, and with big enough data those space requirements totally dominate whatever improvement the restructuring provides. In other words, it doesn’t matter how small each hash table is, you can’t fit 80GB of user IDs into the 8GB of RAM on your laptop.

It’s looking pretty bleak for the Clever Way of doing things, since what we really want is a magic hash table that can store our 80GB of user IDs in the memory on our laptops.

Bloom Filters

Enter Bloom filters. A Bloom filter is a fixed-size set data structure with two minor features/drawbacks:

  1. You can never ask a Bloom Filter for the set of elements it contains.
  2. Membership queries have a small, controllable, false-positive probability. Bloom filters will never return false negatives.

With a little bit of work, it’s pretty easy to substitute Bloom Filters for plain old hash tables in our sideways approach above. There’s a slight tweak we have to make to our algorithm to accommodate the fact that we can’t ever query a bloom filter for the elements it contains, but the idea remains the same.
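
The exact implementation we used isn’t important here; a minimal, pure-Python sketch of the data structure might look something like this (the sizing formulas m = -n*ln(p)/(ln 2)^2 and k = (m/n)*ln 2 are the standard ones; expected_items and fp_rate are illustrative parameter names, not anything from our codebase):

import hashlib
import math

class BloomFilter(object):
    def __init__(self, expected_items, fp_rate):
        # Standard sizing: m bits and k hash functions for a target false-positive rate
        self.num_bits = int(math.ceil(-expected_items * math.log(fp_rate) / (math.log(2) ** 2)))
        self.num_hashes = max(1, int(round((self.num_bits / float(expected_items)) * math.log(2))))
        self.bits = bytearray((self.num_bits + 7) // 8)

    def _positions(self, item):
        # Derive the k bit positions from a single MD5 digest (double hashing)
        digest = int(hashlib.md5(str(item).encode('utf-8')).hexdigest(), 16)
        h1, h2 = digest >> 64, digest & ((1 << 64) - 1)
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        # Can return a false positive, but never a false negative
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))

The crucial property for our purposes is that num_bits is fixed up front: the filter takes the same amount of memory whether we pour ten user IDs into it or ten billion.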

The Payoff

Suppose now we’re keeping a bloom-filter of users per data center. The only thing we have to work around is the fact that we’ll never be able to recover the list of users we’ve added to each set. So, we’ll just deal with users each time we see them instead of deferring our counting to the end.

With that idea in the bag, there are really only a few things to worry about when a request comes in for a given data center (there’s a code sketch of this loop right after the list).

  • Check the bloom filter for that data center to see if the user has been to that one before
  • Check the other bloom filters to see how many other data-centers that user has been to before
  • Count the number of total data-centers that user has seen before. If the user is new to this data center, and the user has seen exactly one other data center before, increment the multiple data center user counter
  • If the user has never seen any of your data centers before, that user is a completely new user. Increment the total number of users seen.
  • If the user has already seen this data-center, this user is a repeat. Do nothing!
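
In code, that loop might look roughly like the following (a sketch rather than the exact script we ran, assuming the BloomFilter sketch above, the same parse() helper, and the AK_DATA_CENTERS list from earlier; the filter sizing numbers are made up):

import sys

# Illustrative sizing: expected unique users per data center and a target false-positive rate
dc_filters = dict([(dc, BloomFilter(10 ** 9, 0.01)) for dc in AK_DATA_CENTERS])

total_users = 0
multiple_dc_users = 0

for line in sys.stdin:
    user, data_center = parse(line)

    seen_here = user in dc_filters[data_center]
    other_dcs_seen = sum(user in f for dc, f in dc_filters.items() if dc != data_center)

    if seen_here:
        # The user has already been counted for this data center: a repeat, do nothing
        continue

    if other_dcs_seen == 0:
        # Never seen at any data center before: a completely new user
        total_users += 1
    elif other_dcs_seen == 1:
        # New to this data center, seen at exactly one other: they just became a multi-DC user
        multiple_dc_users += 1

    dc_filters[data_center].add(user)

One pass over the logs, a fixed amount of memory per data center, and no giant hash table anywhere in sight.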

We ran our version of this overnight. It took us one core, 8GB of RAM, and just under 4 hours to count the number of users who hit multiple data centers in a full week’s worth of logs.

Not bad!

Custom Input/Output Formats in Hadoop Streaming

Like I’ve mentioned before, working with Hadoop’s documentation is not my favorite thing in the world, so I thought I’d provide a straightforward explanation of one of Hadoop’s coolest features – custom input/output formats in Hadoop streaming jobs.

Use Case

It’s common for us to have jobs whose results span a few weeks or months, and it’s convenient to look at that data by day, week, or month. Sometimes including the date (preferably in a nice, standard format) in the output isn’t quite enough, and for whatever reason it’s just more convenient to have each output file correspond to some logical unit.

Suppose we wanted to count unique users in our logs by state, by day. The streaming job probably starts looking something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -input log_data/ \
        -output geo_output/ \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

And the output from the job might look like:

2011-06-20,CA,1512301
2011-06-21,CA,1541111
2011-06-22,CA,1300001
...
2011-06-20,IL,23244
2011-06-21,IL,23357
2011-06-22,IL,12213
...

This is kind of a pain. If we do this for a month of data and all 50 states appear every day, that’s at least 1500 records – not quite so easy to eyeball. So, let’s ask Hadoop to give us a file per day, named YYYY-MM-DD.csv, that contains all the counts for that day. 30 files containing 50 records each is much more manageable.

Write Some Java

The first step is to write some Java. I know, this is a tutorial about writing Input/Output formats for Hadoop streaming jobs. Unfortunately, there is no way to write a custom output format other than in Java.

The good news is that once you’re set up to develop, both input and output formats tend to take minimal effort to write. Hadoop provides a class, MultipleTextOutputFormat, just for putting output records into different files based on the content of each record. Since we’re looking to split records based on the first field of each record, let’s subclass it.

package net.agkn.example.outputformat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class DateFieldMultipleOutputFormat
    extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // Use the first comma-delimited field of the key (the date) as the output filename
        String date = key.toString().split(",")[0];
        return date + ".csv";
    }
}

It’s a pretty simple exercise, even if you’ve never written a single line of Java. All the code does is take the first field of the key and use it as the output filename. Honestly, the hardest part is going to be setting up your IDE to work with Hadoop (the second hardest part was finding this blog post).

Use It

The most recent Hadoop documentation I can find still covers custom Input/Output formats only as of Hadoop 0.14. Very helpful.

It turns out life is still easy. When you look at a less-misleading part of the Hadoop Streaming documentation, all the pieces you need are right there. There are flags, -inputformat and -outputformat, that let you specify Java classes as your input and output format. They have to be in the Java classpath when Hadoop runs, which is taken care of by the -libjars generic Hadoop option. There is no need to compile a custom streaming jar or anything crazy (I found worse suggestions on StackOverflow while figuring this out).

Using this newfound wisdom, it’s pretty trivial to add the output format to the existing streaming job. The next version of the job is going to look something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

Pay Attention to the Details

If you write an output format like the one above and try to run a job, you’ll notice that some output records disappear. The overly-simple explanation is that Hadoop ends up opening the file for a specific date once for every reducer that date appears in, clobbering the data that was there before. Fortunately, it’s also easy to tell Hadoop how to send all the data from a date to the same reducer, so each file is opened exactly once. There isn’t even any Java involved.

All it takes is specifying the right partitioner class for the job on the command line. The partitioner is configured with the same -k style of field specification as unix sort, so Data Scientists should have an easy time figuring out how to use it. To keep data from disappearing, tell the partitioner that the first field of the comma-delimited output record is the value to partition on.

With those options included, the final streaming job ends up looking like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -D map.output.key.field.separator=, \
        -D mapred.text.key.partitioner.options=-k1,1 \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -cacheFile ip_geo_mapping.dat#geo.dat  

On the next run all the data should appear again, and the output directory should contain a file for every day that has output. It’s not hard to take this example and get a little more complicated – it’d take minimal changes to make this job output to a file per state, for example. Not bad for a dozen lines of code and some command line flags.

Using Tools You Already Have

I was reading FlowingData (a great data science blog!) a little while ago and came across a post on why programming is a good skill for data scientists to have. My first reaction was, “well, duh” – while I don’t expect everyone in the data science business to be a machine learning whiz or rewriting the Linux kernel for fun, I would have thought that most people reading the blog had some kind of computer science background. Judging from the comments on that post, it looks like my assumption was quite wrong – Nathan definitely has a fair number of readers who really appreciated that post.

Nathan is good enough to provide aspiring readers with a list of essential tools, and a list of good starting points. Both lists are quite good, covering everything from Python to Processing, but there’s a glaring omission: shell scripting. So, in the spirit of teaching, I thought I’d share a little bit about why every data scientist should know at least a bit about bash scripting.

They’re Everywhere

The tools built in to every flavor of *nix (check out this IEEE standard) cover most of what you need to do to manipulate, manhandle, and munge data-sets. There are tools for selecting columns, sorting, de-duping, counting, pattern matching and text manipulation, joins, and more. In order, that translates into:

  • cut
  • sort
  • uniq
  • wc
  • grep
  • sed and awk
  • join

I use all these nearly every day. The best part is, once you know they exist, these tools are available on every unix machine you will ever use. Nothing else (except maybe perl) is as universal – you don’t have to worry about versions or anything. Being comfortable with these tools means you can get work done anywhere – any EC2 instance you boot up will have them, as will any unix server you ssh into.

They’re Efficient

One of the first lessons I learned as a programmer is that there is absolutely no way I can sort data using a script faster than I could do it with sort. With a small data-set, it’ll take you longer to write print statements than it will for sort to finish, and with large data sets, I’m just glad someone else wrote N-way external merge-sort for me.

Similarly, the other commands are highly optimized, and the code has been around for years, touched by many great hands (it’s fun reading a man page and seeing “Written by Richard Stallman” at the bottom), and used by thousands and thousands of people. So, there probably aren’t that many obvious bugs left.

If you want to be a geek about it (and we do), they’re also all, with the obvious exception of sort, one-pass algorithms and O(N) or better with low memory usage. What’s not to love?

They’re Easy to Work With

Getting used to having things already done for you also makes a data-centric workflow more efficient. The first step of almost any data project is figuring out the idiosyncrasies of a data set. Knowing shell utilities lets you very easily get a first impression of a data set, and often gets you most of the way through the process of cleaning it up.

As an example, I can quickly get a sense of how frequently users appear in our logs – let’s say the top 10 users – by just using a few commands chained together.

cut -d, -f1 data.csv | sort | uniq -c | sort -rn | head

Running the same command again with tail instead of head gives the bottom 10 users by frequency, and with another round of cut I can get the whole frequency distribution of users in the log, all without writing a single line of code.

Once you find yourself doing this more than once, it’s easy to save it as a nice little script that you can re-run.

#! /bin/bash

if [ -z "$1" ]; then
  echo "usage: top_users.sh input_file"
  exit 1
fi

cut -d, -f1 "$1" | sort | uniq -c | sort -rn

EOF and Disclaimer

I’ve barely scratched the surface here, and already you can get a good sense of how easy and powerful a lot of these tools are. If you’re interested in picking them up, open a terminal and use the man pages, Wikipedia, or your favorite search engine to find out more. There are good bash scripting guides scattered around the web, once you get interested in putting all of this together. The sky is the limit, really.

My Love/Hate Relationship with Hadoop

A few months ago, the need for some log file analysis popped up. As the junior Data Scientist, I had the genuine pleasure of waking up one morning to an e-mail from Matt and Rob letting me know that I was expected to be playing with terabytes of data as soon as possible. Exciting, to say the least.

The project seemed like a perfect fit for Hadoop, specifically Amazon’s Elastic MapReduce (EMR). So, I grabbed the company card, signed up, and dove right in. It’s been quite a learning experience.

After a few months learning the particulars of Amazon’s flavor of cloud computing and Hadoop’s take on distributed computing, I’ve developed a relationship with Hadoop as complicated as any MapReduce job – I’ve learned to love and loathe it at the same time.

The Good

EMR is incredibly easy to interface with, despite some of Amazon’s tools being less-than-stellar (I’m looking at you, Ruby CLI). The third-party APIs tend to be excellent. We’ve been using boto heavily.

Hadoop Streaming jobs are, like most everyone else on the internet will tell you, awesome for rapid prototyping and development. The rest of the Science team and I are not super concerned with speed for most of what we do in Hadoop, so we’re perfect users for Streaming jobs. We iterate on our models constantly, and Streaming makes it possible to easily test their behavior over whatever data we please.

The ability to include HIVE in an EMR workflow is yet another awesome bonus. It’s incredibly easy to boot up a cluster, install HIVE, and be doing simple SQL analytics in no time flat. External tables even make the data loading step a breeze.

The Bad

While Hadoop and EMR have let us do some very cool things that wouldn’t be possible otherwise, we’ve had some problems too.

I’ve blown up NameNodes, run into the S3 file size limit, and hit what feels like every pain point in-between while formatting and compressing our data. I’ve crashed every JVM that Hadoop has to offer, broken the HIVE query planner, and had Streaming jobs run out of memory both because they were badly designed, and because I didn’t tweak the right settings. In short, after just a few months, with what I would consider some fairly simple, standard use cases, I’ve run into every “standard” Hadoop problem, along with what feels like more than my fair share of non-standard problems.

While it should be no surprise to anyone that a lone data-scientist can wreak havoc on any piece of software, there was a certain flavor to an unsettlingly large number of these crises that really started to bother me.

After running into the infamous dfs.datanode.max.xcievers property problem, I put my finger on both what makes a problem quintessentially Hadoop-y and why a Hadoop problem isn’t a good one to have.

The Ugly

To fix any problem, you have to know about the problem. To know about a problem, you must have read the documentation or broken something enough times to start to pinpoint it.

Reading the documentation isn’t really an option for learning about dfs.datanode.max.xcievers: it’s badly documented, there’s no default value listed anywhere, and the name itself is misspelled (i before e except after c). But once you know what’s going on, it’s an easy fix: change the cluster’s configuration.

What’s so bad about a Hadoop problem is that reproducing it enough times to figure out the cause takes a large amount of time, in what I find to be the most disruptive way possible. It doesn’t take a large number of tries, or any particularly intelligent debugging effort, just a lot of sitting and waiting to see if you missed a configuration property or set one incorrectly. It doesn’t seem so bad at first, but since these problems often manifest only on extremely large data-sets, each iteration can take a significant amount of time, and you can be quite a ways through a job before they appear. Investigative work in such a stop-and-go pattern, mixed with the worst kind of system administration, is killing me. I don’t want to stop working in the middle of a cool thought because I have to adjust a value in an XML document from 1024 to 4096.

Never mind the hardware requirements Hadoop presents, or issues with HDFS or any of the legitimate, low level complaints people like Dale have. I don’t like working on Hadoop because you have to keep so much about Hadoop in the back of your mind for such little, rare gains. It’s almost as bad as having a small child (perhaps a baby elephant?) on my desk.

What’s Next

The easy solution is to insulate me, the analyst, from the engineering. We could throw cash at the problem and dedicate an engineer or three to keeping a cluster operable. We could build a cluster in our data center. But this isn’t practical for any small company, especially when the projects don’t require you to keep a cluster running 24/7. Not only could the company not afford it, but it would be a waste of time and money.

The hard solution is coming up with something better. The whole team at AK believes that there is a better way, that working with big data can still be agile.

If possible, I should be able to access a data-set quickly and cleanly. The size and complexity of the tools that enable me to work with big data should be minimized. The barrier to entry should be low. While there are projects and companies that are trying to make using Hadoop easier and easier, I think the fundamental problem is with the one-very-large-framework-fits-all approach to big data. While Hadoop, and batch processing in general, has its time and place, there’s no reason I should need an elephantine framework to count anything, or to find the mean of a list of numbers.

The rest of AK seems to agree. We all think the solution has to incorporate batch processing somehow, but still embrace clever ways to navigate a large, constantly flowing data set. The crazy people here even think our solution can be reliable enough that no Data Scientist is clever enough (or incompetent enough) to break it.
