Can I connect an external (R) process to each pyspark worker during setup
Question
I want to have each python worker start an R shell using rpy2. Can I do this during some sort of setup phase similar to how I assume this would happen when you import a Python module to be used for later executor tasks? For example:
import numpy as np
df.mapPartitions(lambda x: np.zeros(x))
In my case I want to instead start an R shell on each executor and import R libraries, which would look something like this:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr

rlibrary = importr('testrlibrary')
df.mapPartitions(lambda x: rlibrary.rfunc(x))
But I don't want this to occur inside the call to mapPartitions, because then it would happen at the task level as opposed to once per executor core. That approach works and looks more like the example below, but is not useful for me:

def model(partition):
    import rpy2.robjects as robjects
    from rpy2.robjects.packages import importr
    rlibrary = importr('testrlibrary')
    rlibrary.rfunc(partition)

df.mapPartitions(model)
Solution

Something like this should work just fine:
import rpy2.robjects as robjects
from rpy2.robjects.packages import importr

def length_(s):
    stringi = importr("stringi")
    return stringi.stri_length(s)[0]

sc.parallelize(["foo", "bar", "foobar"]).map(length_)
The R object (https://bitbucket.org/rpy2/rpy2/src/fe7b154143bcc0316a036f2fef85cea90b2963ec/rpy/robjects/__init__.py?at=default&fileviewer=file-view-default#__init__.py-302), which represents the R interpreter, is a singleton (https://bitbucket.org/rpy2/rpy2/src/fe7b154143bcc0316a036f2fef85cea90b2963ec/rpy/robjects/__init__.py?at=default&fileviewer=file-view-default#__init__.py-347), so it will be initialized only once, and R doesn't re-import already attached libraries. There is some overhead from calling require multiple times, but it should be negligible compared to the cost of passing your data to and from R.

If you want something more sophisticated, you can create your own singleton module or use the Borg pattern to handle imports, but it could be overkill.
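As a sketch of the Borg pattern mentioned above, here is a minimal, hypothetical RContext class; the rpy2 setup is shown only as comments, since the exact library calls belong to your job, not to the pattern itself:

```python
class RContext:
    """Borg pattern: every instance shares one __dict__, so the
    (expensive) setup below runs at most once per Python worker."""
    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        if not self.__dict__.get("initialized"):
            # In a real job the rpy2 setup would go here, e.g.:
            #   from rpy2.robjects.packages import importr
            #   self.rlibrary = importr('testrlibrary')
            self.initialized = True

a = RContext()
b = RContext()
print(a is b)                    # False: distinct instances
print(a.__dict__ is b.__dict__)  # True: shared state
```

Tasks would then construct RContext() freely; only the first instantiation on a given worker pays the import cost, since all later instances see `initialized` already set.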
"I assume this would happen when you import a Python module to be used for later executor tasks"
It actually depends on the configuration. By default, Spark reuses interpreters between tasks, but this behavior can be modified.
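The relevant knob is the spark.python.worker.reuse configuration property (default true), which controls whether Spark reuses Python worker processes, and therefore the embedded R interpreters they host, across tasks. A hedged sketch, assuming a hypothetical job file my_job.py:

```shell
# spark.python.worker.reuse defaults to true; setting it to false would
# force a fresh Python worker (and a fresh R interpreter) per task.
spark-submit --conf spark.python.worker.reuse=true my_job.py
```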
I've provided some examples as an answer to "In Apache spark, what is the difference between using mapPartitions and combine use of broadcast variable and map". Maybe you'll find these useful.