Custom Input/Output Formats in Hadoop Streaming

Like I’ve mentioned before, working with Hadoop’s documentation is not my favorite thing in the world, so I thought I’d provide a straightforward explanation of one of Hadoop’s coolest features – custom input/output formats in Hadoop streaming jobs.

Use Case

It is common for us to have jobs that compute results across a few weeks or months, and it's convenient to look at the data by day, week, or month. Sometimes including the date (preferably in a nice standard format) in the output isn't quite enough, and for whatever reason it's just more convenient to have each output file correspond to some logical unit.

Suppose we wanted to count unique users in our logs by state, by day. The streaming job probably starts looking something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -input log_data/ \
        -output geo_output/ \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

And the output from the job might look like:

2011-06-20,CA,1512301
2011-06-21,CA,1541111
2011-06-22,CA,1300001
...
2011-06-20,IL,23244
2011-06-21,IL,23357
2011-06-22,IL,12213
...

This is kind of a pain. If we do this for a month of data and all 50 states appear every day, that’s at least 1500 records – not quite so easy to eyeball. So, let’s ask Hadoop to give us a file per day, named YYYY-MM-DD.csv, that contains all the counts for that day. 30 files containing 50 records each is much more manageable.
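
Concretely, for a month of data the goal is an output directory that looks something like this (the June dates here are just illustrative):

geo_output/
    2011-06-01.csv
    2011-06-02.csv
    ...
    2011-06-30.csv

with each file holding that day's 50 per-state counts.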

Write Some Java

The first step is to write some Java. I know, this is a tutorial about writing Input/Output formats for Hadoop streaming jobs. Unfortunately, there is no way to write a custom output format other than in Java.

The good news is that once you're set up to develop, both input and output formats tend to take minimal effort to write. Hadoop provides a class, MultipleTextOutputFormat, just for putting output records into different files based on the content of each record. Since we're looking to split records based on the first field of each record, let's subclass it.

package net.agkn.example.outputformat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class DateFieldMultipleOutputFormat
    extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // Keys look like "2011-06-20,CA"; use the date field as the output filename.
        String date = key.toString().split(",")[0];
        return date + ".csv";
    }
}

It’s a pretty simple exercise, even if you’ve never written a single line of Java. All the code does is take the first field of the key and use it as the output filename. Honestly, the hardest part is going to be setting up your IDE to work with Hadoop (the second hardest part was finding this blog post).
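
For example, a record whose key is 2011-06-20,CA gets written to a file named 2011-06-20.csv in the job's output directory.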

Use It

The most recent Hadoop documentation I can find still covers custom Input/Output formats as of Hadoop 0.14. Very helpful.

It turns out life is still easy. When you look at a less-misleading part of the Hadoop Streaming documentation, all the pieces you need are right there. There are flags, -inputformat and -outputformat, that let you specify Java classes as your input and output format. They have to be on the Java classpath when Hadoop runs, which is taken care of by the -libjars generic Hadoop option. There is no need to compile a custom streaming jar or anything crazy (I found worse suggestions on StackOverflow while figuring this out).

Using this newfound wisdom, it’s pretty trivial to add the output format to the existing streaming job. The next version of the job is going to look something like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -cacheFile ip_geo_mapping.dat#geo.dat  

Pay Attention to the Details

If you write an output format like the one above and try to run a job, you’ll notice that some output records disappear. The overly-simple explanation is that Hadoop ends up opening the file for a specific date once for every reducer that date appears in, clobbering the data that was there before. Fortunately, it’s also easy to tell Hadoop how to send all the data from a date to the same reducer, so each file is opened exactly once. There isn’t even any Java involved.

All it takes is specifying the right partitioner class for the job on the command line. This partitioner is configured just like unix sort, so Data Scientists should have an easy time figuring out how to use it. To keep data from disappearing, tell the partitioner that the first field of the comma-delimited key is the value to partition on.
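
As a rough mental model – this is just an illustrative sketch, not the actual KeyFieldBasedPartitioner source – partitioning on the first comma-separated field boils down to something like:

// Illustrative sketch only (hypothetical class, not Hadoop code): with
// map.output.key.field.separator=, and -k1,1, partitioning effectively
// hashes just the first field of the key.
public class FirstFieldPartitionSketch {
    static int partitionForKey(String key, int numReduceTasks) {
        String date = key.split(",")[0];               // e.g. "2011-06-20"
        return (date.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Every record for a given date hashes to the same partition, so each date's file gets opened by exactly one reducer.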

With those options included, the final streaming job ends up looking like:

hadoop jar /path/to/hadoop-streaming.jar \
        -libjars /path/to/custom-formats.jar \
        -D map.output.key.field.separator=, \
        -D mapred.text.key.partitioner.options=-k1,1 \
        -input log_data/ \
        -output geo_output/ \
        -outputformat net.agkn.example.outputformat.DateFieldMultipleOutputFormat \
        -mapper geo_mapper.py \
        -reducer user_geo_count_reducer.py \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -cacheFile ip_geo_mapping.dat#geo.dat  

On the next run all the data should appear again, and the output directory should contain a file for each day that has output. It's not hard to take this example and get a little more complicated – it'd take minimal changes to make this job output a file per state, for example (see the sketch below). Not bad for a dozen lines of code and some command line flags.
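
As a sketch of that per-state variant (the class name StateFieldMultipleOutputFormat is made up for illustration), the only real change is which field becomes the filename:

package net.agkn.example.outputformat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class StateFieldMultipleOutputFormat
    extends MultipleTextOutputFormat<Text, Text> {

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String name) {
        // Keys still look like "2011-06-20,CA"; use the state field instead of the date.
        String state = key.toString().split(",")[1];
        return state + ".csv";
    }
}

You'd also want to point the partitioner at the second field (e.g. -D mapred.text.key.partitioner.options=-k2,2) so each state's records land in a single reducer.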

Comments

  1. Facing the issue of class not found… tried giving the entire path along with the package name, but it still doesn't work.

    • Same problem:

      -outputformat : class not found : analytics/hadoop/outputformat/TestMultipleOutputFormat
      Streaming Job Failed!

  2. blinsay says:

    Sounds like you’re both missing the ‘-libjars’ option.

  3. I'm also having problems with -libjars. No matter what I do I get "-inputreader: class not found: mypackage.NLineRecordReader"

    Here's the command I'm running:
    hadoop jar ../contrib/streaming/hadoop-streaming-1.0.3.jar -libjars NLineRecordReader.jar -files test_stream.sh -inputreader mypackage.NLineRecordReader -input /Users/hadoop/test/test.txt -output /Users/hadoop/test/output -mapper "test_stream.sh" -reducer NONE

    • It's been a while since I've played with custom formats in Streaming, but according to the 1.0.3 docs, it doesn't look like things have changed between when I wrote this and now. I can't help any more without seeing your code, but I'd suspect that you've got some kind of arcane and annoying problem with your jar not being set up the way Hadoop likes it.

      Sorry!
