Wednesday, April 23, 2014

LocalCache in hadoop MRv2 aka YARN

The old DistributedCache is deprecated in the new API of Hadoop 2.2.3. Now the suggested method is Job.addCacheFile.

By default, the cached files are added to a special folder on each slave node.

Assume we have two slave nodes, namely "n1" and "n2". In the configuration file "yarn-site.xml", which is under /HADOOP_PATH/etc/hadoop/, you will find a property
like this:

<property>
  <name>yarn.nodemanager.local-dirs</name>
  <value>/home/hadoop/localdirs</value>
</property>

Here "/home/hadoop/localdirs" is the home path for all cached files. 

>ls /home/hadoop/localdirs
filecache  nmPrivate  usercache

If we add cache files per application, the cache files will be put under "usercache".

Job job = Job.getInstance(conf, "HELLO");
FileSystem fs = FileSystem.get(conf);
FileStatus[] fileStatus = fs.listStatus(new Path("/user/hadoop/data/"));
for (FileStatus f : fileStatus) {
    job.addCacheFile(new URI(f.getPath().toUri().toString() + "#_" + f.getPath().getName()));
}
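The "#" fragment appended to each URI sets the name of the symlink that YARN creates in the task's working directory (here, the original file name prefixed with "_"). A minimal, Hadoop-free sketch of how such a URI is built and what the fragment looks like, using only java.net.URI (the host and file name are hypothetical examples):

```java
import java.net.URI;

public class CacheUriDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical HDFS path, mirroring the loop above.
        String hdfsPath = "hdfs://master:50011/user/hadoop/data/hg19.fa";
        String name = hdfsPath.substring(hdfsPath.lastIndexOf('/') + 1);

        // Append "#_<name>" so the symlink in the container's working
        // directory is called "_hg19.fa" instead of "hg19.fa".
        URI cacheUri = new URI(hdfsPath + "#_" + name);

        System.out.println(cacheUri.getPath());     // /user/hadoop/data/hg19.fa
        System.out.println(cacheUri.getFragment()); // _hg19.fa
    }
}
```

Without a fragment, the symlink simply takes the file's own name; the fragment is only needed when you want a different (or collision-free) local name.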

Now switch to machine n1 or n2 and have a look at the local cache directory; you will find the file somewhere under it:

>find /hadoop/localdirs/ -name "hg19.fa"


Using the cache files in the MR application is interesting. Our Mapper will look like this:

public static class MyMapper extends Mapper<Text, Text, Text, Text> {

    private List<String> cacheFiles;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // New API: URIs of the cached files on HDFS.
        URI[] uris = context.getCacheFiles();
        for (URI u : uris)
            System.out.println("CACHED:" + u.getPath());

        // Deprecated API: local paths of the cached files on this node.
        Path[] localPaths = context.getLocalCacheFiles();
        for (Path p : localPaths)
            System.out.println("CACHED:" + p.toString());
    }

    //static Log LOG = LogFactory.getLog(MyMapper.class);
    @Override
    public void map(Text keySampleName, Text valueSplitFileName, Context context)
            throws IOException, InterruptedException {
        // use the cached files here.
        //context.write(keySampleName, valueSplitFileName);
    }
}

The new API to retrieve cached files is "context.getCacheFiles()", while "context.getLocalCacheFiles()" is deprecated. However, context.getCacheFiles() returns a URI array in which each element is an HDFS path (hdfs://master:50011/user/hadoop/...),
while context.getLocalCacheFiles() returns a Path array in which each element is a local path (/hadoop/localdirs/usercache/...).
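In practice you rarely need the deprecated local-path API: since each cache URI carries a "#_<name>" fragment, the task can open the file through that symlink in its working directory. A sketch of the lookup, assuming the fragment convention used above (the helper name and the example URI are my own, not Hadoop API):

```java
import java.io.File;
import java.net.URI;

public class ResolveCachedFile {
    // Given a cache URI as returned by context.getCacheFiles(), derive the
    // local symlink name: the fragment if present, else the last path segment.
    static String localName(URI cacheUri) {
        if (cacheUri.getFragment() != null) {
            return cacheUri.getFragment();
        }
        String path = cacheUri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) throws Exception {
        URI u = new URI("hdfs://master:50011/user/hadoop/data/hg19.fa#_hg19.fa");
        String name = localName(u);
        System.out.println(name); // prints "_hg19.fa"

        // Inside a running task, the symlink lives in the container's
        // working directory, so the file can be opened relatively:
        File local = new File(name);
        System.out.println("exists locally: " + local.exists());
    }
}
```

Calling `localName(u)` in setup() and opening the returned name with plain java.io works on any node, because YARN places the symlink in every container's working directory.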

Besides, each container on every slave node will have a copy of all cached files, which means that if slave node "n1" runs 10 containers, you get 10 copies of the cached files on n1.