Calling another custom Python function from Pyspark UDF

Problem Description

Suppose you have a file, let's call it udfs.py and in it:

def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1

You then want to make a UDF out of the main_f function and run it on a dataframe:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

This works OK if we do this from within the same file where the two functions are defined (udfs.py). However, trying to do this from a different file (say main.py) produces an error ModuleNotFoundError: No module named ...:

...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

I noticed that if I actually nest the nested_f inside the main_f like this:

def main_f(x):
    def nested_f(x):
        return x + 1

    return nested_f(x) + 1

everything runs OK. However, my goal here is to have the logic nicely separated into multiple functions, which I can also test individually.
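For example, with the functions kept at module level in udfs.py, the logic can be tested without Spark at all. A minimal sketch (plain asserts; the choice of test runner is up to you):

from udfs import nested_f, main_f

def test_nested_f():
    assert nested_f(1) == 2

def test_main_f():
    assert main_f(1) == 3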

I think this can be solved by submitting the udfs.py file (or a whole zipped folder) to the executors using spark.sparkContext.addPyFile('...udfs.py') (see the sketch after the list below). However:

  1. I find this a bit long-winded (esp. if you need to zip folders etc.)
  2. This is not always easy/possible (e.g. udfs.py may be using lots of other modules which then also need to be submitted, leading to a bit of a chain reaction...)
  3. There are some other inconveniences with addPyFile (e.g. autoreload can stop working etc.)
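For reference, a minimal sketch of the addPyFile approach mentioned above (the path is a placeholder; it assumes udfs.py is also importable locally on the driver, as in the earlier snippets):

import pyspark.sql.functions as fn

# Ship udfs.py to the executors so the pickled UDF can resolve the udfs module there
spark.sparkContext.addPyFile('/path/to/udfs.py')

import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()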

So the question is: is there a way to do all of these at the same time:

  • have the logic of the UDF nicely split to several Python functions
  • use the UDF from a different file than where the logic is defined
  • not needing to submit any dependencies using addPyFile

Bonus points for clarifying how this works/why this doesn't work!

Solution

For small dependencies (one or two local files) you can use --py-files and enumerate them; with something bigger or more dependencies, it's better to pack everything into a zip or egg file.

File udfs.py:

def my_function(*args, **kwargs):
    # code

File main.py:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)

df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))

To run:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

If you have written your own Python module, or even use third-party modules (ones that don't need C compilation; I personally needed this with geoip2), it's better to create a zip or egg file.

# pip with -t installs all modules and dependencies into the directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src

# Best is 
pip install -r requirements.txt -t ./src

# If you need to add some additional files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip

Be careful when using pyspark --master yarn (and possibly other non-local master options) in the pyspark shell with --py-files; you may need to add the archive to sys.path yourself:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip')  # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule  # libs.zip/MyModule

EDIT - To answer the question of how to get the functions onto the executors without addPyFile() or --py-files:

The file containing the functions has to be present on the individual executors and reachable through the environment path (PYTHONPATH). Therefore, I would probably write a Python module, install it on the executors, and have it available in the environment.
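A minimal sketch of that idea, assuming a tiny installable package (all names and paths here are hypothetical) and that you can run pip on every node, e.g. as part of a cluster bootstrap step:

# Hypothetical layout:
#   my_udfs/
#   ├── setup.py
#   └── udfs.py

# setup.py
from setuptools import setup

setup(
    name='my_udfs',
    version='0.1',
    py_modules=['udfs'],  # installs udfs.py as a top-level importable module
)

# Install on the driver and on every executor node:
#   pip install /path/to/my_udfs

# main.py then imports it normally; each executor resolves the same module
# from its own site-packages, so no addPyFile/--py-files is needed:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import main_f

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([[1], [2], [3]], ['x'])
df.withColumn('x1', udf(main_f, 'int')(df['x'])).show()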
