Pyspark socket timeout exception after application running for a while


Problem Description

I am using pyspark to estimate the parameters of a logistic regression model. I use Spark to compute the likelihood and gradients, and then use scipy's minimize function (L-BFGS-B) for the optimization.
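A minimal sketch of this kind of setup, assuming a hypothetical objective function neg_log_likelihood_and_grad and synthetic data (the question's actual model code is not shown):

import numpy as np
from scipy.optimize import minimize
from pyspark import SparkContext

sc = SparkContext(appName="logistic-mle-sketch")

# Hypothetical RDD of (features, label) pairs; the real data layout is not shown in the question.
data = sc.parallelize([(np.random.randn(5), np.random.randint(2)) for _ in range(1000)])
data.cache()

def neg_log_likelihood_and_grad(w):
    """Compute the negative log-likelihood and its gradient on the cluster."""
    w_b = sc.broadcast(w)

    def partial(point):
        x, y = point
        z = float(np.dot(w_b.value, x))
        p = np.clip(1.0 / (1.0 + np.exp(-z)), 1e-12, 1.0 - 1e-12)
        loss = -(y * np.log(p) + (1 - y) * np.log(1.0 - p))
        grad = (p - y) * x
        return loss, grad

    # Sum the per-point losses and gradients across all partitions.
    loss, grad = data.map(partial).reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    return loss, grad

w0 = np.zeros(5)
# jac=True tells scipy that the objective returns (value, gradient) together.
res = minimize(neg_log_likelihood_and_grad, w0, jac=True,
               method='L-BFGS-B', options={'disp': False})
print(res.x)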

I run my application in yarn-client mode. The application starts without any problem, but after a while it reports the following error:

Traceback (most recent call last):
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/simulation/20160716-1626/spark_1m_data.py", line 115, in <module>
    res = trainEM2(distData, params0, verbose=True, em_tol=1e-5, opt_method='L-BFGS-B')
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 166, in trainEM
    options={'disp': False})
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/_minimize.py", line 447, in minimize
    callback=callback, **options)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 330, in _minimize_lbfgsb
    f, g = func_and_grad(x)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/lbfgsb.py", line 278, in func_and_grad
    f = fun(x, *args)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/site-packages/scipy/optimize/optimize.py", line 289, in function_wrapper
    return function(*(wrapper_args + args))
  File "/home/panc/research/MixedLogistic/software/mixedlogistic/mixedlogistic_spark/Train2.py", line 146, in fun_observedQj
    return dataAndWeightsj_old.map(lambda _: calObservedQj(_[0], _[1], vparamsj, params0)).sum()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/apps/hathi/spark-1.6.2/python/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 139, in load_stream
16/07/16 20:59:10 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
    yield self._read_with_length(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/apps/hathi/spark-1.6.2/python/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/apps/rhel6/Anaconda-2.0.1/lib/python2.7/socket.py", line 384, in read
    data = self._sock.recv(left)
socket.timeout: timed out

I also found a Python broken pipe error when I set the Spark log level to "ALL".

I am using Spark 1.6.2 and Java 1.8.0_91. Any idea what's going on?

I found that this is related to the optimization routine used in my program.

What I am doing is estimating a statistical model by maximum likelihood using the EM algorithm (an iterative algorithm). During each iteration I need to update the parameters by solving a minimization problem. Spark is responsible for computing the likelihood and gradient, which are then passed to scipy's minimize routine, where I use the L-BFGS-B method. It seems that something in this routine crashes my Spark job, but I have no idea which part of the routine is responsible for the issue.

Another observation is that, using the same sample and the same program, I changed the number of partitions. When the number of partitions is small, the program finishes without any problem; when the number of partitions becomes large, it starts to crash.
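For reference, the partition count in a setup like this is usually fixed when the RDD is created or adjusted afterwards; a small illustrative sketch (the variable names here are placeholders, not the question's actual code):

from pyspark import SparkContext

sc = SparkContext(appName="partition-sketch")
samples = range(1000000)  # placeholder for the real training sample

# The partition count can be set when the RDD is created ...
distData = sc.parallelize(samples, numSlices=32)
# ... or changed later on an existing RDD.
distData = distData.repartition(32)
print(distData.getNumPartitions())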

Recommended Answer

I had a similar problem. I had an iteration, and sometimes execution took so long that it timed out. Increasing spark.executor.heartbeatInterval seemed to solve the problem. I increased it to 3600s to make sure I don't run into timeouts again, and everything has been working fine since then.

From http://spark.apache.org/docs/latest/configuration.html :

spark.executor.heartbeatInterval (default: 10s): Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks.
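For example, the setting can be passed to spark-submit with --conf spark.executor.heartbeatInterval=3600s, or set in the application itself. A sketch of the programmatic form, under the assumption that the network timeout is also raised to stay above the heartbeat interval (the 7200s value and the app name are illustrative):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("mixedlogistic")
        .setMaster("yarn-client")
        # Give long-running iterations more headroom before the driver
        # treats an executor as dead.
        .set("spark.executor.heartbeatInterval", "3600s")
        # Assumption: keep the network timeout larger than the heartbeat interval.
        .set("spark.network.timeout", "7200s"))
sc = SparkContext(conf=conf)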

