Is it possible to run multiple mappers on one node


Problem description


I have the code of KMeans, and my task is to calculate the speedup. I've done that by running it on different numbers of nodes in my uni's cluster. But is it possible to change the number of mappers and/or reducers, so that I can check the change in speedup while running it on a single node?

While googling, I found that by using conf.setNumReduceTasks(2); I can change the number of reducers, but I haven't seen any change in my output. (My output is the time in ms.)

The code I am using is from GitHub: https://github.com/himank/K-Means/blob/master/src/KMeans.java. Although I've made some changes according to my requirements, the main functionality is the same.

Here is what the main function looks like:

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        IN = args[0];
        OUT = args[1];
        String input = IN;
        String output = OUT + System.nanoTime();
        String again_input = output;
        int iteration = 0;
        boolean isdone = false;
        while (isdone == false) {
            JobConf conf = new JobConf(KMeans.class);
            // First iteration reads the initial centroids; later iterations read the previous job's output.
            if (iteration == 0) {
                Path hdfsPath = new Path(input + CENTROID_FILE_NAME);
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            } else {
                Path hdfsPath = new Path(again_input + OUTPUT_FILE_NAME);
                DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
            }
            conf.setJobName(JOB_NAME);
            //conf.setNumReduceTasks(2);
            conf.setMapOutputKeyClass(DoubleWritable.class);
            conf.setMapOutputValueClass(DoubleWritable.class);
            conf.setOutputKeyClass(DoubleWritable.class);
            conf.setOutputValueClass(Text.class);
            conf.setMapperClass(Map.class);
            conf.setNumMapTasks(4);
            conf.setReducerClass(Reduce.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path(input + DATA_FILE_NAME));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            JobClient.runJob(conf);
            Path ofile = new Path(output + OUTPUT_FILE_NAME);

            // Read the centroids produced by this iteration back from HDFS.
            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://127.0.0.1:9000"), configuration);
            Path filePath = new Path(output + OUTPUT_FILE_NAME);
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(filePath)));
            List<Double> centers_next = new ArrayList<Double>();
            String line = br.readLine();
            while (line != null) {
                String[] sp = line.split("\t| ");
                double c = Double.parseDouble(sp[0]);
                centers_next.add(c);
                line = br.readLine();
            }
            br.close();

            // Read the centroids from the previous iteration (the initial centroid file on the first pass).
            String prev;
            if (iteration == 0) {
                prev = input + CENTROID_FILE_NAME;
            } else {
                prev = again_input + OUTPUT_FILE_NAME;
            }
            Path prevfile = new Path(prev);
            FileSystem fs1 = FileSystem.get(new URI("hdfs://127.0.0.1:9000"), configuration);
            BufferedReader br1 = new BufferedReader(new InputStreamReader(fs1.open(prevfile)));
            List<Double> centers_prev = new ArrayList<Double>();
            String l = br1.readLine();
            while (l != null) {
                String[] sp1 = l.split(SPLITTER);
                double d = Double.parseDouble(sp1[0]);
                centers_prev.add(d);
                l = br1.readLine();
            }
            br1.close();

            // Treat the job as converged when every (sorted) centroid moved by at most 0.1.
            Collections.sort(centers_next);
            Collections.sort(centers_prev);
            Iterator<Double> it = centers_prev.iterator();
            for (double d : centers_next) {
                double temp = it.next();
                if (Math.abs(temp - d) <= 0.1) {
                    isdone = true;
                } else {
                    isdone = false;
                    break;
                }
            }
            ++iteration;
            again_input = output;
            output = OUT + System.nanoTime();
        }
        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;
        System.out.println(totalTime);
    }

PS. I am new to Hadoop and MapReduce.

Solution

The number of map tasks for a given job is usually driven by the number of input splits in the input files, not by the setNumMapTasks() call or the mapred.map.tasks parameter. One map task is spawned for each input split. The mapred.map.tasks parameter is just a hint to the InputFormat about the number of maps. setNumMapTasks() can be used to increase the number of map tasks, but it will not set the number below what Hadoop determines by splitting the input data.

http://wiki.apache.org/hadoop/HowManyMapsAndReduces
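
For reference, here is a minimal sketch (not the author's code) of how both settings could be applied with the old mapred API the question uses. The class and method names are made up for illustration, and the counts 2 and 8 are arbitrary examples:

    import org.apache.hadoop.mapred.JobConf;

    public class TaskCountConfig {
        // Hypothetical helper: apply task-count settings to the JobConf built in main().
        static void configureTaskCounts(JobConf conf) {
            // Reducers: this value is honored exactly; the job launches this many reduce tasks.
            conf.setNumReduceTasks(2);

            // Mappers: only a hint. The old-API FileInputFormat computes the split size roughly as
            //   max(mapred.min.split.size, min(totalSize / numMapTasks, blockSize))
            // so a larger hint can yield more (smaller) splits, and therefore more map tasks,
            // but never fewer than the input splits dictate.
            conf.setNumMapTasks(8);
        }
    }

On a small input that fits in a single split, the job may still run with one mapper regardless of the hint, so the measured time may change very little.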
