hadoop/yarn and task parallelization on non-hdfs filesystems


Problem description



I've instantiated a Hadoop 2.4.1 cluster and I've found that running MapReduce applications will parallelize differently depending on what kind of filesystem the input data is on.

Using HDFS, a MapReduce job will spawn enough containers to maximize use of all available memory. For example, on a 3-node cluster with 172GB of memory and each map task allocating 2GB, about 86 application containers will be created.

On a filesystem that isn't HDFS (such as NFS or, in my use case, a parallel filesystem), a MapReduce job will only allocate a subset of the available tasks (e.g., with the same 3-node cluster, only about 25-40 containers are created). Since I'm using a parallel filesystem, I'm not as concerned with the bottlenecks one would encounter with NFS.

Is there a YARN (yarn-site.xml) or MapReduce (mapred-site.xml) configuration that will allow me to effectively maximize resource utilization?

Solution

It depends on the file system.

The way locality works is that you must implement getFileBlockLocations for a given file in your Hadoop FileSystem implementation. As an example, here is the implementation from the glusterfs-hadoop filesystem:

public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
    // Resolve the Hadoop path to a file on the local gluster mount.
    File f = pathToFile(file.getPath());

    // Ask gluster (through the attr wrapper) which hosts hold this byte range.
    BlockLocation[] result = attr.getPathInfo(f.getPath(), start, len);
    if (result == null) {
        log.info("Problem getting destination host for file " + f.getPath());
        return null;
    }

    return result;
}

Above you can see that file metadata is provided through gluster-specific wrappers, which call gluster-specific commands to determine which nodes store the actual contents of a file. The BlockLocation[] array then serves as a set of hints to the job scheduler, which will try to place tasks local to where the splits report their block locations to be.
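
If your filesystem has no such wrapper, a minimal sketch of what an override could look like is below. This is illustrative only: the host names are hypothetical, and it naively reports the whole file as a single block on fixed hosts instead of querying real placement the way the gluster code does.

import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;

// Inside a custom org.apache.hadoop.fs.FileSystem subclass.
// Report the entire file as one "block" stored on two hypothetical hosts,
// so the scheduler will prefer to place tasks on those nodes.
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len)
        throws IOException {
    String[] hosts = { "node1.example.com", "node2.example.com" };             // hypothetical hosts
    String[] names = { "node1.example.com:50010", "node2.example.com:50010" }; // host:port pairs
    return new BlockLocation[] {
        new BlockLocation(names, hosts, 0, file.getLen())
    };
}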

But ultimately, the scheduler's job is to process splits, not blocks. So splits can be smaller than, or larger than, filesystem blocks. If a split is larger, then there is a high likelihood that some portion of it will be streamed over the network. If it's a lot smaller, then you might get more locality, but possibly at the cost of a larger overall number of tasks.
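
For reference, FileInputFormat in Hadoop 2.x picks the split size roughly like this: the file's block size, clamped between the configured minimum and maximum split sizes.

// Paraphrase of org.apache.hadoop.mapreduce.lib.input.FileInputFormat:
// the split size is the block size clamped between min and max.
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

So if a filesystem reports very large (or whole-file) blocks, you end up with few splits and therefore few containers, which could explain the behavior you are seeing.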

When optimizing, remember that each input split is ultimately what is fed to the mappers.

In HDFS, the defaults tend to be better tuned than they are for other file systems.

By implementing more fine-grained blocking (getFileBlockLocations) in your Hadoop-compatible file system, you can increase the number of blocks, and the replication of those blocks as well.

Increasing the number of blocks raises the probability that a particular block will be able to run in a local context.

Also, you can tune the number of input splits (via the maximum and minimum split sizes) as part of the MapReduce job parameters at runtime. By updating these values, you might increase performance (i.e., use of the machines), but you also might decrease locality (more splits mean that, if some machines are inherently faster, MapReduce could stream a split over to a non-local machine, which could snatch up a lot of tasks).
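
As a sketch (the 128MB figure is purely illustrative, not a recommendation), you could cap the split size to force more splits, and thus more containers:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "myjob"); // job name is a placeholder

// Cap each input split at 128MB so more map tasks (containers) get created.
FileInputFormat.setMinInputSplitSize(job, 1L);
FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);

The same knobs are exposed as the mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize properties, so they can also be passed with -D at job submission time.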
