Can I connect an external (R) process to each pyspark worker during setup


Problem description


I want to have each Python worker start an R shell using rpy2. Can I do this during some sort of setup phase, similar to how I assume this would happen when you import a Python module to be used for later executor tasks? For example:

import numpy as np

df.mapPartitions(lambda x: np.zeros(x))

In my case I want to instead start an R shell on each executor and import R libraries, which would look something like this:

import rpy2.robjects as robjects
from  rpy2.robjects.packages import importr
rlibrary = importr('testrlibrary')

df.mapPartitions(lambda x: rlibrary.rfunc(x))

But I don't want this to occur inside the call to mapPartitions, because then it would happen at the task level, as opposed to once per executor core. That approach works and looks more like the example below, but it is not useful for me.

def model(partition):
    import rpy2.robjects as robjects
    from  rpy2.robjects.packages import importr
    rlibrary = importr('testrlibrary')
    return rlibrary.rfunc(partition)

df.mapPartitions(model)

Solution

Something like this should work just fine:

import rpy2.robjects as robjects
from  rpy2.robjects.packages import importr

def length_(s):
    stringi = importr("stringi")  
    return stringi.stri_length(s)[0]

sc.parallelize(["foo", "bar", "foobar"]).map(length_)

The R object, which represents the R interpreter (https://bitbucket.org/rpy2/rpy2/src/fe7b154143bcc0316a036f2fef85cea90b2963ec/rpy/robjects/__init__.py?at=default&fileviewer=file-view-default#__init__.py-302), is a singleton (https://bitbucket.org/rpy2/rpy2/src/fe7b154143bcc0316a036f2fef85cea90b2963ec/rpy/robjects/__init__.py?at=default&fileviewer=file-view-default#__init__.py-347), so it will be initialized only once, and R does not re-import libraries that are already attached. There is some overhead from calling require multiple times, but it should be negligible compared to the cost of passing your data to and from R.
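
As a quick illustration of that single embedded session (a minimal sketch, assuming only that rpy2 is installed; nothing here is Spark-specific), state assigned through the interpreter in one call is still visible in the next, because both calls go through the same R process:

import rpy2.robjects as robjects

# There is one embedded R session per Python process: an assignment made
# through robjects.r in one call is still visible in the next call.
robjects.r("x <- 42")
print(robjects.r("x")[0])  # 42.0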

If you want something more sophisticated, you can create your own singleton module or use the Borg pattern to handle the imports, but it is probably overkill.
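
For completeness, here is a minimal sketch of the singleton-module idea. The module name r_setup.py and its wrapper function are hypothetical, and it assumes the stringi R package is installed on every worker; the point is that a module stays cached in sys.modules after its first import, so the importr call runs at most once per Python worker process:

# r_setup.py -- hypothetical helper module, shipped to the executors with
# sc.addPyFile("r_setup.py"); module-level code runs once per Python process.
from rpy2.robjects.packages import importr

# Attach the R package when the module is first imported on a worker.
stringi = importr("stringi")

def stri_length(s):
    # Thin wrapper so tasks only need to import this module.
    return stringi.stri_length(s)[0]

Tasks then simply import the module, which is effectively a no-op after the first task on a reused worker:

sc.addPyFile("r_setup.py")

def length_(s):
    import r_setup  # already in sys.modules after the first task on this worker
    return r_setup.stri_length(s)

sc.parallelize(["foo", "bar", "foobar"]).map(length_)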

"I assume this would happen when you import a Python module to be used for later executor tasks"

It actually depends on the configuration. By default Spark reuses interpreters between tasks, but this behavior can be modified.
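
The setting involved is spark.python.worker.reuse, which defaults to true; a minimal sketch of setting it explicitly when building the context:

from pyspark import SparkConf, SparkContext

# With spark.python.worker.reuse set to "true" (the default), Python worker
# processes are kept alive between tasks, so per-process state such as an
# attached R library survives from one task to the next.
conf = SparkConf().set("spark.python.worker.reuse", "true")
sc = SparkContext(conf=conf)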

I've provided some examples in an answer to "In Apache spark, what is the difference between using mapPartitions and combine use of broadcast variable and map". Maybe you'll find these useful.
