Apache Flink: How to create two datasets from one dataset using Flink DataSet API

Question

I'm writing an application using the DataSet API of Flink 0.10.1. Can I get multiple collectors using a single operator in Flink?

What I would like to do is something like the following:

val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
  (iterator, collector1, collector2) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector1.collect(elem1)
      collector2.collect(elem2)
    }
  } 
} 

Currently I'm calling mapPartition twice to create two datasets from the one source dataset.

val lines = env.readTextFile(...)
val out_small = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line) // each line is parsed once here ...
      collector.collect(elem1)
    }
  }
}
val out_large = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line) // ... and parsed again here
      collector.collect(elem2)
    }
  }
}

Since the doParsing function is quite expensive, I want to call it only once per line.

P.S. I would appreciate it if you could let me know other approaches to do this kind of thing in a simpler way.

Answer

Flink does not support multiple collectors. However, you can change the output of your parsing step by adding an additional field that indicates the output type:

val lines = env.readTextFile(...)
val intermediate = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect((0, elem1)) // tag 0 indicates "small"
      collector.collect((1, elem2)) // tag 1 indicates "large"
    }
  }
}

Next you consume the output intermediate twice and filter each branch on the first attribute. The first filter keeps the records tagged 0, the second those tagged 1 (you can also add a projection to get rid of the tag attribute).

               +---> filter("0") --->
               | 
intermediate --+
               | 
               +---> filter("1") --->
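
For illustration, here is a minimal sketch of that filter-and-project step with the Scala DataSet API. It assumes both parsed elements share one type (String is used as a stand-in), so the tagged intermediate has the single element type (Int, String); adapt it to whatever doParsing actually returns.

import org.apache.flink.api.scala._ // standard Flink Scala API import (implicit type information)

// `intermediate` is the tagged DataSet produced above; DataSet[(Int, String)]
// is an assumed element type for this sketch.
val out_small: DataSet[String] = intermediate.filter(_._1 == 0).map(_._2) // keep "small" records, drop the tag
val out_large: DataSet[String] = intermediate.filter(_._1 == 1).map(_._2) // keep "large" records, drop the tag

Because both filters branch off the same intermediate dataset, as in the diagram above, doParsing runs only once per line; only the cheap filter and projection are applied twice.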
