Hadoop MapReduce Streaming output different from the output of running MapReduce locally

Problem description

I am running a simple mapreduce job written in python, and I noticed that when I test the script locally I obtain a different output than when I run the job on hadoop. My input is something of the kind:

key1        val1
key1        val2
key1        val3
key1        val4
key2        val1
key2        val3
key2        val5
key3        val5
key4        val4

My mapper creates a dictionary of values with their corresponding list (string) of keys (e.g. val1 key1,key2 ; val2 key1 ; val3 key1,key2 ....). Then for each value in the dictionary I print all the possible key pairs. So the output of my mapper looks like:

key1_key2   1   # obtained from val1
key1_key2   1   # obtained from val3
key1_key4   1   # obtained from val4
key2_key3   1   # obtained from val5

The reducer counts the number of identical key pairs and prints the count. My mapper code is:

#!/usr/bin/env python
# mapper.py

import sys

val_dic = dict()

def print_dic(dic):
    # For each value, emit every pair of keys that share that value.
    for val, key_array in dic.iteritems():
        for i in range(len(key_array) - 1):
            for j in range(i + 1, len(key_array)):
                key_pair = key_array[i] + "_" + key_array[j]
                print "{0}\t{1}".format(key_pair, "1")

for line in sys.stdin:
    key, val = line.strip().split("\t")
    if val not in val_dic:
        val_dic[val] = []
    val_dic[val].append(key)

print_dic(val_dic)

The reducer is counting all the identical values:

#!/usr/bin/env python
# reducer.py

import sys

current_pair = None
current_count = 0

for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
    if current_pair == key_pair:
        current_count += count
    else:
        # Note: the very first line through here prints "None\t0",
        # because current_pair starts out as None.
        print "{0}\t{1}".format(current_pair, str(current_count))
        current_pair = key_pair
        current_count = count

print "{0}\t{1}".format(current_pair, str(current_count))

However, when I run it on hadoop on a larger dataset, it seems that half the results are missing. On the local machine I test it with cat input | mapper.py | sort | reducer.py > out-local. If the input is reasonably small, this works fine, but on bigger data sets (e.g. 1M entries) the local output file has almost twice as many entries as the one obtained from running the mapreduce job on hadoop. Is there an error in the code, or am I missing something? Any help is highly appreciated.
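
(For reference, the hadoop run here means a Hadoop Streaming job submitted roughly like this; the streaming jar location and the HDFS paths are placeholders for whatever the installation actually uses:)

# jar location and HDFS paths below are placeholders
$ hadoop jar /path/to/hadoop-streaming.jar \
    -input /hdfs/path/to/input \
    -output /hdfs/path/to/output \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py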

Answer

Your mapper generates all pairwise combinations of keys that it sees for a given value.

The model of map-reduce is that the mapper processes, in an embarrassingly parallel fashion, each record of the inputs, and emits key-value pairs. It maps records to key-value pairs. Indeed, a typical native (Java) mapper can only "see" one record at a time, so could never operate the way your streaming mapper does.

In the streaming API you can "cheat" a bit and process an entire input split at once: for the whole chunk of the file handed to you, you can look at all of the input records in that chunk, so you can do more than just map individual key-value pairs. But in general you do not have access to the entire input; the input is broken up into splits, and each mapper gets one split. If a single split contained the whole input, there would be no parallelism in the map phase and no reason to use hadoop at all.

What is almost certainly happening here is that your input file gets broken up into two splits, and your mapper can no longer find all the key pairs corresponding to a given value, because its split does not contain all of the input records. So for instance, consider breaking the input file you've supplied into roughly two, input1 with all the "key1" records and input2 with the others:
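
input1:

key1        val1
key1        val2
key1        val3
key1        val4

input2:

key2        val1
key2        val3
key2        val5
key3        val5
key4        val4

Running your map-reduce pipeline locally on all of the input at once produces the output you'd expect: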

$ cat input1 input2 | ./map.py | sort | ./reduce.py 
None    0
key1_key2   2
key1_key4   1
key2_key3   1

But the hadoop workflow is that a different mapper gets each input, and the results are only combined in the shuffle/reduce phase:

$ cat input1 | ./map.py > output1
$ cat input2 | ./map.py > output2
$ cat output1 output2 | sort | ./reduce.py 
None    0
key2_key3   1

So now you're missing results. This is inevitable: in any case where it actually makes sense to use hadoop, no individual mapper is going to see all of the data.

You'll need to refactor things, so that the map simply emits (value, key) pairs, and then the reducer gathers all of the keys together for a given value and then generates all key pairs with a count. Then another map-reduce step will have to do the count.

So you'd have a map1.py and a reduce1.py:

#!/usr/bin/env python
# map1.py

import sys

for line in sys.stdin:
    key, val = line.strip().split("\t")
    # Emit (value, key); format with a single tab so the reducer's
    # split("\t") sees clean fields with no stray spaces.
    print "{0}\t{1}".format(val, key)

#!/usr/bin/env python
# reduce1.py

import sys

def emit_keypairs(keylist):
    for i in range(len(keylist)-1):
        for j in range(i+1,len(keylist)):
            key_pair = keylist[i]+"_"+keylist[j]
            print "{0}\t{1}".format(key_pair,"1")

current_word = None
current_keylist = []

for line in sys.stdin:
    line = line.strip()
    word, key = line.split('\t', 1)

    if current_word == word:
        current_keylist.append(key)
    else:
        if current_word:
            emit_keypairs(current_keylist)
        current_word = word
        current_keylist = [key]

# do not forget to output the last word if needed!
if current_word == word:
    emit_keypairs(current_keylist)

Run those, and then basically just run a wordcount on the output. This will be robust to splitting the input file:

$ cat input1 | ./map1.py > map1
$ cat input2 | ./map1.py > map2
$ cat map1 map2 | sort | ./reduce1.py 

key1_key2   1
key1_key2   1
key1_key4   1
key2_key3   1

and then another map-reduce phase with wordcount will produce the expected results.
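
That second map-reduce phase is just the standard streaming word-count pattern. A minimal sketch, assuming the file names map2.py and reduce2.py (reduce2.py is essentially your original counting reducer, with a guard so it never prints the initial None group):

#!/usr/bin/env python
# map2.py -- identity mapper: pass the "pair\t1" lines through unchanged

import sys

for line in sys.stdin:
    line = line.strip()
    if line:
        print line

#!/usr/bin/env python
# reduce2.py -- sum the counts for each key pair

import sys

current_pair = None
current_count = 0

for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
    if current_pair == key_pair:
        current_count += count
    else:
        if current_pair is not None:
            print "{0}\t{1}".format(current_pair, current_count)
        current_pair = key_pair
        current_count = count

# emit the last group (skip the initial None placeholder)
if current_pair is not None:
    print "{0}\t{1}".format(current_pair, current_count)

Locally, chaining the two phases onto the example above gives the combined counts:

$ cat map1 map2 | sort | ./reduce1.py | ./map2.py | sort | ./reduce2.py
key1_key2   2
key1_key4   1
key2_key3   1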
