Reuse of a Stream is a copy of stream or not


Question



For example, there is a keyed stream:

val keyedStream: KeyedStream[event, Key] = env
    .addSource(...)
    .keyBy(...)

// several transformations on the same stream
keyedStream.map(....)
keyedStream.window(....)
keyedStream.split(....)
keyedStream...(....)

I think this is reuse of the same stream in Flink. What I found is that when I reused it, the content of the stream was not affected by the other transformations, so I think each use works on a copy of the same stream.

  • But I don't know whether this is right.

  • If so, does this use a lot of resources (which resources?) to keep the copies?

Solution

A DataStream (or KeyedStream) on which multiple operators are applied replicates all outgoing messages. For instance, if you have a program such as:

val keyedStream: KeyedStream[event, Key] = env
  .addSource(...)
  .keyBy(...)

val stream1: DataStream[event] = keyedStream.map(new MapFunc1)
val stream2: DataStream[event] = keyedStream.map(new MapFunc2)

The program is executed as

           /-hash-> Map(MapFunc1) -> ...
 Source >-<
           \-hash-> Map(MapFunc2) -> ...

The source replicates each record and sends it to both downstream operators (MapFunc1 and MapFunc2). The type of the operators (in our example Map) does not matter.
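To see why one branch's transformations cannot affect the other, note that every downstream operator receives its own copy of each record. A minimal plain-Scala analogy (ordinary immutable collections, not the Flink API) of this fan-out:

```scala
// Plain Scala analogy, not Flink: "records" plays the role of the
// source's output, and each map models one downstream operator.
val records = List(1, 2, 3)

// Each branch transforms its own copy of every record.
val branch1 = records.map(_ * 10) // like keyedStream.map(new MapFunc1)
val branch2 = records.map(_ + 1)  // like keyedStream.map(new MapFunc2)

// records is unchanged and the branches are independent:
// records == List(1, 2, 3)
// branch1 == List(10, 20, 30)
// branch2 == List(2, 3, 4)
```

The same holds in Flink: each operator chain consumes its own copy of the records, so no transformation mutates what a sibling operator sees.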

The cost of this is sending each record twice over the network. If all receiving operators have the same parallelism, this could be optimized by sending each record once and duplicating it at the receiving task manager, but this is currently not done.

You can manually optimize the program by adding a single receiving operator (e.g., an identity Map operator) and another keyBy, from which you fork to the multiple receivers. This will not result in a network shuffle, because all records are already local. All operators must have the same parallelism, though.
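A sketch of that rewrite, using the same `...` placeholders as the snippets above (the identity map and the repeated key selector are assumptions about your job, not part of the original answer):

```scala
val keyedStream: KeyedStream[event, Key] = env
  .addSource(...)
  .keyBy(...)

// Single receiving operator: an identity Map, then keyBy again.
// Assuming the key is unchanged, each record already sits on the
// right task, so this second keyBy causes no network shuffle.
val forwarded: KeyedStream[event, Key] = keyedStream
  .map(e => e) // identity Map operator
  .keyBy(...)

// Fork locally to the multiple receivers; every operator here must
// run with the same parallelism for the records to stay local.
val stream1: DataStream[event] = forwarded.map(new MapFunc1)
val stream2: DataStream[event] = forwarded.map(new MapFunc2)
```

With this shape, the duplication happens after the identity map inside the same task manager, instead of each record being shipped twice from the source over the network.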
