Can I use a custom partitioner with group by?


Question


Let's say that I know my dataset is unbalanced and I know the distribution of the keys. I'd like to leverage this to write a custom partitioner that gets the most out of the operator instances.


I know about DataStream#partitionCustom. However, if my stream is keyed, will it still work properly? My job would look something like:

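```java
// partitionCustom() returns a DataStream; the element type here is illustrative
DataStream<Tuple2<Integer, Long>> afterCustomPartition =
    keyedStream.partitionCustom(new MyPartitioner(), new MyPartitionKeySelector());

DataStreamUtils.reinterpretAsKeyedStream(afterCustomPartition, new MyGroupByKeySelector<>())
    .sum(1);  // sum() needs the field to aggregate
```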

What I want to achieve:

  • Having the stream keyed by some key, so that the reduce function is only ever called with elements of that key.
  • The group by splitting the work across nodes according to some custom partitioning.
  • The custom partitioning returning a number based on the number of parallel operator instances (which will be fixed and not subject to rescaling).
  • The custom partitioning returning values different from the keyBy key. However, keyBy(x) = keyBy(y) => partition(x) = partition(y).
  • Pre-aggregating before partitioning to minimize network traffic.

Example use case:

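  • Dataset: [(0,A),(0,B),(0,C),(1,D),(2,E)]
  • Number of parallel operator instances: 2
  • Group-by function: returns the first element of the pair
  • Partition function: returns 0 for key 0 and 1 for keys 1 and 2. Advantage: it handles the data skew that arises if keys 0 and 1 are sent to the same operator instance, in which case one operator instance would receive 80% of the dataset.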
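To make the skew argument concrete, here is a minimal plain-Java sketch of the use case above, written without a Flink dependency so it runs on its own. `partition(key, numPartitions)` is a hypothetical stand-in for `MyPartitioner`, shaped like the `partition(K key, int numPartitions)` method of Flink's `Partitioner` interface; `SkewExample`, `DATA` and `load` are illustrative names. It counts how many records each of the two instances receives under the custom function and under an unlucky placement that puts keys 0 and 1 together.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.IntUnaryOperator;

class SkewExample {
    // Dataset from the question; the group-by key is the first element of each pair.
    static final List<Map.Entry<Integer, String>> DATA = List.of(
        Map.entry(0, "A"), Map.entry(0, "B"), Map.entry(0, "C"),
        Map.entry(1, "D"), Map.entry(2, "E"));

    // Hypothetical stand-in for MyPartitioner, shaped like Flink's
    // Partitioner#partition(K key, int numPartitions) but without the Flink dependency.
    static int partition(int key, int numPartitions) {
        // Heavy key 0 gets instance 0 to itself; all other keys share instance 1.
        return key == 0 ? 0 : 1 % numPartitions;
    }

    // Counts how many records each instance receives under a key -> instance mapping.
    static int[] load(List<Map.Entry<Integer, String>> data, int numPartitions,
                      IntUnaryOperator assign) {
        int[] counts = new int[numPartitions];
        for (Map.Entry<Integer, String> entry : data) {
            counts[assign.applyAsInt(entry.getKey())]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int p = 2;  // number of parallel operator instances
        int[] custom = load(DATA, p, k -> partition(k, p));
        // Unlucky placement: keys 0 and 1 share instance 0, key 2 goes to instance 1.
        int[] unlucky = load(DATA, p, k -> k <= 1 ? 0 : 1);
        System.out.println("custom:  " + Arrays.toString(custom));   // [3, 2]
        System.out.println("unlucky: " + Arrays.toString(unlucky));  // [4, 1], i.e. 80% on one instance
    }
}
```

Because the partition is computed from the group-by key alone, records with the same key always reach the same instance, which satisfies the keyBy(x) = keyBy(y) => partition(x) = partition(y) requirement.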

Answer


That is unfortunately not possible. DataStreamUtils.reinterpretAsKeyedStream() requires that the data be partitioned exactly as keyBy() would have partitioned it.


The reason for this limitation is key groups and the way keys are mapped to them. A key group is Flink's unit for distributing keyed state. The number of key groups determines the maximum parallelism of an operator and is configured with setMaxParallelism(). Keys are assigned to key groups with an internal hash function, and each parallel operator instance is responsible for a range of key groups. If you change how keys are partitioned, keys belonging to the same key group end up on multiple machines, which does not work.
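The key-group argument can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink code: Flink runs `key.hashCode()` through an internal murmur hash before taking the modulo, whereas `keyGroup` below uses `Math.floorMod(key.hashCode(), maxParallelism)` as a stand-in, so the concrete key-group numbers differ from Flink's. The key-group-to-operator step uses the contiguous-range formula `keyGroup * parallelism / maxParallelism`. `KeyGroupModel`, its methods, and the value 128 are hypothetical choices for illustration.

```java
class KeyGroupModel {
    // ASSUMPTION: stand-in hash. Flink applies an internal murmur hash to
    // key.hashCode() before taking the modulo, so real key-group numbers differ.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each operator instance owns a contiguous range of key groups.
    static int operatorForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int operatorForKey(Object key, int maxParallelism, int parallelism) {
        return operatorForKeyGroup(keyGroup(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;  // number of key groups, as set via setMaxParallelism()
        int parallelism = 2;
        int[] customPartition = {0, 1, 1};  // hypothetical: key 0 -> 0, keys 1 and 2 -> 1
        for (int key = 0; key < customPartition.length; key++) {
            int owner = operatorForKey(key, maxParallelism, parallelism);
            String verdict = owner == customPartition[key]
                ? "ok" : "mismatch: state for this key group lives on instance " + owner;
            System.out.printf("key %d -> key group %d, custom partition %d: %s%n",
                key, keyGroup(key, maxParallelism), customPartition[key], verdict);
        }
    }
}
```

Whenever the custom partition differs from the owner computed this way, a record arrives at an instance that does not hold the state for its key group, which is why the stream cannot simply be reinterpreted as keyed.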


To change how keys are assigned to machines, you would need to change how keys are assigned to key groups. However, there is no public or accessible interface for doing that. Therefore, custom key distributions are not supported in Flink 1.6.
