Spark mapWithState shuffles all data to one node

Problem Description

I am working on a Scala (2.11) / Spark (1.6.1) streaming project and am using mapWithState() to keep track of data seen in previous batches.

The state is split into 20 partitions, created with StateSpec.function(trackStateFunc _).numPartitions(20). I had hoped to distribute the state throughout the cluster, but it seems that each node holds the complete state and execution is always performed on exactly one node.
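
For reference, here is a minimal sketch of that setup. This is not the original job: the socket source, the key/value types, and the body of trackStateFunc are placeholders; only the StateSpec with 20 partitions comes from the question.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateSketch {
  // Hypothetical state function: keeps a running count per key.
  def trackStateFunc(key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
    val newSum = state.getOption.getOrElse(0L) + value.getOrElse(0)
    state.update(newSum)
    Some((key, newSum))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapWithState-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint") // mapWithState requires checkpointing

    // Placeholder source; the real job evidently reads from and writes to Kafka.
    val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

    // The spec from the question: state split into 20 partitions.
    val spec = StateSpec.function(trackStateFunc _).numPartitions(20)
    pairs.mapWithState(spec).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```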

Locality Level Summary: Node local: 50 is shown in the UI for each batch, and the complete batch is shuffle-read. Afterwards I write to Kafka, and the partitions are spread across the cluster again. I can't figure out why mapWithState() needs to run on a single node. Doesn't this defeat the purpose of partitioning the state if it is confined to one node instead of the whole cluster? Shouldn't it be possible to distribute the state by key?

Solution

I can't seem to find out why mapWithState needs to be run on a single node

It doesn't. By default, Spark uses a HashPartitioner to partition your keys among the different worker nodes in your cluster. If for some reason you're seeing all your data stored on a single node, check the distribution of your keys. If you're using a custom object as a key, make sure its hashCode method is implemented properly; this behavior can happen when something is wrong with the key distribution. If you'd like to test this, try using random numbers as your keys, then look at the Spark UI and see whether the behavior changes.
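
To illustrate both points, here is a hedged sketch. DeviceKey is a made-up key class, and the check simply shows how HashPartitioner maps a key's hashCode onto one of the 20 state partitions:

```scala
import org.apache.spark.HashPartitioner

// A plain class used as a key must override hashCode and equals consistently;
// a degenerate hashCode (e.g. a constant) would send every record to one
// partition. A case class would get correct implementations for free.
class DeviceKey(val deviceId: String) extends Serializable {
  override def hashCode: Int = deviceId.hashCode
  override def equals(other: Any): Boolean = other match {
    case that: DeviceKey => this.deviceId == that.deviceId
    case _               => false
  }
}

object KeyDistributionCheck {
  def main(args: Array[String]): Unit = {
    // HashPartitioner assigns a partition based on key.hashCode modulo
    // the partition count, so skewed hash codes mean skewed partitions.
    val partitioner = new HashPartitioner(20)
    Seq("sensor-1", "sensor-2", "sensor-3")
      .map(new DeviceKey(_))
      .foreach(k => println(s"${k.deviceId} -> partition ${partitioner.getPartition(k)}"))
  }
}
```

For the random-key test, one could map the stream to, say, (scala.util.Random.nextInt(10000), value) pairs before mapWithState and watch whether the skew in the UI disappears.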

I'm running mapWithState, and the incoming data is already partitioned by key, since I have a reduceByKey call before holding the state; and when looking at the Storage tab in the Spark UI, I can see the different RDDs being stored on different worker nodes in the cluster.
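
Continuing the first sketch (reusing pairs, trackStateFunc, and the 20-partition spec from above), that pipeline would look roughly like this:

```scala
// reduceByKey shuffles records by key into 20 partitions before the state
// update, so records for a given key consistently land on the same partition.
val reduced = pairs.reduceByKey((a, b) => a + b, numPartitions = 20)
val stateStream = reduced.mapWithState(
  StateSpec.function(trackStateFunc _).numPartitions(20))
```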
