Configure Map Side join for multiple mappers in Hadoop Map/Reduce


Problem description



I have a question about configuring a map-side inner join for multiple mappers in Hadoop. Suppose I have two very large datasets A and B, and I use the same partition and sort algorithm to split them into smaller parts. For A, assume I have a(1) to a(10), and for B I have b(1) to b(10). It is guaranteed that a(1) and b(1) contain the same keys, a(2) and b(2) have the same keys, and so on. I would like to set up 10 mappers, specifically mapper(1) to mapper(10). To my understanding, a map-side join is a pre-processing task prior to the mapper; therefore, I would like to join a(1) and b(1) for mapper(1), a(2) and b(2) for mapper(2), and so on.

After reading some reference materials, it is still not clear to me how to configure these ten mappers. I understand that using CompositeInputFormat I would be able to join two files, but that seems to configure only one mapper and join the 20 files pair after pair (in 10 sequential tasks). How can I configure all ten mappers and join the ten pairs at the same time, as a genuine Map/Reduce job (10 tasks in parallel)? To my understanding, ten mappers would require ten CompositeInputFormat settings, because the files to join are all different. I strongly believe this is practical and doable, but I can't figure out what exact commands I should use.

Any hints and suggestions are highly welcome and appreciated.

Shi


Thanks a lot for the replies, David and Thomas!

I appreciate your emphasis on the prerequisites for a map-side join. Yes, I am aware of the sorting, the API, etc. After reading your comments, I think my actual problem is: what is the correct expression for joining multiple splits of two files in CompositeInputFormat? For example, I have dataA and dataB, each sorted and reduced into 2 files:

/A/dataA-r-00000

/A/dataA-r-00001

/B/dataB-r-00000

/B/dataB-r-00001

The join expression I am using now is:

inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/A/dataA-r-00000"),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/B/dataB-r-00000"))

It works, but as you mentioned, it only starts two mappers (because the inner join prevents splitting) and could be very inefficient if the files are big. If I want to use more mappers (say, another 2 mappers to join dataA-r-00001 and dataB-r-00001), how should I construct the expression? Is it something like:

String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00000'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00000'), tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00001'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00001'))" ;

But I think that could be mistaken, because the expression above actually performs an inner join of all four files (which would produce nothing in my case, because the files *r-00000 and *r-00001 have non-overlapping keys).

Or I could just use the two dirs as inputs, like:

String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/'))" ;

Will the inner join match the pairs automatically according to the file endings, say "00000" to "00000" and "00001" to "00001"? I am stuck at this point, because I need to construct the expression and pass it to

conf.set("mapred.join.expr", joinexpression);

In short, how should I build the proper expression if I want to use more mappers to join multiple pairs of files simultaneously?
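
For illustration: instead of concatenating the expression string by hand, the old join API also ships a compose() helper that builds it. A minimal sketch, assuming the /A and /B directories above (whether directory-level composition pairs the part files as needed is exactly the open question here):

    // hedged sketch: compose() builds the join expression over the two
    // directories instead of a hand-written string
    conf.set("mapred.join.expr", CompositeInputFormat.compose(
        "inner", KeyValueTextInputFormat.class,
        new Path("/A"), new Path("/B")));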

Solution

There are map-side and reduce-side joins. You proposed to use a map-side join, which is executed inside a mapper, not before it. Both sides must have the same key and value types, so you can't join a LongWritable and a Text, even though they might hold the same value.

There are a few more subtle things to note:

  • the input files have to be sorted, so they will most likely be reducer output
  • you can control the number of mappers in the join's map phase by setting the number of reducers in the jobs that sort the datasets (see the sketch after this list)
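
For example, a minimal sketch of the relevant setting in each preparatory sort job (SortJob and sortConf are illustrative names):

    // in the driver of each of the two sorting jobs:
    JobConf sortConf = new JobConf(SortJob.class);
    sortConf.setNumReduceTasks(2); // 2 reducers -> 2 sorted part files per
                                   // dataset -> 2 map tasks in the join job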

The whole procedure basically works like this: you have dataset A and dataset B, and both share the same key type, let's say LongWritable.

  1. Run two jobs that sort the two datasets by their keys; both jobs HAVE TO set the number of reducers to the same number, say 2.
  2. This results in 2 sorted files for each dataset.
  3. Now set up the job that joins the datasets; this job will spawn 2 mappers (more, if you set the number of reducers higher in the previous jobs). A sketch of such a job follows this list.
  4. Do whatever you like in the reduce step.
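
To make step 3 concrete, here is a minimal sketch of such a join job against the old mapred API (the JoinMapper class and the /A, /B, /joined paths are illustrative assumptions, not a fixed recipe):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.mapred.join.CompositeInputFormat;
    import org.apache.hadoop.mapred.join.TupleWritable;

    public class MapSideJoin {

      // Each map task receives the join key plus a tuple holding the
      // matching records from both datasets.
      public static class JoinMapper extends MapReduceBase
          implements Mapper<Text, TupleWritable, Text, Text> {
        public void map(Text key, TupleWritable value,
            OutputCollector<Text, Text> out, Reporter reporter)
            throws IOException {
          // value.get(0) is the record from /A, value.get(1) the one from /B
          out.collect(key, new Text(value.get(0) + "\t" + value.get(1)));
        }
      }

      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MapSideJoin.class);
        conf.setJobName("map-side-join");

        // compose() builds the join expression over the two directories;
        // each matched pair of sorted part files becomes one map task
        conf.set("mapred.join.expr", CompositeInputFormat.compose(
            "inner", KeyValueTextInputFormat.class,
            new Path("/A"), new Path("/B")));
        conf.setInputFormat(CompositeInputFormat.class);

        conf.setMapperClass(JoinMapper.class);
        conf.setNumReduceTasks(0); // map-only here; plug in a reducer for step 4
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(conf, new Path("/joined"));
        JobClient.runJob(conf);
      }
    }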

If the numbers of files to be joined are not equal, you will get an exception during job setup.

Setting up a join is kind of painful, mainly because you have to use the old API for the mapper and reducer if your Hadoop version is below 0.21.x.

This document describes very well how it works; scroll all the way to the bottom. Sadly, this documentation is missing from the latest Hadoop docs.

Another good reference is "Hadoop: The Definitive Guide", which explains all of this in more detail and with examples.
