Calling another custom Python function from Pyspark UDF
Problem description
Suppose you have a file, let's call it udfs.py, and in it:
def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1
You then want to make a UDF out of the main_f function and run it on a dataframe:
import pyspark.sql.functions as fn
import pandas as pd
pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)
_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
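For reference, since main_f just adds 2 to its input, the show() call above should print something along these lines (exact layout is Spark's default formatting):
+---+---+
|  x| x1|
+---+---+
|  1|  3|
|  2|  4|
|  3|  5|
+---+---+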
This works OK if we do this from within the same file as where the two functions are defined (udfs.py). However, trying to do this from a different file (say main.py) produces an error ModuleNotFoundError: No module named ...:
...
import udfs
_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()
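For completeness, a minimal sketch of what such a main.py might look like; the SparkSession setup here is my assumption and is not part of the original snippet:
import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession

import udfs  # the file above, defining nested_f and main_f

spark = SparkSession.builder.getOrCreate()

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()  # fails on the executors with ModuleNotFoundError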
I noticed that if I actually nest nested_f inside main_f like this:
def main_f(x):
    def nested_f(x):
        return x + 1
    return nested_f(x) + 1
everything runs OK. However, my goal here is to have the logic nicely separated into multiple functions, which I can also test individually.
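As a side note, a minimal sketch of the kind of standalone test this separation enables (assuming pytest, and that udfs.py is importable on the machine running the tests):
# test_udfs.py -- plain unit tests, no Spark session needed
from udfs import nested_f, main_f

def test_nested_f():
    assert nested_f(1) == 2

def test_main_f():
    assert main_f(1) == 3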
I think this can be solved by submitting the udfs.py file (or a whole zipped folder) to the executors using spark.sparkContext.addPyFile('...udfs.py') (a minimal sketch of this follows the list below). However:
- I find this a bit long-winded (esp. if you need to zip folders etc...)
- This is not always easy/possible (e.g. udfs.py may be using lots of other modules which then also need to be submitted, leading to a bit of a chain reaction...)
- There are some other inconveniences with addPyFile (e.g. autoreload can stop working, etc.)
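For concreteness, a minimal sketch of the addPyFile approach mentioned above (the path is hypothetical):
spark.sparkContext.addPyFile('/path/to/udfs.py')  # ships udfs.py to the executors

import udfs
_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()  # udfs now resolves on the executors as well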
So the question is: is there a way to do all of these at the same time:
- have the logic of the UDF nicely split into several Python functions
- use the UDF from a different file than where the logic is defined
- not need to submit any dependencies using addPyFile
Bonus points for clarifying how this works/why this doesn't work!
For small dependencies (one or two local files) you can use --py-files and enumerate them; for anything bigger, or with more dependencies, it's better to pack them into a zip or egg file.
File udfs.py:
def my_function(*args, **kwargs):
    # code
File main.py:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function
sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)
df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))
To run:
pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py
If you have written your own Python module, or use third-party modules that don't need C compilation (I personally needed this for geoip2), it's better to create a zip or egg file.
# pip with -t installs all modules and dependencies into directory `src`
pip install geoip2 -t ./src
# Or from a local directory
pip install ./my_module -t ./src
# Best is
pip install -r requirements.txt -t ./src
# If you need to add some additional files
cp ./some_scripts/* ./src/
# And pack it
cd ./src
zip -r ../libs.zip .
cd ..
pyspark --py-files libs.zip
spark-submit --py-files libs.zip main.py
Be careful when using pyspark --master yarn (and possibly other non-local master options) in the pyspark shell with --py-files:
>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip') # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule # libs.zip/MyModule
EDIT - to answer the question of how to get the functions onto the executors without addPyFile() and --py-files:
The file containing the functions must be present on the individual executors and reachable through the PATH env. Therefore, I would probably write a Python module, install it on the executors, and have it available in the environment.
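A minimal sketch of that idea: package the functions as an installable module (all names below are hypothetical) and install it into the Python environment used by every executor, e.g. via pip or as part of the cluster image:
# Project layout (hypothetical):
#   my_udfs/__init__.py   <- contains nested_f and main_f
#   setup.py
from setuptools import setup, find_packages

setup(name='my_udfs', version='0.1', packages=find_packages())
After running pip install . on each node (or in a shared virtualenv/conda environment that the executors use), from my_udfs import main_f resolves on both the driver and the executors, so the UDF can be defined in main.py without addPyFile or --py-files.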