Spark streaming data pipelines on Dataproc experiencing sudden frequent socket timeouts

Problem description

I am using Spark Streaming on Google Cloud Dataproc to run a framework (written in Python) consisting of several continuous pipelines, each representing a single job on Dataproc, which basically reads from Kafka queues and writes the transformed output to Bigtable. All pipelines combined handle several gigabytes of data per day across 2 clusters, one with 3 worker nodes and one with 4.

Running this Spark Streaming framework on top of Dataproc had been fairly stable until the beginning of May (the 3rd of May, to be precise): since then we have been experiencing frequent socket timeout exceptions which terminate our pipelines. It doesn't seem to be related to the load on the cluster, as that has not increased significantly. It also happens quite randomly throughout the day, and I have checked for possibly related code changes but could not find any. Moreover, this only seems to occur on the cluster with 4 worker nodes, while the pipelines on the cluster with 3 nodes are very similar and experience no timeouts at all. I have already recreated the cluster twice, but the issue remains and affects all pipelines running on this Dataproc cluster. The cluster with 3 nodes is an n1-standard-4 machine type, while the troublesome cluster with 4 nodes is an n1-standard-8 machine type; other than that, their configurations are identical.

Example output of a pipeline job execution when the problem occurs and the job terminates:

java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
16/05/23 14:45:45 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1464014740000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/tmp/b85990ba-e152-4d5b-8977-fb38915e78c4/transformfwpythonfiles.zip/transformationsframework/StreamManager.py", line 138, in process_kafka_rdd
    .foreach(lambda *args: None)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 747, in foreach
    self.mapPartitions(processPartition).count()  # Force evaluation
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
timeout: timed out

The start of the stack trace is in our StreamManager module, in the method process_kafka_rdd: it processes a single discrete RDD within the direct stream of Kafka messages. Our integration of Kafka with Spark Streaming is based on the "direct approach" described at http://spark.apache.org/docs/latest/streaming-kafka-integration.html
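
For reference, a minimal sketch of this direct approach in PySpark (Spark 1.x API) looks like the following; the broker address, topic name and batch interval are placeholders rather than our actual settings:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka-direct-stream-sketch")
ssc = StreamingContext(sc, 10)  # 10-second batch interval (placeholder)

# The direct approach reads offsets straight from the Kafka brokers,
# without a receiver, producing one RDD per batch interval.
stream = KafkaUtils.createDirectStream(
    ssc,
    ["some_topic"],                             # placeholder topic
    {"metadata.broker.list": "broker-1:9092"})  # placeholder broker list

# Each batch RDD is handed to a handler, comparable to process_kafka_rdd above.
stream.foreachRDD(lambda rdd: rdd.foreach(lambda record: None))

ssc.start()
ssc.awaitTermination()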

Recommended answer

My experience with Spark and socket errors is that some executor has suddenly died, and another executor that was communicating with it at the time raises the socket error.

In my experience, the cause of unexpected executor death is running out of some resource, usually a shortage of memory.

(It's important to tune the amount of memory executors can use. The defaults are typically way too low. But I suspect you are already aware of this.)
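
As a rough sketch, the executor memory can be raised when the SparkContext is built (or equivalently via spark-submit --conf or Dataproc job properties); the values below are purely illustrative and need to be tuned to your workload:

from pyspark import SparkConf, SparkContext

# Illustrative values only -- tune these to the workload and machine type.
conf = (SparkConf()
        .setAppName("streaming-pipeline")
        .set("spark.executor.memory", "4g")                  # heap available to each executor
        .set("spark.yarn.executor.memoryOverhead", "1024"))  # off-heap headroom in MB per YARN container
sc = SparkContext(conf=conf)

If YARN is killing containers for exceeding their memory limit, it is often the overhead setting rather than the heap that needs to grow.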

I assume Spark is running on top of YARN? Unfortunately, in my experience Spark does a poor job of reporting the cause of the problem when it occurs down in the guts of YARN, so one has to dig into the YARN logs to figure out what actually caused the sudden executor death. The executors each run in a YARN "container"; somewhere in the YARN logs there should be a record of a container falling over.
