如何将mapper-reducer的输出直接发送到另一个mapper-reducer而不将输出保存到hdfs中 [英] How to directly send the output of a mapper-reducer to a another mapper-reducer without saving the output into the hdfs

查看:136
本文介绍了如何将mapper-reducer的输出直接发送到另一个mapper-reducer而不将输出保存到hdfs中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

最终解决问题在底部检查我的解决方案




最近我试图在Mahout in Action中的chaper6(列表6.1〜6.4)中运行推荐器示例。但我遇到了一个问题,我搜索了一下,但我找不到解决方案。



下面是问题:我有一对mapper-reducer

  public final class WikipediaToItemPrefsMapper扩展
Mapper< LongWritable,Text,VarLongWritable,VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile((\\d +));
$ b $ @Override
protected void map(LongWritable key,Text value,Context context)
throws IOException,InterruptedException {
String line = value.toString();
Matcher m = NUM​​BERS.matcher(line);
m.find();
VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
VarLongWritable itemID = new VarLongWritable();
while(m.find()){
itemID.set(Long.parseLong(m.group()));
context.write(userID,itemID);



$ b public class WikipediaToUserVectorReducer
extends
Reducer< VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
$ b $ public void reduce(VarLongWritable userID,
Iterable< VarLongWritable> itemPrefs,Context context)
throws IOException,InterruptedException {
Vector userVector = new RandomAccessSparseVector(Integer。 MAX_VALUE,100); (VarLongWritable itemPref:itemPrefs){
userVector.set((int)itemPref.get(),1.0f);

}
context.write(userID,new VectorWritable(userVector));


code $


Reducer输出一个userID和一个userVector,像这样:
98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}
然后我想用另一对mapper-reducer来处理这些数据

  public class UserVectorSplitterMapper 
extends
Mapper< VarLongWritable,VectorWritable, IntWritable,VectorOrPrefWritable> {

public void map(VarLongWritable key,VectorWritable value,Context context)
throws IOException,InterruptedException {
long userID = key.get();
Vector userVector = value.get();
Iterator< Vector.Element> it = userVector.iterateNonZero();
IntWritable itemIndexWritable = new IntWritable();
while(it.hasNext()){
Vector.Element e = it.next();
int itemIndex = e.index();
float preferenceValue =(float)e.get();
itemIndexWritable.set(itemIndex);
context.write(itemIndexWritable,
VectorOrPrefWritable(userID,preferenceValue));
}
}
}

当我尝试运行工作,它表明错误说


org.apache.hadoop.io.Text不能转换为org.apache.mahout.math.VectorWritable

第一个mapper-reducer将输出写入hdfs,第二个mapper-reducer尝试读取输出,mapper可以将98955转换为VarLongWritable,但不能将
{590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}转换为VectorWritable,所以我想知道是否有办法使第一个mapper-reducer直接将输出发送到第二对,那么就不需要进行数据转换。我已经在查找Hadoop的行动,并hadoop:权威指南,似乎没有这样的方式来做到这一点,有什么建议?



解决方案

:通过使用 SequenceFileOutputFormat ,我们可以输出并保存第一个MapReduce工作流的reduce结果DFS,那么第二个MapReduce工作流可以在创建映射器时使用 SequenceFileInputFormat 类作为参数来读取临时文件作为输入。由于矢量将保存在具有特定格式的二进制序列文件中, SequenceFileInputFormat 可以读取它并将其转换回矢量格式。



以下是一些示例代码:

  confFactory ToItemPrefsWorkFlow = new confFactory 
(new Path(/ dbout),//输入文件路径
new Path(/ mahout / output.txt),//输出文件路径
TextInputFormat.class,/ /输入格式
VarLongWritable.class,//映射器密钥格式
Item_Score_Writable.class,//映射器值格式
VarLongWritable.class,// reducer密钥格式
VectorWritable.class, // reducer值格式
** SequenceFileOutputFormat.class ** //减速器输出格式

);
ToItemPrefsWorkFlow.setMapper(WikipediaToItemPrefsMapper.class);
ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
JobConf conf1 = ToItemPrefsWorkFlow.getConf();


confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
(new Path(/ mahout / output.txt),
new Path(/ mahout / UserVectorToCooccurrence),
SequenceFileInputFormat.class,//注意第二个工作流的映射器的输入格式现在是SequenceFileInputFormat.class
//UserVectorToCooccurrenceMapper.class,
IntWritable.class,
IntWritable .class,
IntWritable.class,
VectorWritable.class,
SequenceFileOutputFormat.class
);
UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

JobClient.runJob(conf1);
JobClient.runJob(conf2);

如果您有任何问题,请随时与我联系

解决方案

您需要显式配置第一个作业的输出以使用SequenceFileOutputFormat并定义输出键和值类: p>

  job.setOutputFormat(SequenceFileOutputFormat.class); 
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputKeyClass(VectorWritable.class);

没有看到您的驱动程序代码,我猜你正在使用TextOutputFormat作为输出,第一个作业,TextInputFormat作为第二个输入 - 并且此输入格式将< Text,Text> 的对发送给第二个映射器

Problem Solved Eventually check my solution in the bottom


Recently I am trying to run the recommender example in the chaper6 (listing 6.1 ~ 6.4)from the Mahout in Action. But I encountered a problem and I have googled around but I can't find the solution.

Here is the problem: I have a pair of mapper-reducer

public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}

The reducer output a userID and a userVector and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}

Then I want to use another pair of mapper-reducer to process this data

public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}

When I try to run the job, it cast error says

org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable

the first mapper-reducer write the output into the hdfs, and the second mapper-reducer try to read the output, the mapper can cast the 98955 to VarLongWritable, but can't convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable, So I am wondering is there a way to make the first mapper-reducer directly send the output to the second pair, then there is no need to do the data converting. I have looked up Hadoop in action, and hadoop: the definitive guide, it seems there is no such a way to do that, any suggestions?


Problem solved

Solution: By using SequenceFileOutputFormat, we can output and save the reduce result of the first MapReduce workflow on the DFS, then the second MapReduce workflow can read the temporary file as input by using SequenceFileInputFormat class as parameter when creating the mapper. Since the vector would be saved in binary sequence file which has specific format, the SequenceFileInputFormat can read it and transform it back to vector format.

Here are some example code:

confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"), //input file path
             new Path("/mahout/output.txt"), //output file path
             TextInputFormat.class, //input format
             VarLongWritable.class, //mapper key format
             Item_Score_Writable.class, //mapper value format
             VarLongWritable.class, //reducer key format
             VectorWritable.class, //reducer value format
             **SequenceFileOutputFormat.class** //The reducer output format             

    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();


    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class                                      
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

    JobClient.runJob(conf1);
    JobClient.runJob(conf2);

If you have any problem with this please feel free to contact me

解决方案

You need to explicitly configure the output of the first job to use the SequenceFileOutputFormat and define the output key and value classes:

job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputKeyClass(VectorWritable.class);

Without seeing your driver code, i'm guessing you're using TextOutputFormat as the output, of the first job, and TextInputFormat as the input to the second - and this input format sends pairs of <Text, Text> to the second mapper

这篇关于如何将mapper-reducer的输出直接发送到另一个mapper-reducer而不将输出保存到hdfs中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆