Wednesday, January 6, 2010

The New Hadoop API 0.20.x

Starting with Hadoop 0.20.x, the code base has changed significantly: the classes under the package org.apache.hadoop.mapred.* have been deprecated in favor of a new API under org.apache.hadoop.mapreduce.*. This post is a brief summary of the major changes and additions in the new Hadoop API.

Prior to Hadoop 0.20.x, a Map class had to extend MapReduceBase and implement the Mapper interface, as such:

public static class Map extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, Text> {
  ...
}

and similarly, the map function had to use an OutputCollector to emit (key, value) pairs and a Reporter object to send progress updates to the main program. A typical map function looked like:

public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output,
    Reporter reporter) throws IOException {
  ...
  output.collect(key, value);
}

With the new Hadoop API, a mapper or reducer extends a class from the package org.apache.hadoop.mapreduce.*, and there is no longer an interface to implement. Here is how a Map class is defined in the new API:

public class MapClass extends Mapper<LongWritable, Text, LongWritable, Text> {
  ...
}

and the map function uses a Context object both to emit records and to send progress updates. A typical map function is now defined as:

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  ...
  context.write(key, value);
}

All of the changes for a Mapper above apply in the same way to a Reducer.
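For example, a word-count style Reducer in the new API extends org.apache.hadoop.mapreduce.Reducer and also receives a Context; here is a sketch (the Text/IntWritable types and the class name are just illustrative):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative word-count style Reducer for the new (0.20.x) API.
public class ReduceClass extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    // Note: the old API passed an Iterator<V> here; the new API passes an Iterable<V>.
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable(sum));
  }
}
```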

Another major change is in the way a job is configured and controlled. Previously, a MapReduce job was configured through a JobConf object, and job control was handled by an instance of JobClient. The main body of a driver class used to look like:

JobConf conf = new JobConf(Driver.class);
conf.setPropertyX(..);
conf.setPropertyY(..);
...
...
...
JobClient.runJob(conf);

In the new Hadoop API, the same functionality is achieved as follows:

Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(Driver.class);
job.setPropertyX(..);
job.setPropertyY(..);
...
...
...
job.waitForCompletion(true);

The current MapReduce tutorial on the official Hadoop page is still written for the old API, so I uploaded a working word count example that uses the new methods and constructs here. Please let me know if you have any comments or suggestions.
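For readers who want the whole picture in one place, a minimal word count in the new API might be sketched as follows (the class names and the LongWritable byte-offset key are illustrative, not taken from the uploaded example):

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Emits (word, 1) for every token in the input line.
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Sums the counts for each word.
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

Run it as `hadoop jar wordcount.jar WordCount <input dir> <output dir>`.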

16 comments:

  1. Thanks for posting this. I'm having a problem though.

I used to pass key/value data into the mapper using JobConf.set() and setStrings(), and retrieve them in Mapper.configure() with JobConf.get() and getStrings().

    I have replaced the key/value insertion with Configuration.set.

    Mapper.configure(JobConf) seems to have been replaced with Mapper.setup(Context), so in setup() I call Context.getConfiguration().get().

...but the key isn't there! In fact, if I get the configuration's iterator and print out all the keys, it is definitely not there at all.

    Any help appreciated. Thanks.

  2. Keith,

    I think you may be doing something wrong with the order in which you set the parameters. You must set the configuration parameters before creating the Job object. For instance, if you have something like:

    conf.set("parameter1", "value1");
    Job job = new Job(conf);
    conf.set("parameter2", "value2");

    you won't be able to get the value of "parameter2" from the Context object; it will be null. Does your code have something similar to the above?
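    To illustrate, the safe ordering finishes populating the Configuration first (a sketch; the parameter names are just examples):

    ```java
    // Set all parameters on the Configuration *before* constructing the Job;
    // the Job makes its own copy of the Configuration, so conf.set() calls
    // made afterwards are not visible to the tasks.
    Configuration conf = new Configuration();
    conf.set("parameter1", "value1");
    conf.set("parameter2", "value2");
    Job job = new Job(conf);
    // Alternatively, write through the Job's own copy after construction:
    job.getConfiguration().set("parameter3", "value3");
    ```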

  3. I'll have to look at that, thanks for the hint.

  4. Hello Soner,
    I have been struggling with the same issue. I have serialized my object into string as follows:

    Configuration conf = new Configuration();
    JobConf conf1 = new JobConf(conf,new.class)
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(gList);

    String gridListString = new String(baos.toString());
    conf1.set("gridList", gridListString);
    oos.close();

    And now, in the mapper I am deserializing my string into an object by overriding the configure function as follows:

    public void configure(JobConf job) {
      super.configure(job);
      String mygridList = job.get("gridList");

      ByteArrayInputStream bios = new ByteArrayInputStream(getBytes(mygridList));

      ObjectInputStream ois = new ObjectInputStream(bios);

      gList = (GridDetails) ois.readObject();
      ois.close();
    }

    When I try to use gList in the map function, I get a NullPointerException.

    Could you please help me in this issue. Any suggestions are valuable to me.

    Thanks,
    Sam

  6. Using the old API, in Mapper.configure() with JobConf.get() I am able to read the map.input.file configuration (the file being processed by the mapper).
    When I try to print the list of all configurations using the configuration iterator, I don't see this configuration any more.
    Am I missing something?

  7. yep. I had that problem too. need to set the parameter before you instantiate the job. thanks

  8. Configuration conf = new Configuration();
    conf.setInt("num2", Integer.parseInt(args[2]));
    Job job = new Job(conf, "Size");

    I am able to set my variable num2 to a value from the arguments.

    But how do I access it from the Mapper class?

    public static class MapClass extends Mapper {

      private static int N;

      protected void setup(Context context) {
        N = context.getConfiguration().getInt("num2", 1);
      }

      public void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        // I use N here
      }
    }

    public int run(String[] args) throws Exception {
      Configuration conf = new Configuration();
      conf.setInt("num2", Integer.parseInt(args[3]));
      Job job = new Job(conf, "Size");

      ///////
    }

  9. Job job = new Job(new Configuration());
    Configuration conf = job.getConfiguration();
    conf.set(name, value);

    Will this work?

  10. Thanks for the post. I have a specific question about the new implementation.

    The class Mapper.Context is a nested class defined in Mapper. So where is this class instantiated and passed as a parameter to the map() method when an application is running? As far as I know, one cannot instantiate a nested class outside of the superclass. Thanks!

  11. FYI: http://stackoverflow.com/questions/7626077/mapreducebase-and-mapper-deprecated

    Thanks!

  12. cool much needed info thanks

  13. What is the replacement for TextInputFormat in the new API? When I use it, I get a warning that it's deprecated.
