Multiple Output Files for Hadoop Streaming with Python Mapper
Question
I am looking for a little clarification on the answers to this question here:
My use case is as follows:
I have a map-only MapReduce job that takes an input file, does a lot of parsing and munging, and then writes back out. However, certain lines may be incorrectly formatted, and if that is the case, I would like to write the original line to a separate file.
It seems that one way to do this would be to prepend the name of the file to the line I am printing and use the multipleOutputFormat parameter. For example, if I originally had:
if line_is_valid(line):
    print name + ' ' + comments
I could change that to:
if line_is_valid(line):
    print valid_file_name + ' ' + name + ' ' + comments
else:
    print err_file_name + ' ' + line
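Put together as a complete streaming mapper, the tagging approach above might be sketched as follows. This is a minimal sketch: the line_is_valid check and the tab-separated input format are hypothetical stand-ins for whatever parsing your real job does.

```python
import sys

# Tags emitted as the first column; a custom output format can later turn
# them into directory names and strip them from the final records.
VALID_TAG = 'valid_file_name'
ERR_TAG = 'err_file_name'

def line_is_valid(line):
    # Hypothetical check: a valid line has at least two tab-separated fields.
    return len(line.split('\t')) >= 2

def mapper(instream, outstream):
    for raw in instream:
        line = raw.rstrip('\n')
        if line_is_valid(line):
            name, comments = line.split('\t', 1)
            outstream.write(VALID_TAG + '\t' + name + ' ' + comments + '\n')
        else:
            # Pass the malformed line through untouched, tagged for the error file.
            outstream.write(ERR_TAG + '\t' + line + '\n')

if __name__ == '__main__':
    mapper(sys.stdin, sys.stdout)
```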
The only problem I have with this solution is that I don't want the file_name to appear as the first column in the text files. I suppose I could run another job to strip out the first column of each file, but that seems kind of silly. So:
1) Is this the correct way to manage multiple output files with a Python MapReduce job?
2) What is the best way to get rid of that initial column?
Answer
You can do something like the following, though it involves a little Java compilation; that shouldn't be a problem if you want the rest of your use case to stay in Python. As far as I know, there is no way from Python alone to drop the filename tag from the final output in a single job, as your use case demands. But what's shown below makes it possible with ease!
Here is the Java class that needs to be compiled:
package com.custom;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class CustomMultiOutputFormat extends MultipleTextOutputFormat<Text, Text> {

    /**
     * Use the key as part of the path for the final output file.
     */
    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, String leaf) {
        return new Path(key.toString(), leaf).toString();
    }

    /**
     * We discard the key, as per your requirement.
     */
    @Override
    protected Text generateActualKey(Text key, Text value) {
        return null;
    }
}
Compilation steps:
- Save the text above to a file named exactly CustomMultiOutputFormat.java (no different name).
While you are in the directory containing that saved file, type:
$JAVA_HOME/bin/javac -cp $(hadoop classpath) -d . CustomMultiOutputFormat.java
Make sure JAVA_HOME is set to /path/to/your/SUNJDK before attempting the above command.
Make your custom.jar file using (type exactly):
$JAVA_HOME/bin/jar cvf custom.jar com/custom/CustomMultiOutputFormat.class
Finally, run your job like this:
hadoop jar /path/to/your/hadoop-streaming-*.jar -libjars custom.jar -outputformat com.custom.CustomMultiOutputFormat -file your_script.py -input inputpath -numReduceTasks 0 -output outputpath -mapper your_script.py
After doing this you should see two directories inside your outputpath, one named valid_file_name and the other err_file_name. All records tagged with valid_file_name will go to the valid_file_name directory, and all records tagged with err_file_name will go to the err_file_name directory.
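Purely as an illustration of what the two overridden methods accomplish (this is a simulation, not how Hadoop actually writes the files), the routing can be modeled in a few lines of Python: the first tab-separated field selects the output directory, and because generateActualKey returns null, only the remainder of the line is written.

```python
from collections import defaultdict

def route(tagged_lines):
    # Simulates CustomMultiOutputFormat: the leading tag picks the
    # destination directory, and the tag itself is dropped from the record.
    outputs = defaultdict(list)
    for line in tagged_lines:
        tag, _, value = line.partition('\t')
        outputs[tag].append(value)
    return dict(outputs)
```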
I hope all of this makes sense.