Can I use a custom partitioner with group by?


Question

Let's say that I know that my dataset is unbalanced and I know the distribution of the keys. I'd like to leverage this to write a custom partitioner to get the most out of the operator instances.

I know about DataStream#partitionCustom. However, if my stream is keyed, will it still work properly? My job would look something like:

DataStream afterCustomPartition = keyedStream.partitionCustom(new MyPartitioner(), new MyPartitionKeySelector());

DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>()).sum()

What I'm trying to achieve is:

  • Having a stream keyBy according to some key so that the reduce function will only be called with elements from that key.
  • The group by splitting the work across nodes based on some custom partitioning.
  • The custom partitioning returning a number based on the number of parallel operator instances (which will be fixed and not subject to rescaling).
  • The custom partitioning returning different values from the keyBy. However, keyBy(x) = keyBy(y) => partition(x) = partition(y).
  • Having pre-aggregation to minimize network traffic before partitioning.

Example use case:

  • Dataset: [(0, A), (0, B), (0, C), (1, D), (2, E)]
  • Number of parallel operator instances: 2
  • Group-by function: returns the first element of the pair
  • Partition function: returns 0 for key 0, and 1 for keys 1 and 2. Benefit: it handles the data skew where keyBy might send keys 0 and 1 to the same operator instance, which would mean that one operator instance receives 80% of the dataset (see the sketch below).
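
For illustration, here is a minimal sketch of what the partitioner and key selector from the question's pseudocode could look like for this example. The class names MyPartitioner and MyGroupByKeySelector come from the question; the Tuple2<Integer, String> element type is an assumption based on the dataset above.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Partition function from the example: key 0 goes to instance 0,
// keys 1 and 2 go to instance 1 (modulo guards against fewer instances).
public class MyPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        return (key == 0 ? 0 : 1) % numPartitions;
    }
}

// Group-by function from the example: the first element of the pair.
public class MyGroupByKeySelector implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

As the answer below explains, wiring these into partitionCustom() followed by reinterpretAsKeyedStream() is exactly the step that does not work.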

Answer

That is unfortunately not possible. DataStreamUtils.reinterpretAsKeyedStream() requires that the data is partitioned exactly as if you had called keyBy().

The reason for this limitation is key groups and how keys are mapped to key groups. A key group is Flink's unit for distributing keyed state. The number of key groups determines the maximum parallelism of an operator and is configured with setMaxParallelism(). Keys are assigned to key groups with an internal hash function. If you change the partitioning of the keys, keys belonging to the same key group end up distributed across multiple machines, which will not work.
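
To illustrate the contract, this sketch shows roughly how a key ends up at an operator instance. It relies on Flink's internal KeyGroupRangeAssignment class (flink-runtime), which is an implementation detail used here for illustration only, not a public API.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

int maxParallelism = 128; // the value configured with setMaxParallelism()
int parallelism = 2;      // number of parallel operator instances
Integer key = 0;

// Key -> key group: an internal (murmur) hash of key.hashCode(), modulo maxParallelism.
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

// Key group -> operator index: key groups are assigned to instances in contiguous ranges.
int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
        maxParallelism, parallelism, keyGroup);

// reinterpretAsKeyedStream() assumes every element with this key already lives on
// operatorIndex; a custom partitioner that routes it elsewhere violates that assumption.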

In order to tweak the assignment of keys to machines, you would need to change the assignment of keys to key groups. However, there is no public or accessible interface to do that. Therefore, custom key distributions are not supported in Flink 1.6.
