Custom Input/Output Formats in Hadoop Streaming

Like I’ve mentioned before, working with Hadoop’s documentation is not my favorite thing in the world, so I thought I’d provide a straightforward explanation of one of Hadoop’s coolest features – custom input/output formats in Hadoop streaming jobs.

Use Case

It is common for us to have jobs that aggregate results across a few weeks or months, and it’s convenient to look at that data by day, week, or month. Sometimes including the date (preferably in a nice standard format) in the output isn’t quite enough, and for whatever reason it’s just more convenient to have each output file correspond to some logical unit.

Suppose we wanted to count unique users in our logs by state, by day. The streaming job probably starts looking something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -input log_data/ \
        -output geo_output/ \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

And the output from the job might look like:

2011-06-20,CA,1512301
2011-06-21,CA,1541111
2011-06-22,CA,1300001
...
2011-06-20,IL,23244
2011-06-21,IL,23357
2011-06-22,IL,12213
...

This is kind of a pain. If we do this for a month of data and all 50 states appear every day, that’s at least 1500 records – not quite so easy to eyeball. So, let’s ask Hadoop to give us a file per day, named YYYY-MM-DD.csv, that contains all the counts for that day. 30 files containing 50 records each is much more manageable.

Write Some Java

The first step is to write some Java. I know, this is a tutorial about writing Input/Output formats for Hadoop streaming jobs. Unfortunately, there is no way to write a custom output format other than in Java.

The good news is that once you’re set up to develop, both input and output formats tend to take minimal effort to write. Hadoop provides a class, MultipleTextOutputFormat, just for putting output records into different files based on the content of each record. Since we’re looking to split records based on the first field of each record, let’s subclass it.

package net.agkn.example.outputformat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class DateFieldMultipleOutputFormat
    extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // Use the first comma-delimited field of the key (the date) as the filename.
        String date = key.toString().split(",")[0];
        return date + ".csv";
    }
}

It’s a pretty simple exercise, even if you’ve never written a single line of Java. All the code does is take the first field of the key and use it as the output filename. Honestly, the hardest part is going to be setting up your IDE to work with Hadoop (the second hardest part was finding this blog post).
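
If you’d rather skip the IDE entirely, here’s a minimal sketch of compiling and packaging the class from the command line. The hadoop jar location and the directory layout are assumptions; adjust them for your install and for whatever package name you actually use.

javac -classpath /path/to/hadoop-core.jar \
        net/agkn/example/outputformat/DateFieldMultipleOutputFormat.java
jar cf custom-formats.jar net/agkn/example/outputformat/*.class

The resulting custom-formats.jar is what we’ll hand to Hadoop in the next section.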

Use It

The most recent Hadoop documentation I can find still only covers custom Input/Output formats as of Hadoop 0.14. Very helpful.

It turns out life is still easy. When you look at a less-misleading part of the Hadoop Streaming documentation, all the pieces you need are right there. There are flags, -inputformat and -outputformat, that let you specify Java classes as your input and output format. They have to be on the Java classpath when Hadoop runs, which is taken care of by the -libjars generic Hadoop option. There is no need to compile a custom streaming jar or anything crazy (I found worse suggestions on StackOverflow while figuring this out).

Using this newfound wisdom, it’s pretty trivial to add the output format to the existing streaming job. The next version of the job is going to look something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

Pay Attention to the Details

If you write an output format like the one above and try to run a job, you’ll notice that some output records disappear. The overly-simple explanation is that Hadoop ends up opening the file for a specific date once for every reducer whose output contains that date, clobbering the data that was written before. Fortunately, it’s also easy to tell Hadoop to send all the data for a date to the same reducer, so each file is opened exactly once. There isn’t even any Java involved.

All it takes is specifying the right partitioner class for the job on the command line. That partitioner, KeyFieldBasedPartitioner, is configured just like unix cut, so Data Scientists should have an easy time figuring out how to use it. To keep data from disappearing, tell the partitioner that the first field of the comma-delimited output record is the value to partition on.

With those options included, the final streaming job ends up looking like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -D map.output.key.field.separator=, \
        -D mapred.text.key.partitioner.options=-k1,1 \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -cacheFile ip_geo_mapping.dat#geo.dat  

On the next run all the data should appear again, and the output directory should contain a file for each day that has output. It’s not hard to take this example and get a little more complicated – it’d take minimal changes to make this job output a file per state, for example. Not bad for a dozen lines of code and some command line flags.
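
As a quick sanity check (the paths here are placeholders), listing the output directory should now show those per-day files rather than the usual part-00000 style names:

hadoop fs -ls geo_output/

Aside from Hadoop’s own bookkeeping files, each 2011-06-20.csv, 2011-06-21.csv, and so on holds just that day’s counts.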

Using Tools You Already Have

I was reading FlowingData (a great data science blog!) a little while ago and came across a post on why programming is a good skill for data scientists to have. My first reaction was, “well, duh” – while I don’t expect everyone in the data science business to be a machine learning whiz or rewriting the Linux kernel for fun, I would have thought that most people reading the blog had some kind of computer science background. Judging from the comments on that post, it looks like my assumption was quite wrong – Nathan definitely has a fair number of readers who really appreciated that post.

Nathan is good enough to provide aspiring readers with a list of essential tools, and a list of good starting points. Both lists are quite good, covering everything from Python to Processing, but there’s a glaring omission: shell scripting. So, in the spirit of teaching, I thought I’d share a little bit about why every data scientist should know at least a bit about bash scripting.

They’re Everywhere

The tools built in to every flavor of *nix (check out this IEEE standard) cover most of what you need to do to manipulate, manhandle, and munge data-sets. There are tools for selecting columns, sorting, de-duping, counting, pattern matching and text manipulation, joins, and more. In order, that translates into:

  • cut
  • sort
  • uniq
  • wc
  • grep
  • sed and awk
  • join

I use all these nearly every day. The best part is, once you know they exist, these tools are available on every unix machine you will ever use. Nothing else (except maybe perl) is as universal – you don’t have to worry about versions or anything. Being comfortable with these tools means you can get work done anywhere – any EC2 instance you boot up will have them, as will any unix server you ssh into.
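
A couple of those deserve a quick illustration, since join in particular trips people up (it wants both inputs sorted on the join key). Here’s a hedged example on two hypothetical comma-delimited files, users.csv (user_id,state) and purchases.csv (user_id,amount), that totals spend by state:

sort -t, -k1,1 users.csv > users.sorted
sort -t, -k1,1 purchases.csv > purchases.sorted
join -t, users.sorted purchases.sorted \
        | awk -F, '{ sum[$2] += $3 } END { for (state in sum) print state "," sum[state] }'

That’s a join and a group-by in four lines of shell, no database required.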

They’re Efficient

One of the first lessons I learned as a programmer is that there is absolutely no way I can sort data using a script faster than I could do it with sort. With a small data-set, it’ll take you longer to write print statements than it will for sort to finish, and with large data sets, I’m just glad someone else wrote N-way external merge-sort for me.

Similarly, the other commands are highly optimized, and the code has been around for years, touched by many great hands (it’s fun reading a man page and seeing “Written by Richard Stallman” at the bottom), and used by thousands and thousands of people. So, there probably aren’t that many obvious bugs left.

If you want to be a geek about it (and we do), they’re also all, with the obvious exception of sort, one-pass algorithms and O(N) or better with low memory usage. What’s not to love?

They’re Easy to Work With

Getting used to having things already done for you also makes a data-centric workflow more efficient. The first step of almost any data project is figuring out the idiosyncrasies of a data set. Knowing shell utilities lets you very easily get a first impression of a data set, and often gets you most of the way through the process of cleaning it up.

As an example, I can quickly get a sense of how frequently users appear in our logs – let’s say the top 10 users – by just using a few commands chained together.

cut -d, -f1 data.csv | sort | uniq -c | sort -rn | head

Running the same command again with tail instead of head gives the bottom 10 users by frequency, and with another round of cut I can get the whole frequency distribution of users in the log, all without writing a single line of code.
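
That last pipeline might look something like the following, with awk standing in for the extra cut because uniq -c left-pads its counts with spaces:

cut -d, -f1 data.csv | sort | uniq -c | awk '{print $1}' | sort -n | uniq -c

It prints, for each appearance count, how many users showed up in the log that many times.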

Once you end up doing this more than once, it’s easy to save a nice little script that you can easily re-run.

#! /bin/bash

if [ -z "$1" ]; then
  echo "usage: top_users.sh input_file"
  exit 1
fi

cut -d, -f1 "$1" | sort | uniq -c | sort -rn

EOF and Disclaimer

I’ve barely scratched the surface here, and already you can get a good sense of how easy and powerful a lot of these tools are. If you’re interested in picking them up, open a terminal and use the man pages, Wikipedia, or your favorite search engine to find out more. There are good bash scripting guides scattered around the web once you get interested in putting all of this together. The sky is the limit, really.

My Love/Hate Relationship with Hadoop

A few months ago, the need for some log file analysis popped up. As the junior Data Scientist, I had the genuine pleasure of waking up one morning to an e-mail from Matt and Rob letting me know that I was expected to be playing with terabytes of data as soon as possible. Exciting, to say the least.

The project seemed like a perfect fit for Hadoop, specifically Amazon’s Elastic MapReduce (EMR). So, I grabbed the company card, signed up, and dove right in. It’s been quite a learning experience.

After a few months learning the particulars of Amazon’s flavor of cloud computing and Hadoop’s take on distributed computing, I’ve developed a relationship with Hadoop as complicated as any MapReduce job – I’ve learned to love and loathe it at the same time.

The Good

EMR is incredibly easy to interface with, despite some of Amazon’s tools being less-than-stellar (I’m looking at you, Ruby CLI). The third-party APIs tend to be excellent. We’ve been using boto heavily.

Hadoop Streaming jobs are, like most everyone else on the internet will tell you, awesome for rapid prototyping and development. The rest of the Science team and I are not super concerned with speed for most of what we do in Hadoop, so we’re perfect users for Streaming jobs. We iterate on our models constantly, and Streaming makes it possible to easily test their behavior over whatever data we please.

The ability to include HIVE in an EMR workflow is yet another awesome bonus. It’s incredibly easy to boot up a cluster, install HIVE, and be doing simple SQL analytics in no time flat. External tables even make the data loading step a breeze.

The Bad

While Hadoop and EMR have let us do some very cool things that wouldn’t be possible otherwise, we’ve had some problems too.

I’ve blown up NameNodes, run into the S3 file size limit, and hit what feels like every pain point in-between while formatting and compressing our data. I’ve crashed every JVM that Hadoop has to offer, broken the HIVE query planner, and had Streaming jobs run out of memory both because they were badly designed, and because I didn’t tweak the right settings. In short, after just a few months, with what I would consider some fairly simple, standard use cases, I’ve run into every “standard” Hadoop problem, along with what feels like more than my fair share of non-standard problems.

While it should be no surprise to anyone that a lone data scientist can wreak havoc on any piece of software, there was a certain flavor to an unsettlingly large number of these crises that really started to bother me.

After running into the dfs.datanode.max.xcievers property problem mentioned in the post above, I put my finger on both what makes a problem quintessentially Hadoop-y and why a Hadoop problem isn’t a good one to have.

The Ugly

To fix any problem, you have to know about the problem. To know about a problem, you must have read the documentation or broken something enough times to start to pinpoint it.

Reading the documentation isn’t an option for learning about dfs.datanode.max.xcievers: it’s badly documented, there’s no default listed anywhere, and the name itself is misspelled (i before e, except after c). But once you know what’s going on, it’s an easy fix to change a cluster’s configuration.

What’s so bad about a Hadoop problem is that reproducing it enough times to figure out a cause takes a large amount of time, in what I find to be the most disruptive way possible. It doesn’t take a large number of tries, or any particularly intelligent debugging effort, just a lot of sitting and waiting to see if you missed a configuration property or set one incorrectly. It doesn’t seem so bad at first, but since these problems often manifest only on extremely large data-sets, each iteration can take a significant amount of time, and you can be quite a ways through a job before they appear. Investigative work in such a stop-and-go pattern, mixed with the worst kind of system administration, is killing me. I don’t want to stop working in the middle of a cool thought because I have to adjust a value in an XML document from 1024 to 4096.

Never mind the hardware requirements Hadoop presents, or issues with HDFS, or any of the legitimate, low-level complaints people like Dale have. I don’t like working on Hadoop because you have to keep so much about Hadoop in the back of your mind for such small, rare gains. It’s almost as bad as having a small child (perhaps a baby elephant?) on my desk.

What’s Next

The easy solution is to insulate me, the analyst, from the engineering. We could throw cash at the problem and dedicate an engineer or three to keeping a cluster operable. We could build a cluster in our data center. But this isn’t practical for any small company, especially when the projects don’t require you to keep a cluster running 24/7. Not only could the company not afford it, but it would be a waste of time and money.

The hard solution is coming up with something better. The whole team at AK believes that there is a better way, that working with big data can still be agile.

If possible, I should be able to access a data-set quickly and cleanly. The size and complexity of the tools that enable me to work with big data should be minimized. The barrier to entry should be low. While there are projects and companies trying to make using Hadoop easier and easier, I think the fundamental problem is the one-very-large-framework-fits-all approach to big data. While Hadoop, and batch processing in general, has its time and place, there’s no reason I should need an elephantine framework to count anything, or to find the mean of a list of numbers.

The rest of AK seems to agree. We all think the solution has to incorporate batch processing somehow, but still embrace clever ways to navigate a large, constantly flowing data set. The crazy people here even think that our solution can be made reliable enough that no Data Scientist is smart enough (or just incompetent enough) to break it.

The Effects of Ad Campaign Gravity

There exists an interesting and not immediately obvious relationship between the volume (impressions, unique users) of a campaign and its performance.  This generally occurs due to inaccurate measures of attribution.  The concept is simple once you think about it.  Basically, advertisers tend to blast ads all over the internet.  The signal-to-noise ratio is extremely low for display ads (<1% CTR, <0.01% direct conversions), making optimization difficult in many cases.  The ad industry has devised various methods to increase the signal in their campaigns; a few of these concepts are “view-throughs”, attribution windows, “hover actions” and many others.  As a data scientist you realize that while there is very definitely some type of branding effect in advertising, it is unfair to attribute every downstream purchase to “branding”.  While there is nothing a priori wrong with adding in these types of metrics, it is their interpretation that is tricky.  One of the first things you need to correct for in these scenarios is what I dub “campaign gravity”: larger campaigns receive more credit (and thus show higher performance) merely due to their size and not their effectiveness.

Let’s run through a thought experiment and see what this looks like: suppose I’m running my e-commerce site, awesomewidgets.com.  For some reason I decide to run a very large ad campaign of just public service announcements (PSAs) that have no reference to my awesome site.  At the same time I launch another ad campaign, ten times smaller, with similar PSAs.  I would expect the performance of these two campaigns to be identical.  After all, I’m not advertising my site and there should be no reason these campaigns would drive any incremental lift to my sales.  But when I run the numbers, I see that the larger campaign is “outperforming” the smaller one by about 10%.  Let’s take a look at why.

From the universe of all users (or cookies, however you want to think about it) on the internet, there are a few people who converted on my site, awesomewidgets.com.

I launch both of my PSA ad campaigns and I have the following situation:

What I’m really interested in is the intersection of all three circles.  Users in this space have seen ads from both campaigns and converted.  In a last-touch attribution model, these campaigns are “credited” via a race to see which one the user saw last.  A user’s history in this case might look like: A,A,B,A,A,B,A,A.  The question is how often campaign A versus campaign B is last in a user’s chain, and the answer comes down to the ratio of the two campaigns’ volumes: if A serves ten times the volume of B, A will be the last touch roughly 10 out of every 11 times.

Enough with the pretty pictures, let’s do the math:

  • Cost of campaign A: $1000
  • Cost of campaign B: $100

Suppose that in isolation they both have the same “CPA” of $1.00 (1000 actions on A and 100 on B).  Note that this CPA is basically fictitious since I’m not driving any actions to my site.  This is the latent baseline CPA of any campaign. However, when I look at them together they have different measured performance.

  • For every converter that sees ads from both campaigns, A “steals” about 90% (more precisely, 1 - 1/11, or roughly 91%) of the attribution away from campaign B
  • If 10% of converters have seen ads from both campaigns, A “steals” that share of the 10% overlap, which works out to about 9% of B’s conversions

What this means is that 9% of B’s conversions are now being credited to A, and B will appear to be performing worse. So, of the 100 actions that in isolation are attributed to B, 9 of them will now go to A and campaign B’s CPA will go up.  The CPA for campaign B will now look like $100.00 / 91 actions ~= $1.10.  A 10% reduction in measured “performance”!
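
If you want to check that arithmetic yourself, a quick awk snippet (using the hypothetical numbers from above) reproduces it:

awk 'BEGIN {
        spend_B   = 100;        # dollars spent on campaign B
        actions_B = 100;        # actions B gets credit for in isolation
        overlap   = 0.10;       # fraction of converters who saw both campaigns
        steal     = 1 - 1/11;   # share of the overlapping credit A wins at 10x the volume
        lost      = actions_B * overlap * steal;
        printf "B loses ~%.0f actions; measured CPA goes from $%.2f to $%.2f\n",
               lost, spend_B / actions_B, spend_B / (actions_B - lost);
}'

It reports a loss of about 9 actions and a measured CPA of about $1.10, just as above.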

What does this mean?  If you are comparing performance across multiple campaigns of very different sizes using a last touch attribution model then bigger always wins.  This can make analysis tricky on the backend to say the least.  These types of gotchas abound in all complex systems and care must always be taken when doing analytics.

There is another reason larger campaigns will appear better and it has to do with improper sampling by smaller campaigns.  Look for a future post on that topic.

Hacking is Indeed a Data Science Skill

Recently I ran into a task that required me to manipulate a bunch of disparate log level data. Ahh, the tedium of data mining! There were a few particularly annoying things about this task.

  1. The data was split into 3 distinct files. Impression logs, activity logs, and user-attribute logs.
  2. The user-attribute logs were stored sequentially. It looked like:

    request_id, date, user_id, attribute_id

    where there were a varying number of lines for each request_id, representing how many attributes we may have been handed at that time for that user.

  3. The data was reasonably large (~500GB)

What I wanted after this was a flattened, normalized data set to use for various modeling tasks. The output format needed to be:

request_id, date, user_id, activity_id, { attribute_ids }

The first approach I thought of was to get the entire set of unique attributes from the file using something like “cat | cut | sort -u”, to create a database table and generate a bunch of inserts. This was dumb, and obviously it gets annoying very quickly. Not to mention that my final data set would be a few hundred GB, and my research instance of Postgres would get real annoyed.

How about Hadoop?  While this isn’t a terrible answer, there are a few problems. Mainly, I’m under a deadline and getting 500GB to the cluster would take too long. What I really want is some Unix-foo that I can kick off and forget about. It feels like there is some “cut | join | awk” solution. These are the times when I wish I had better Unix skills.  Maybe emacs has a function that does this and brings you lunch (c-x-lunch)?
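
For the curious, here’s a rough sketch of what part of that Unix-foo might have looked like, at least for the attribute logs. The file names and the semicolon delimiter are made up, and it assumes the log really is comma-delimited and grouped by request_id as described above:

awk -F', *' '
  # collapse consecutive lines that share a request_id into a single line:
  # request_id,date,user_id,{attr1;attr2;...}
  $1 != prev { if (prev != "") print prev "," date "," user ",{" attrs "}";
               prev = $1; date = $2; user = $3; attrs = $4; next }
  { attrs = attrs ";" $4 }
  END { if (prev != "") print prev "," date "," user ",{" attrs "}" }
' user_attribute.log > attributes_flat.csv

A sorted join on request_id against the activity logs would then bolt on the activity_id, but that’s exactly the kind of fiddling the rest of this story is about avoiding.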

So, what did I do?  Well, many definitions of data science include the technical skill of “hacking” as a necessary ingredient. One of the finer points of “hacking” has to be social engineering. It’s way easier to get the president of the bank drunk and have him tell you the combination to the safe than it is to crack it. So, along these lines I came up with a plan. Most engineers pride themselves on being extremely smart (and most are) and love challenges. This can also get them into trouble, though. Next time you walk into an engineering meeting, ask an engineer what sorting algorithm Java uses and whether it’s the right choice. One hour wasted!

Our CTO, Rob G., happens to be a brilliant engineer, so I called him up and casually brought up this annoying formatting problem I was having. He immediately started brainstorming solutions and he ended up talking himself into Java as the fastest way (wall clock) that he could get this done.  Fortunately, I’m not really a Java guy. So after Rob convinced himself that his solution was best, he also ended up talking himself into writing all the code. Awesome! Now, my annoying data task was “executing” and I could go back to work on more important things. This entire conversation took about 10 minutes. Much faster than Googling around for Unix foo. The next morning, my data set was all organized and sitting on one of our servers.  Hacking is indeed a useful data science skill!

I guess the moral here is twofold. 1) Sometimes asking for help (and figuring out ways to get it!) really is the best solution, and 2) distributing workloads across your team makes everybody work faster.

P.S. Obviously wasting the CTO’s time is never a good idea. Luckily, Rob is a champion of scheduling and apparently he had a few extra cycles, so no harm done to the greater good.