MapReduce Hadoop on Linux - Multiple data on input


Question

I am using Ubuntu 20.10 on VirtualBox and Hadoop version 3.2.1 (if you need any more info, leave me a comment).
My output at this moment gives me this:

Aaron Wells Peirsol ,M,17,United States,Swimming,2000 Summer,0,1,0
Aaron Wells Peirsol ,M,21,United States,Swimming,2004 Summer,1,0,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,0,1,0
Aaron Wells Peirsol ,M,25,United States,Swimming,2008 Summer,1,0,0

For the above output I would like to be able to sum all of his medals
(the three numbers at the end of each line represent the gold, silver, and bronze
medals the participant has won over the years at the Olympic Games).

The project has no specification on which age (17, 21, 25, 25)
or edition (2000, 2004, 2008, 2008 Summer) to keep, but I have to add up the medals
in order to be able to sort the athletes by who has won the most gold medals, etc.

Any ideas? I can provide my code if you need it, but I guess what I need is another MapReduce job that will take the output shown above as input and give us something like:

Aaron Wells Peirsol,M,25,United States,Swimming,2008 Summer,2,2,0

If there is a way to remove the "\t" from the reduce output, that would be very beneficial too!

Thank you all for your time, Gyftonikolos Nikolaos.

Answer

Although it might seem a bit tricky at first, this is yet another case of the WordCount example, only this time composite keys and values are needed in order to feed the data from the mapper to the reducer in the form of key-value pairs.

For the mapper, we need to extract all the info from each line of the input file and divide the column data into two "categories":

  • the main info of each athlete that always stays the same, and
  • the stat info that changes from line to line and needs to be processed

For each athlete's lines, we know that the columns that never change are the athlete's name, sex, country, and sport. All of these are going to be treated as the key, using the , character as a delimiter between each type of data. The rest of the column data are going to be put on the value side of the key-value pairs, but we need to use delimiters on them too, in order to differentiate the medal counters from each age and Olympic Games year. We are going to use:

  • the @ character as a delimiter between the age and year,
  • the # character as a delimiter between the medal counters,
  • and the _ character as a delimiter between those two (see the worked example right after this list).
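To make the scheme concrete, here is how the first line of the input shown above would be encoded by the mapper (a sketch of the intermediate key-value pair, not program output; note that the trailing space in the name column is kept as-is, because the line is only split on commas):

input: Aaron Wells Peirsol ,M,17,United States,Swimming,2000 Summer,0,1,0
key:   Aaron Wells Peirsol ,M,United States,Swimming
value: 17@2000 Summer_0#1#0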

In the Reduce function, all we have to do is actually count the medals to find their totals, and find the latest age and year of each athlete.

In order not to have a tab character between the key and value in the output of the MapReduce job, we can simply set NULL as the key of the key-value pairs generated by the reducer and put all the computed data in the value of each pair, using the , character as a delimiter.
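As a side note, Hadoop's TextOutputFormat also honors the mapreduce.output.textoutputformat.separator property, so if you ever want to keep a real key and just swap the tab for another separator, a one-liner like the following in the driver (set on the Configuration before the Job is created) is an alternative worth knowing about:

conf.set("mapreduce.output.textoutputformat.separator", ","); // use ',' instead of '\t' between key and value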

The code of this job is shown below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class Medals 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <(name,sex,country,sport), (age@year_gold#silver#bronze)>
     */
    public static class Map extends Mapper<Object, Text, Text, Text> 
    {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
        {
            String record = value.toString();
            String[] columns = record.split(",");

            // extract athlete's main info
            String name = columns[0];
            String sex = columns[1];
            String country = columns[3];
            String sport = columns[4];

            // extract athlete's stat info
            String age = columns[2];
            String year = columns[5]; 
            String gold = columns[6];
            String silver = columns[7];
            String bronze = columns[8];

            // set the main info as key and the stat info as value
            context.write(new Text(name + "," + sex + "," + country + "," + sport), new Text(age + "@" + year + "_" + gold + "#" + silver + "#" + bronze));
        }
    }

    /* input:  <(name,sex,country,sport), (age@year_gold#silver#bronze)>
     * output: <(NULL, (name,sex,age,country,sport,year,golds,silvers,bronzes)>
     */
    public static class Reduce extends Reducer<Text, Text, NullWritable, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            // extract athlete's main info
            String[] athlete_info = key.toString().split(",");
            String name = athlete_info[0];
            String sex = athlete_info[1];
            String country = athlete_info[2];
            String sport = athlete_info[3];

            int latest_age = 0;
            String latest_games = "";
            
            int gold_cnt = 0;
            int silver_cnt = 0;
            int bronze_cnt = 0;

            // for a single athlete, compute their stats...
            for(Text value : values)
            {
                String[] split_value = value.toString().split("_");
                String[] age_and_year = split_value[0].split("@");
                String[] medals = split_value[1].split("#");

                // find the last age and games the athlete has stats in the input file
                if(Integer.parseInt(age_and_year[0]) > latest_age)
                {
                    latest_age = Integer.parseInt(age_and_year[0]);
                    latest_games = age_and_year[1];
                }
                
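                // each medal column of the input holds a 0/1 flag per Games entry,
                // so counting the 1s is equivalent to summing the medals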
                if(Integer.parseInt(medals[0]) == 1)
                    gold_cnt++;

                if(Integer.parseInt(medals[1]) == 1)
                    silver_cnt++;

                if(Integer.parseInt(medals[2]) == 1)
                    bronze_cnt++;
            }

            context.write(NullWritable.get(), new Text(name + "," + sex + "," + String.valueOf(latest_age) + "," + country + "," + sport + "," + latest_games + "," + String.valueOf(gold_cnt) + "," + String.valueOf(silver_cnt) + "," + String.valueOf(bronze_cnt)));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("olympic_stats");
        Path output_dir = new Path("medals");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job medals_job = Job.getInstance(conf, "Medals Counter");
        medals_job.setJarByClass(Medals.class);
        medals_job.setMapperClass(Map.class);
        medals_job.setReducerClass(Reduce.class);    
        medals_job.setMapOutputKeyClass(Text.class);
        medals_job.setMapOutputValueClass(Text.class);
        medals_job.setOutputKeyClass(NullWritable.class);
        medals_job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(medals_job, input_dir);
        FileOutputFormat.setOutputPath(medals_job, output_dir);
        medals_job.waitForCompletion(true);
    }
}
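For completeness, this is one way to compile and run the job (a sketch that assumes a setup like the official MapReduce tutorial, i.e. HADOOP_CLASSPATH pointing to the JDK tools, and the olympic_stats directory already populated on the HDFS):

hadoop com.sun.tools.javac.Main Medals.java
jar cf medals.jar Medals*.class
hadoop jar medals.jar Medals
hadoop fs -cat medals/part-r-00000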

And of course the result is just how you wanted it to be: for the sample lines quoted in the question, all of the athlete's records are condensed into a single line with 2 gold, 2 silver, and 0 bronze medals, matching the desired output shown above.

