Running rpy2 in parallel using multiprocessing raises weird exception that cannot be caught

Problem Description

So this is a problem that I have not been able to solve, and I do not know of a good way to make an MCVE out of it. Essentially, it has been briefly discussed here, but as the comments show, there was some disagreement, and the final verdict is still out. Hence I am posting a similar question again, hoping to get a better answer.

I have sensor data from a couple of thousand sensors, that I get every minute. My interest lies in forecasting the data. For this I am using the ARIMA family of forecasting models. Long story short, after discussion with the rest of my research group, we decided to use the Arima function available in the R package forecast, instead of the statsmodels implementation of the same.

I have data from a few thousand sensors, for which I would like to analyse at least a whole week's worth of data (to begin with). Since a week has 7 days, I have 7 times the number of sensors' worth of data: essentially some 14k sensor-day combinations. Finding the best ARIMA order (which minimizes BIC) and forecasting the next day's data takes about 1 minute for each sensor-day combination, which means upwards of 11 days just to process one week of data on a single core!

This is obviously a waste, when I have 15 more cores just idling away the whole time. So, obviously, this is a problem for parallel processing. Note that each sensor-day combination does not influence any other sensor-day combination. Also, the rest of my code is fairly well profiled, and optimized.

The issue is that I get this weird error that I cannot catch anywhere. Here is the error reproduced:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/home/kartik/miniconda3/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/pool.py", line 429, in _handle_results
    task = get()
  File "/home/kartik/miniconda3/lib/python3.5/multiprocessing/connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
  File "/home/kartik/miniconda3/lib/python3.5/site-packages/rpy2/robjects/robject.py", line 55, in _reduce_robjectmixin
    rinterface_level=rinterface_factory(rdumps, rtypeof)
ValueError: Mismatch between the serialized object and the expected R type (expected 6 but got 24)

Here are a few characteristics of this error that I have discovered:

  1. It is raised in the rpy2 package
  2. It has something to do with Thread 3. Since Python is zero indexed, I am guessing this is the fourth thread. Therefore, 4x6 = 24, which adds up to the numbers shown in the final error statement
  3. rpy2 is being used in only one place in my code where it might have to recode returned values into Python types. Protecting that line in try: ... except: ... does not catch that exception
  4. The exception is not raised when I ditch the multiprocessing and call the function within a loop
  5. The exception does not crash the program, just suspends it forever (till I Ctrl+C it into terminating)
  6. Nothing that I have tried till now has had any effect in resolving the error

Things Tried

I have tried everything from extreme procedural coding, with functions to deal with the minimal case (that is, only one function to be called in parallel), to extreme encapsulation, where the executable block under if __name__ == '__main__': calls a function which reads in the data, does the necessary grouping, then passes the groups to another function, which imports multiprocessing and calls another function in parallel, which imports the processing module that imports rpy2, and passes the data to the Arima function in R.

Basically, it doesn't matter if rpy2 is called and initialized deep inside function nests, such that it has no idea another instance might be initialized, or if it is called and initialized once, globally, the error is raised if multiprocessing is involved.

Here is an attempt to present at least some basic pseudo code such that the error might be reproduced.

import numpy as np
import pandas as pd

def arima_select(y, order):
    from rpy2 import robjects as ro
    from rpy2.robjects.packages import importr
    from rpy2.robjects import pandas2ri
    pandas2ri.activate()
    forecast = importr('forecast')

    res = forecast.Arima(y, order=ro.FloatVector(order))
    return res

def arima_wrapper(data):
    data = data[['tstamp', 'val']]
    data.set_index('tstamp', inplace=True)
    return arima_select(data, (1,1,1))

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def wrapper():
    df = pd.read_csv('file.csv', parse_dates=[1], infer_datetime_format=True)
    df['day'] = df['tstamp'].dt.day
    res = applyParallel(df.groupby(['sensor', 'day']), arima_wrapper)
    print(res)

Obviously, the above code can be encapsulated further, but I think it should reproduce the error quite accurately.

Here is the output of print(data.head(6)) when placed immediately below data.set_index('tstamp', inplace=True) in arima_wrapper from the pseudo code above:

Or alternatively, data for a sensor, for a whole week can be generated simply with:

def data_gen(start_day):
    r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                periods=24*60, freq='T'),
                  name='tstamp')
    d = pd.Series(np.random.randint(10, 80, 1440), name='val')
    s = pd.Series(['sensor1']*1440, name='sensor')
    return pd.concat([s, r, d], axis=1)
df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)
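As a quick sanity check (not part of the original post), one can confirm that this generated frame yields the sensor-day grouping that applyParallel is fed: one sensor over 7 days should produce 7 groups of 1440 one-minute samples each.

```python
# Sanity check on the generated data: one sensor over 7 days should
# produce 7 sensor-day groups of 1440 one-minute samples each.
import numpy as np
import pandas as pd

def data_gen(start_day):
    r = pd.Series(pd.date_range('2016-09-{}'.format(start_day),
                                periods=24*60, freq='T'),
                  name='tstamp')
    d = pd.Series(np.random.randint(10, 80, 1440), name='val')
    s = pd.Series(['sensor1']*1440, name='sensor')
    return pd.concat([s, r, d], axis=1)

df = pd.concat([data_gen(day) for day in range(1, 8)], ignore_index=True)
df['day'] = df['tstamp'].dt.day
groups = df.groupby(['sensor', 'day'])
print(groups.ngroups)         # 7
print(groups.size().iloc[0])  # 1440
```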

Observations and Questions

The first observation is that this error is only raised when multiprocessing is involved, not when the function (arima_wrapper) is called in a loop. Therefore, it must be associated somehow with multiprocessing issues. R is not very multiprocess friendly, but when written in the way shown in the pseudo code, each instance of R should not know about the existence of the other instances.

The way the pseudo code is structured, there must be an initialization of rpy2 for each call inside the multiple subprocesses spawned by multiprocessing. If that were true, each instance of rpy2 should have spawned its own instance of R, which should just execute one function, and terminate. That would not raise any errors, because it would be similar to the single threaded operation. Is my understanding here accurate, or am I completely or partially missing the point?

If all instances of rpy2 somehow shared one instance of R, then I might reasonably expect the error. So which is true: is R shared among all instances of rpy2, or is there an instance of R for each instance of rpy2?

Since SO hates question threads with multiple questions in them, I will prioritize my questions such that partial answers will be accepted. Here is my priority list:

  1. How might this issue be overcome? A working code example that does not raise the issue will be accepted as answer even if it does not answer any other question, provided no other answer does better, or was posted earlier.
  2. Is my understanding of Python imports accurate, or am I missing the point about multiple instances of R? If I am wrong, how should I edit the import statements so that a new instance is created within each subprocess? Answers to this question are likely to point me towards a probable solution, and will be accepted, provided no answer does better, or was posted earlier.
  3. Is R shared among all instances of rpy2 or is there an instance of R for each instance of rpy2? Answers to this question will be accepted only if they lead to a resolution of the problem.

Answer

The following changes to the arima_select function in the pseudo code presented in the question work:

import numpy as np
import pandas as pd
from rpy2 import rinterface as ri

ri.initr()

def arima_select(y, order):

    def rimport(packname):
        as_environment = ri.baseenv['as.environment']
        require = ri.baseenv['require']
        require(ri.StrSexpVector([packname]),
                quiet = ri.BoolSexpVector((True, )))
        packname = ri.StrSexpVector(['package:' + str(packname)])
        pack_env = as_environment(packname)
        return pack_env

    frcst = rimport("forecast")
    args = (('y', ri.FloatSexpVector(y)),
            ('order', ri.FloatSexpVector(order)),
            # `const` was undefined in the snippet as posted; "TRUE" is
            # assumed here for Arima's include.constant argument
            ('include.constant', ri.StrSexpVector(('TRUE', ))))
    return frcst['Arima'].rcall(args, ri.globalenv)

Keeping the rest of the pseudo code the same. Note that I have since optimized the code further, and it does not require all the functions presented in the question. Basically, the following is necessary and sufficient:

import numpy as np
import pandas as pd
from rpy2 import rinterface as ri

ri.initr()

def arima(y, order=(1,1,1)):
    # This is the same as arima_select above, just renamed to arima
    ...

def worker_init():
    # referenced below but not defined in the answer as posted; presumably
    # it initializes the embedded R inside each worker process
    ri.initr()

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count(), worker_init) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def main():
    # Create your df in your favorite way:
    def data_gen(start_day):
        r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                    periods=24*60, freq='T'),
                      name='tstamp')
        d = pd.Series(np.random.randint(10, 80, 1440), name='val')
        s = pd.Series(['sensor1']*1440, name='sensor')
        return pd.concat([s, r, d], axis=1)
    df = pd.concat([data_gen(day) for day in range(1,8)], ignore_index=True)

    applyParallel(df.groupby(['sensor', pd.Grouper(key='tstamp', freq='D')]),
                  arima) # Note one may use partial from functools to pass order to arima

if __name__ == '__main__':
    main()

Note that I also do not call arima directly from applyParallel since my goal is to find the best model for the given series (for a sensor and day). I use a function arima_wrapper to iterate through the order combinations, and call arima at each iteration.
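The order search that arima_wrapper performs is not shown in the answer; a hypothetical sketch of its shape (the fit_score callable stands in for fitting with arima through rpy2 and reading the model's BIC, which the original code would do):

```python
# Hypothetical sketch of arima_wrapper's order search: try every (p, d, q)
# candidate, score each fit, and keep the best. fit_score stands in for
# fitting arima(y, order) through rpy2 and extracting the model's BIC.
from itertools import product

def best_order(y, fit_score, p_max=3, d_max=2, q_max=3):
    best_score, best = float('inf'), None
    for order in product(range(p_max + 1), range(d_max + 1), range(q_max + 1)):
        score = fit_score(y, order)
        if score < best_score:
            best_score, best = score, order
    return best

# toy scorer that is minimized at (1, 1, 1), just to exercise the search
toy = lambda y, order: sum(abs(c - 1) for c in order)
print(best_order([], toy))   # (1, 1, 1)
```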
