Apache Flink: How to create two datasets from one dataset using the Flink DataSet API
Question
I'm writing an application using the DataSet API of Flink 0.10.1. Can a single operator in Flink have multiple collectors?
What I want to do is something like this:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
  (iterator, collector1, collector2) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector1.collect(elem1)
      collector2.collect(elem2)
    }
  }
}
Currently I call mapPartition twice to create two datasets from one source dataset:
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem1)
    }
  }
}
val out_large = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem2)
    }
  }
}
Since the doParsing function is quite expensive, I want to call it only once per line.
P.S. I would appreciate it if you could point out other, simpler ways to do this kind of thing.
Recommended answer
Flink does not support multiple collectors. However, you can change the output of your parsing step by adding a field that indicates the output type:
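val lines = env.readTextFile(...)
// someOp stands for a single-collector operator such as mapPartition
val intermediate = lines **someOp** {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      // collect takes a single record, so emit each element as a (tag, value) tuple
      collector.collect((0, elem1)) // 0 indicates small
      collector.collect((1, elem2)) // 1 indicates large
    }
  }
}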
Next, you consume the output intermediate twice and filter each copy on the first attribute. The first filter keeps records tagged 0, the second keeps records tagged 1 (you can also add a projection to get rid of the first attribute).
                +---> filter("0") --->
                |
intermediate ---+
                |
                +---> filter("1") --->
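The filter-and-project step described above could look roughly like the following sketch. It makes several assumptions that are not in the original post: doParsing here is a hypothetical stand-in that splits a line at the first comma, both outputs are plain strings so they fit into one (Int, String) tuple type, flatMap is used in place of the unnamed operator, and the output paths are placeholders.

```scala
import org.apache.flink.api.scala._

object SplitDataSet {
  // Hypothetical stand-in for the expensive parser from the question.
  // Assumes every line contains at least one comma.
  def doParsing(line: String): (String, String) = {
    val Array(small, large) = line.split(",", 2)
    (small, large)
  }

  // Tags each parsed element, then derives both outputs from the tagged data set.
  def split(lines: DataSet[String]): (DataSet[String], DataSet[String]) = {
    val intermediate: DataSet[(Int, String)] = lines.flatMap { line =>
      val (elem1, elem2) = doParsing(line)
      Seq((0, elem1), (1, elem2)) // 0 indicates small, 1 indicates large
    }
    // Filter on the tag, then project the tag away with map.
    val outSmall = intermediate.filter(_._1 == 0).map(_._2)
    val outLarge = intermediate.filter(_._1 == 1).map(_._2)
    (outSmall, outLarge)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val lines = env.fromElements("a,alpha", "b,beta")
    val (outSmall, outLarge) = split(lines)
    // Placeholder output paths; both sinks belong to the same job.
    outSmall.writeAsText("/tmp/out_small")
    outLarge.writeAsText("/tmp/out_large")
    env.execute("split one data set into two")
  }
}
```

Both sinks are attached before the single env.execute() call, so the parsing operator appears once in the job and feeds both filters. Calling print() or collect() on each output separately would instead run one job per call and repeat the parsing. If the two outputs have different types, a common supertype or an Either-style wrapper would be needed in place of the String field.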