Configure Map Side join for multiple mappers in Hadoop Map/Reduce


Problem description

I have a question about configuring a map-side inner join for multiple mappers in Hadoop. Suppose I have two very large data sets A and B, and I use the same partition and sort algorithm to split them into smaller parts. For A, assume I have a(1) to a(10), and for B I have b(1) to b(10). It is assured that a(1) and b(1) contain the same keys, a(2) and b(2) contain the same keys, and so on. I would like to set up 10 mappers, specifically mapper(1) to mapper(10). To my understanding, a map-side join is a pre-processing task prior to the mapper; therefore, I would like to join a(1) and b(1) for mapper(1), join a(2) and b(2) for mapper(2), and so on.

After reading some reference materials, it is still not clear to me how to configure these ten mappers. I understand that with CompositeInputFormat I would be able to join two files, but it seems that this configures only one mapper and joins the 20 files pair after pair (in 10 sequential tasks). How do I configure all ten mappers and join the ten pairs at the same time in a genuine Map/Reduce fashion (10 tasks in parallel)? To my understanding, ten mappers would require ten CompositeInputFormat settings, because the files to join are all different. I strongly believe this is practical and doable, but I can't figure out what exact commands I should use.

Any hints and suggestions are highly welcome and appreciated.

Thanks a lot for the replies, David and Thomas!

I appreciate your emphasis on the prerequisites for a map-side join. Yes, I am aware of the sorting, the API, etc. After reading your comments, I think my actual problem is: what is the correct expression for joining multiple splits of two files in CompositeInputFormat? For example, I have dataA and dataB sorted and reduced into 2 files each:

/A/dataA-r-00000
/A/dataA-r-00001
/B/dataB-r-00000
/B/dataB-r-00001

The expression command I am using now is:

inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/A/dataA-r-00000"),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/B/dataB-r-00000"))

It works, but as you mentioned, it only starts two mappers (because the inner join prevents splitting) and could be very inefficient if the files are big. If I want to use more mappers (say another 2 mappers to join dataA-r-00001 and dataB-r-00001), how should I construct the expression? Is it something like:

String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00000'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00000'), tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00001'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00001'))";

But I think that could be mistaken, because the command above actually performs an inner join of all four files (which would produce nothing in my case, since files *r-00000 and *r-00001 have non-overlapping keys).

Or I could just use the two directories as inputs, like:

String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/'))";

Will the inner join match the pairs automatically according to the file name endings, say "00000" to "00000" and "00001" to "00001"? I am stuck at this point because I need to construct the expression and pass it to:

conf.set("mapred.join.expr", joinexpression);
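
(For what it's worth, a minimal sketch of building that string programmatically, assuming the old mapred API: the static helper CompositeInputFormat.compose assembles the same expression over the two directories, so it need not be concatenated by hand.)

import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

// conf is the JobConf of the join job
String joinexpression = CompositeInputFormat.compose(
    "inner", KeyValueTextInputFormat.class, "/A", "/B");
conf.set("mapred.join.expr", joinexpression);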

So, in a word: how should I build the proper expression if I want to use more mappers to join multiple pairs of files simultaneously?

Recommended answer

There are map-side and reduce-side joins. You proposed to use a map-side join, which is executed inside a mapper and not before it. Both sides must have the same key and value types, so you can't join a LongWritable and a Text, even though they might hold the same value.

There are a few more subtle things to note:

  • The input files must be sorted, so they will most likely be the output of a reducer
  • You can control the number of mappers in the join map phase via the number of reducers in the jobs that sorted the datasets (a sketch of such a job follows this list)
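
A minimal sketch of such a preparatory job, assuming the old mapred API and a hypothetical raw input directory /rawA; running the same job over /rawB with the same reducer count (and the default HashPartitioner) yields matching partitions:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;

public class PrepJob {
  public static void main(String[] args) throws Exception {
    JobConf prep = new JobConf(PrepJob.class);
    prep.setJobName("sort and partition dataset A");
    prep.setInputFormat(KeyValueTextInputFormat.class);
    FileInputFormat.setInputPaths(prep, new Path("/rawA"));
    FileOutputFormat.setOutputPath(prep, new Path("/A"));
    prep.setOutputKeyClass(Text.class);
    prep.setOutputValueClass(Text.class);
    // 2 reducers -> 2 sorted, hash-partitioned files in /A
    // (named part-00000 and part-00001 by default), which later
    // means 2 mappers in the join job.
    prep.setNumReduceTasks(2);
    // The identity mapper and reducer are the old API's defaults;
    // the shuffle does the sorting and partitioning.
    JobClient.runJob(prep);
  }
}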

The whole procedure basically works like this: you have dataset A and dataset B, and both share the same key, let's say a LongWritable.

  1. Run two jobs that sort the two datasets by their key; both jobs must set the number of reducers to the same value, say 2.
  2. This produces 2 sorted files for each dataset.
  3. Now you set up the job that joins the datasets; this job will spawn 2 mappers. It could be more if you set the reducer count higher in the previous jobs. A sketch of this join job follows the list.
  4. Do whatever you like in the reduce step.
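
A minimal sketch of step 3, again assuming the old mapred API, the sorted outputs under /A and /B from above, and a hypothetical JoinMapper; inside the mapper each input value is a TupleWritable holding one value per joined source:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

public class JoinJob {
  public static void main(String[] args) throws Exception {
    JobConf join = new JobConf(JoinJob.class);
    join.setJobName("map-side inner join of A and B");
    // CompositeInputFormat pairs the partitions of /A and /B by number
    // (00000 with 00000, 00001 with 00001, ...) and runs one mapper
    // per pair, all in parallel.
    join.setInputFormat(CompositeInputFormat.class);
    join.set("mapred.join.expr", CompositeInputFormat.compose(
        "inner", KeyValueTextInputFormat.class, "/A", "/B"));
    FileOutputFormat.setOutputPath(join, new Path("/joined"));
    // JoinMapper is hypothetical: a Mapper<Text, TupleWritable, Text, Text>
    // where tuple.get(0) is the value from A and tuple.get(1) from B.
    join.setMapperClass(JoinMapper.class);
    join.setOutputKeyClass(Text.class);
    join.setOutputValueClass(Text.class);
    JobClient.runJob(join);
  }
}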

If the numbers of files to be joined are not equal, it will result in an exception during job setup.

Setting up a join is kind of painful, mainly because you have to use the old API for mappers and reducers if your version is below 0.21.x.

This document describes very well how it works; scroll all the way to the bottom. Sadly, this documentation is somehow missing from the latest Hadoop docs.

Another good reference is "Hadoop: The Definitive Guide", which explains all of this in more detail and with examples.
