Wednesday, April 23, 2014

LocalCache in hadoop MRv2 aka YARN

The old DistributedCache is deprecated in the new API of hadoop 2.2.3. The suggested replacement is Job.addCacheFile.

By default, cached files are copied to a special folder on each slave node.

Assume we have two slave nodes, "n1" and "n2". In the configuration file "yarn-site.xml", located under /HADOOP_PATH/etc/hadoop/, you will find a property like this:

 <property>
   <name>yarn.nodemanager.local-dirs</name>
   <value>/home/hadoop/localdirs</value>
 </property>


Here "/home/hadoop/localdirs" is the root directory under which all cached files are stored.
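
To check which directories a node is really configured with, the property can be read back from yarn-site.xml. The following is only a minimal sketch using the JDK's XML parser; the class name LocalDirsReader and the method localDirs are made up for this example, and it assumes the usual name/value property layout. Note that yarn.nodemanager.local-dirs may hold a comma-separated list of directories, so the sketch returns a list.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper: extracts yarn.nodemanager.local-dirs from the text of yarn-site.xml.
class LocalDirsReader {
    static List<String> localDirs(String yarnSiteXml) {
        List<String> dirs = new ArrayList<>();
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(yarnSiteXml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element p = (Element) props.item(i);
                NodeList names = p.getElementsByTagName("name");
                NodeList values = p.getElementsByTagName("value");
                if (names.getLength() == 0 || values.getLength() == 0) {
                    continue; // skip incomplete properties
                }
                if ("yarn.nodemanager.local-dirs".equals(names.item(0).getTextContent().trim())) {
                    // The value may list several directories separated by commas.
                    for (String d : values.item(0).getTextContent().split(",")) {
                        if (!d.trim().isEmpty()) {
                            dirs.add(d.trim());
                        }
                    }
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException("could not parse yarn-site.xml", e);
        }
        return dirs;
    }

    public static void main(String[] args) {
        String xml = "<configuration><property>"
                + "<name>yarn.nodemanager.local-dirs</name>"
                + "<value>/home/hadoop/localdirs</value>"
                + "</property></configuration>";
        System.out.println(localDirs(xml));
    }
}
```

In a real job you would normally not parse the file yourself, since Hadoop loads it into its own configuration object; the sketch is only meant to show what the property contains.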

>ls /home/hadoop/localdirs
filecache  nmPrivate  usercache


If we add cache files per application, the cache files will be put under "usercache".





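    // conf is the job Configuration, fs the FileSystem for HDFS.
    Job job = Job.getInstance(conf, "HELLO");
    // Register every file under /user/hadoop/data/ as a cache file.
    FileStatus[] fileStatus = fs.listStatus(new Path("/user/hadoop/data/"));
    for (FileStatus f : fileStatus)
    {
        // The part after '#' is the link name: here "_" + the file name.
        job.addCacheFile(new URI(f.getPath().toUri().toString() + "#_" + f.getPath().getName()));
    }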



Now switch to machine n1 or n2 and look at the local cache directory. You will find the cached file somewhere like this:

>find /hadoop/localdirs/ -name "hg19.fa"

/hadoop/localdirs/usercache/hadoop/appcache/application_1398026166795_0007/container_1398026166795_0007_01_000010/hg19.fa
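
The path printed by find follows a fixed pattern: local dir, then "usercache", the user name, "appcache", the application id, the container id and finally the file name. A small sketch that rebuilds this path from its parts, inferred only from the output above (the class name ContainerCachePath and the method containerFile are invented for illustration, and the layout may differ between Hadoop versions):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper that mirrors the directory layout seen in the find output.
class ContainerCachePath {
    static Path containerFile(String localDir, String user, String appId,
                              String containerId, String fileName) {
        // <localDir>/usercache/<user>/appcache/<appId>/<containerId>/<fileName>
        return Paths.get(localDir, "usercache", user, "appcache", appId, containerId, fileName);
    }

    public static void main(String[] args) {
        Path p = containerFile("/hadoop/localdirs", "hadoop",
                "application_1398026166795_0007",
                "container_1398026166795_0007_01_000010",
                "hg19.fa");
        System.out.println(p);
    }
}
```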

Using the cached files inside the MR application is the interesting part. Our Mapper looks like this:





// Input types follow the map() signature; output types are assumed from the commented-out context.write.
public static class MyMapper extends Mapper<Text, Text, Text, Text>
{
    // Local paths of the cached files (element type assumed to be Path).
    private List<Path> cacheFiles;

    @Override
    protected void setup(Context context) throws IOException
    {
        /*
        URI[] uris = context.getCacheFiles();
        for (URI u : uris)
        {
            System.out.println("CACHED:" + u.getPath());
        }
        */
        // Deprecated, but returns paths on the local disk of the node.
        Path[] paths = context.getLocalCacheFiles();
        for (Path p : paths)
        {
            System.out.println("CACHED:" + p.toString());
        }
    }

    //static Log LOG = LogFactory.getLog(MyMapper.class);
    @Override
    public void map(Text keySampleName, Text valueSplitFileName, Context context) throws IOException, InterruptedException
    {
        // use the cached files here.
        //context.write(keySampleName, valueSplitFileName);
    }
}



The new API for retrieving cached files is "context.getCacheFiles()", while "context.getLocalCacheFiles()" is deprecated. However, context.getCacheFiles() returns a URI array in which each element is an HDFS path (hdfs://master:50011/user/hadoop/...),
while context.getLocalCacheFiles() returns a Path array in which each element is a local path (/hadoop/localdirs/usercache/...).
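
Since getCacheFiles() only gives HDFS URIs, one way to get from such a URI to a local file without the deprecated call is to rely on the link that is created in the container's working directory. As far as I know, the link is named after the URI fragment (the part after '#') if there is one, and after the file name otherwise. The sketch below is plain Java under that assumption; the class CacheLinkResolver and its methods are hypothetical helpers, not part of the Hadoop API.

```java
import java.io.File;
import java.net.URI;

// Hypothetical helper: maps a cache URI to the name expected in the task's working directory.
class CacheLinkResolver {
    static String linkName(URI cacheUri) {
        String fragment = cacheUri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment; // explicit link name given after '#'
        }
        String path = cacheUri.getPath();
        return path.substring(path.lastIndexOf('/') + 1); // fall back to the file name
    }

    // A relative File is resolved against the current working directory,
    // which is assumed here to be the container directory.
    static File localFile(URI cacheUri) {
        return new File(linkName(cacheUri));
    }

    public static void main(String[] args) {
        URI u = URI.create("hdfs://master:50011/user/hadoop/data/hg19.fa#_hg19.fa");
        System.out.println(linkName(u) + " -> " + localFile(u).getAbsolutePath());
    }
}
```

Inside setup() the same idea would be applied to each element of context.getCacheFiles(), after which the file can be opened with the usual java.io classes.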

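Besides, every container on a slave node gets its own entry for each cached file in its container directory. So if "n1" runs 10 containers, you will see the cached files 10 times on n1. As far as I can tell, these per-container entries are normally symbolic links to a single localized copy rather than full copies; "ls -l" in the container directory shows which it is.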