Pyspark socket timeout exception after application running for a while


Problem description

I am using PySpark to estimate the parameters of a logistic regression model. I use Spark to compute the likelihood and gradients, and then use SciPy's minimize function for the optimization (L-BFGS-B).
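For reference, a minimal sketch of this pattern is shown below, using a toy logistic-regression objective; the data, the helper point_loss_and_grad, and all values are illustrative and not taken from the original program:

import numpy as np
from scipy.optimize import minimize
from pyspark import SparkContext

sc = SparkContext(appName="lbfgs-sketch")

# Toy (features, label) pairs spread over a few partitions.
data = sc.parallelize(
    [(np.array([1.0, x]), 1.0 if x > 0 else 0.0) for x in np.random.randn(1000)],
    8).cache()

def point_loss_and_grad(w, x, y):
    # Per-observation negative log-likelihood and its gradient.
    p = 1.0 / (1.0 + np.exp(-np.dot(w, x)))
    eps = 1e-12
    loss = -(y * np.log(p + eps) + (1.0 - y) * np.log(1.0 - p + eps))
    return loss, (p - y) * x

def objective(w):
    # Spark sums the per-point losses and gradients; SciPy only sees one callable.
    bw = sc.broadcast(w)
    loss, grad = data.map(
        lambda xy: point_loss_and_grad(bw.value, xy[0], xy[1])
    ).reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    return loss, grad

res = minimize(objective, np.zeros(2), jac=True, method="L-BFGS-B")
print(res.x)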

I use yarn-client mode to run my application. The application starts and runs without any problem, but after a while it reports the following error:

Traceback (most recent call last):
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
    options={'disp': False})
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
    callback=callback, **options)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
    f, g = func_and_grad(x)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
    f = fun(x, *args)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
    return function(*(wrapper_args + args))
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
    yield self._read_with_length(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
    data = self._sock.recv(left)
socket.timeout: timed out

I also see a Python broken pipe error when I set the Spark log level to "ALL".

I am using Spark 1.6.2 and Java 1.8.0_91. Any idea what's going on?

I found this is related to the optimization routine I used in my program.

What I am doing is estimating a statistical model by maximum likelihood using the EM algorithm (an iterative algorithm). During each iteration I need to update the parameters by solving a minimization problem. Spark is responsible for calculating the likelihood and gradient, which are then passed to SciPy's minimize routine, where I use the L-BFGS-B method. It seems that something in this routine crashes my Spark job, but I have no idea which part of the routine is responsible for the issue.
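A hedged sketch of that EM structure follows (the actual Q-function, E-step, and responsibilities live in the poster's Train2.py; every name and default here is illustrative):

import numpy as np
from scipy.optimize import minimize

def em_loop(data_rdd, params0, e_step, neg_q_and_grad, em_tol=1e-5, max_iter=100):
    # Outer EM iterations; each M-step hands a Spark-backed objective to L-BFGS-B.
    params = params0
    prev_obj = np.inf
    for _ in range(max_iter):
        # E-step: attach responsibilities/weights to the data (a Spark transformation).
        weighted = e_step(data_rdd, params).cache()
        # M-step: minimize the expected negative complete-data log-likelihood.
        res = minimize(lambda p: neg_q_and_grad(weighted, p), params,
                       jac=True, method="L-BFGS-B", options={'disp': False})
        params, obj = res.x, res.fun
        weighted.unpersist()
        if abs(prev_obj - obj) < em_tol:
            break
        prev_obj = obj
    return params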

Another observation is that, using the same sample and the same program, I changed the number of partitions. When the number of partitions is small my program finishes without any problem, but when the number of partitions becomes large, the program starts to crash.
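For context, the partition count is usually controlled like this (illustrative values, assuming an existing SparkContext sc; the post does not say which sizes worked or failed):

rdd = sc.textFile("hdfs:///path/to/data", minPartitions=16)  # hint at load time
print(rdd.getNumPartitions())

fewer = rdd.coalesce(8)        # shrink the partition count without a shuffle
more = rdd.repartition(256)    # grow it with a full shuffle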

Answer

I had a similar problem. I had an iteration, and sometimes execution took so long that it timed out. Increasing spark.executor.heartbeatInterval seemed to solve the problem. I increased it to 3600s to make sure I don't run into timeouts again, and everything has been working fine since then.

From http://spark.apache.org/docs/latest/configuration.html:

spark.executor.heartbeatInterval (default: 10s): Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.
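As a sketch, one way to raise it from PySpark is shown below; the same setting can also be passed with --conf on spark-submit or put in spark-defaults.conf, and the app name is illustrative:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("mixed-logistic-em")                     # illustrative name
        .set("spark.executor.heartbeatInterval", "3600s"))   # default is 10s
sc = SparkContext(conf=conf)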
