在Python中使用Hadoop Streaming中的文件 [英] Using files in Hadoop Streaming with Python

查看:159
本文介绍了在Python中使用Hadoop Streaming中的文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对Hadoop和MapReduce完全陌生,正试图通过它来工作。
我想在Python中开发一个mapreduce应用程序,其中使用了2个.CSV文件中的数据。我只是读取mapper中的两个文件,然后将键值对从文件打印到sys.stdout中。



当我在单个程序中使用它时,程序运行正常机器,但与Hadoop流,我得到一个错误。我认为我在读取Hadoop上的映射器中的文件时犯了一些错误。请帮助我解决代码问题,并告诉我如何在Hadoop Streaming中使用文件处理。 mapper.py代码如下。 (您可以从注释中了解代码):

 #!/ usr / bin / env python 
import sys
from numpy import genfromtxt

def read_input(inVal):
inVal中的行:
#将行分割成单词
yield line.strip( )

def main(separator ='\t'):
#输入来自STDIN(标准输入)
labels = []
data = []
incoming = read_input(sys.stdin)
传入vals:
#将结果写入STDOUT(标准输出);
#这里我们输出的是
#Reduce步骤的输入,即reducer.py的输入

#tab-delimited;
if len(vals)> 10:
data.append(vals)
else:
labels.append(vals)

在范围内(0,len(标签)):
打印%s%s%s\\\
%(labels [i],separator,data [i])


if __name__ ==__main__:
main()

从两个.csv文件输入到此映射器的记录有60000条如下所示(在单机上,不是hadoop集群):

  cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py 


解决方案

我能解决在搜索解决方案后3天后发布。

问题出在Hadoop的新版本(在我的情况下是2.2.0)。当从文件读取值时,映射器代码在某个时间点给出非零的退出代码(可能是因为它一次读取了大量值(784))。 Hadoop 2.2.0中有一个设置,它告诉Hadoop系统给出一个通用错误(子进程失败,代码为1)。该设置默认设置为True。我只需将此属性的值设置为False,并使代码运行时没有任何错误。



设置为: stream.non.zero。 exit.is.failure 即可。流式传输时将其设置为false。因此,流式命令会有点像:

  ** hadoop jar ... -D stream.non.zero.exit。 is.failure = false ... ** 

希望它可以帮助某人,并节省3天.. 。;)


I am completely new to Hadoop and MapReduce and am trying to work my way through it. I am trying to develop a mapreduce application in python, in which I use data from 2 .CSV files. I am just reading the two files in mapper and then printing the key value pair from the files to the sys.stdout

The program runs fine when I use it on a single machine, but with the Hadoop Streaming, I get an error. I think I am making some mistake in reading files in the mapper on Hadoop. Please help me out with the code, and tell me how to use file-handling in Hadoop Streaming. The mapper.py code is as below. (You can understand the code from the comments):

#!/usr/bin/env python
import sys
from numpy import genfromtxt

def read_input(inVal):
    for line in inVal:
        # split the line into words
        yield line.strip()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    labels=[]
    data=[]    
    incoming = read_input(sys.stdin)
    for vals in incoming:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited;
        if len(vals) > 10:
            data.append(vals)
        else:
            labels.append(vals)

    for i in range(0,len(labels)):
        print "%s%s%s\n" % (labels[i], separator, data[i])


if __name__ == "__main__":
    main()

There are 60000 records which are entered to this mapper from two .csv files as follows (on single machine, not hadoop cluster):

cat mnist_train_labels.csv mnist_train_data.csv | ./mapper.py

解决方案

I was able to resolve the issue after searching a solution for like 3 days.

The problem is with the newer version of Hadoop (2.2.0 in my case). The mapper code, when reading values from files was giving an exit code of non-zero at some point (maybe because it was reading a huge list of values(784) at a time). There is a setting in the Hadoop 2.2.0, which tells the Hadoop System to give a general error (subprocess failed with code 1). This setting is set to True by default. I just had to set the value of this property to False, and it made my code run without any errors.

Setting is: stream.non.zero.exit.is.failure. Just set it to false when streaming. So the streaming command would be somewhat like:

**hadoop jar ... -D stream.non.zero.exit.is.failure=false ...**

Hope it helps someone, and saves 3 days... ;)

这篇关于在Python中使用Hadoop Streaming中的文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆