Joining rows in Apache Beam
Problem description
I'm having trouble understanding whether the joins in Apache Beam (e.g. http://www.waitingforcode.com/apache-beam/joins-apache-beam/read) can join entire rows.
For example:
I have 2 datasets, in CSV format, where the first rows are column headers.
First:
a,b,c,d
1,2,3,4
5,6,7,8
1,2,5,4
Second:
c,d,e,f
3,4,9,10
I want to left join on columns c and d so that I end up with:
a,b,c,d,e,f
1,2,3,4,9,10
5,6,7,8,,
1,2,5,4,,
However, all the documentation on Apache Beam seems to say the PCollection objects need to be of type KV<K, V> when joining, so I have broken down my PCollection objects into a collection of KV<String, String> objects (where the key is the column header and the value is the row value). But in that case (where you just have a key with a value) I don't see how the row format can be maintained. How would KV(c,7) "know" that KV(a,5) is from the same row? Is Join meant for this sort of thing at all?
My code so far:
PCollection<KV<String, String>> flightOutput = ...;
PCollection<KV<String, String>> arrivalWeatherDataForJoin = ...;
PCollection<KV<String, KV<String, String>>> output = Join.leftOuterJoin(flightOutput, arrivalWeatherDataForJoin, "");
Recommended answer
Yes, Join is the utility class to help with joins like yours. It is a wrapper around CoGroupByKey; see the corresponding section in the docs. Its implementation is pretty short, and its tests might also have helpful examples.
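To make the left-outer-join semantics concrete, here is a small in-memory model in plain Java. It is a sketch, not Beam code: the hypothetical groupByKey stands in for the grouping that CoGroupByKey performs, kv stands in for KV.of, and nullValue plays the role of the value the Join utility substitutes when a left element has no match. Each left value is paired with every right value under the same key, or with nullValue if there is none. Unlike this model, a real Beam pipeline does not guarantee any output order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of left-outer-join semantics; this is NOT Beam code.
final class LeftOuterJoinModel {

    // Hypothetical stand-in for Beam's KV.of(key, value).
    static <A, B> Map.Entry<A, B> kv(A key, B value) {
        return new SimpleEntry<>(key, value);
    }

    // Stand-in for the grouping step: collects all values per key,
    // keeping keys in first-seen order so the model's output is predictable.
    static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Every left value is paired with every right value that has the same key;
    // a left value with no match is paired with nullValue instead.
    static <K, V1, V2> List<Map.Entry<K, Map.Entry<V1, V2>>> leftOuterJoin(
            List<Map.Entry<K, V1>> left, List<Map.Entry<K, V2>> right, V2 nullValue) {
        Map<K, List<V1>> leftByKey = groupByKey(left);
        Map<K, List<V2>> rightByKey = groupByKey(right);
        List<Map.Entry<K, Map.Entry<V1, V2>>> out = new ArrayList<>();
        for (Map.Entry<K, List<V1>> group : leftByKey.entrySet()) {
            List<V2> matches = rightByKey.getOrDefault(group.getKey(), List.of());
            for (V1 l : group.getValue()) {
                if (matches.isEmpty()) {
                    out.add(kv(group.getKey(), kv(l, nullValue)));
                } else {
                    for (V2 r : matches) {
                        out.add(kv(group.getKey(), kv(l, r)));
                    }
                }
            }
        }
        return out;
    }
}
```

Feeding it the keyed rows from the question, with "" as nullValue, produces one output element per left row. The row keyed "3,4" is paired with "3,4,9,10", and the rows keyed "7,8" and "5,4" are paired with "".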
The problem in your case is likely caused by how you're choosing the keys.
The KeyT in KV<KeyT, V1> in the Join library represents the key you use to match the records, and it contains all the join fields. So in your case you will probably need to assign keys something like this (pseudocode):
pCollection1:
Key Value
(3,4) (1,2,3,4)
(7,8) (5,6,7,8)
(5,4) (1,2,5,4)
pCollection2:
Key Value
(3,4) (3,4,9,10)
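The key assignment above can be sketched in plain Java. The point is that the key is built from the join columns (c, d) while the value is the whole row, so nothing needs to "know" which row a column came from. The class and method names are hypothetical. The sketch assumes each dataset's header line is available to locate the join columns and that fields never contain quoted commas (a naive split on ","). In a pipeline, logic like keyed would sit inside a MapElements or ParDo step that produces the KV inputs for the join.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Plain-Java sketch of keying each CSV row by its join columns.
final class JoinKeys {

    // Finds where each join column sits in a CSV header line such as "a,b,c,d".
    static int[] columnIndices(String headerLine, List<String> joinColumns) {
        List<String> header = Arrays.asList(headerLine.split(",", -1));
        int[] idx = new int[joinColumns.size()];
        for (int i = 0; i < idx.length; i++) {
            idx[i] = header.indexOf(joinColumns.get(i));
            if (idx[i] < 0) {
                throw new IllegalArgumentException("missing join column: " + joinColumns.get(i));
            }
        }
        return idx;
    }

    // Builds a composite key such as "3,4" from the join columns of one data row.
    static String keyOf(String row, int[] idx) {
        String[] fields = row.split(",", -1); // -1 keeps trailing empty fields
        StringJoiner key = new StringJoiner(",");
        for (int i : idx) {
            key.add(fields[i]);
        }
        return key.toString();
    }

    // Pairs the composite key with the WHOLE row, so the row stays intact.
    static Map.Entry<String, String> keyed(String row, int[] idx) {
        return new SimpleEntry<>(keyOf(row, idx), row);
    }
}
```

With the header "a,b,c,d", the join columns are at positions 2 and 3, so "5,6,7,8" gets the key "7,8". With "c,d,e,f", they are at positions 0 and 1, so "3,4,9,10" gets the key "3,4".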
The result of the join will look something like this (pseudocode):
joinResultPCollection:
Key Value
(3,4) (1,2,3,4),(3,4,9,10)
(7,8) (5,6,7,8),nullValue
(5,4) (1,2,5,4),nullValue
So you will probably need to add another transform after the join to actually merge the left and right sides into a combined row.
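That merge step might look like the following plain-Java sketch, which turns one (leftRow, rightRow) pair from the join output into a line of the desired a,b,c,d,e,f result. It assumes "" was passed as the join's nullValue, so an empty right side means "no match". It also assumes the right dataset's column count and join-column positions are known up front. The class and parameter names are hypothetical. It drops the right side's join columns, since c and d already appear on the left, and emits empty fields for unmatched rows.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java sketch of the "merge" step that would run after the join.
final class MergeRows {

    // Appends the right row's non-key columns to the left row. If rightRow equals
    // nullValue (no match), the same number of empty fields is appended instead.
    static String merge(String leftRow, String rightRow, String nullValue,
                        int rightColumnCount, int[] rightKeyIdx) {
        Set<Integer> keyCols = new HashSet<>();
        for (int i : rightKeyIdx) {
            keyCols.add(i);
        }
        boolean matched = !rightRow.equals(nullValue);
        String[] rightFields = matched ? rightRow.split(",", -1) : new String[0];
        StringBuilder out = new StringBuilder(leftRow);
        for (int i = 0; i < rightColumnCount; i++) {
            if (keyCols.contains(i)) {
                continue; // join columns (c, d) are already present on the left side
            }
            out.append(',');
            if (matched) {
                out.append(rightFields[i]);
            }
        }
        return out.toString();
    }
}
```

For example, merge("1,2,3,4", "3,4,9,10", "", 4, new int[] {0, 1}) returns "1,2,3,4,9,10", and merge("5,6,7,8", "", "", 4, new int[] {0, 1}) returns "5,6,7,8,,". Both match the expected output in the question. Choosing "" as nullValue only works because a real right row is never empty. A different sentinel would be needed otherwise.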
Because you have a CSV, you could probably use plain strings such as "3,4" as keys (and values). Alternatively, you could use List<> or your own custom row types.
For example, this is exactly what the Beam SQL Join implementation does.