Calling another custom Python function from Pyspark UDF

Problem Description


Suppose you have a file, let's call it udfs.py and in it:

def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1

You then want to make a UDF out of the main_f function and run it on a dataframe:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

This works OK if we do this from within the same file as where the two functions are defined (udfs.py). However, trying to do this from a different file (say main.py) produces an error ModuleNotFoundError: No module named ...:

...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

I noticed that if I actually nest the nested_f inside the main_f like this:

def main_f(x):
    def nested_f(x):
        return x + 1

    return nested_f(x) + 1

everything runs OK. However, my goal here is to have the logic nicely separated into multiple functions, which I can also test individually.
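
With the functions kept separate like this, they can also be unit-tested directly. A minimal sketch of such a test (the test file name and the use of pytest are assumptions for illustration):

# test_udfs.py - hypothetical pytest-style tests for the separated functions
from udfs import nested_f, main_f

def test_nested_f():
    assert nested_f(1) == 2     # nested_f adds 1

def test_main_f():
    assert main_f(1) == 3       # main_f adds 1 on top of nested_f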

I think this can be solved by submitting the udfs.py file (or a whole zipped folder) to the executors using spark.sparkContext.addPyFile('...udfs.py') (a minimal sketch of this follows the list below). However:

  1. I find this a bit long-winded (esp. if you need to zip folders etc...)
  2. This is not always easy/possible (e.g. udfs.py may be using lots of other modules which then also need to be submitted, leading to a bit of a chain reaction...)
  3. There are some other inconveniences with addPyFile (e.g. autoreload can stop working, etc.)
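
For reference, a minimal sketch of the addPyFile() workaround mentioned above, as it could look from main.py (the '/path/to/udfs.py' path is a placeholder):

# main.py - sketch of the addPyFile() workaround; '/path/to/udfs.py' is a placeholder
import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Ship udfs.py to the executors and add it to the driver's import path
spark.sparkContext.addPyFile('/path/to/udfs.py')

import udfs  # now importable on the driver and available on the executors

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()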

So the question is: is there a way to do all of these at the same time:

  • have the logic of the UDF nicely split to several Python functions
  • use the UDF from a different file than where the logic is defined
  • not needing to submit any dependencies using addPyFile

Bonus points for clarifying how this works/why this doesn't work!

Solution

For small dependencies (one or two local files) you can use --py-files and enumerate them; for anything bigger or with more dependencies, it's better to pack them into a zip or egg file.

File udfs.py:

def my_function(*args, **kwargs):
    # code

File main.py:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)

df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))

To run:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

If you have written your own Python module, or even need third-party modules that don't require C compilation (I personally needed this for geoip2), it's better to create a zip or egg file:

# pip with -t installs all modules and dependencies into the directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src

# Best is to install from a requirements file
pip install -r requirements.txt -t ./src

# If you need to add some additional files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip

Be careful when using pyspark --master yarn (and possibly other non-local master options) in the pyspark shell with --py-files:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip')  # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule  # libs.zip/MyModule

EDIT - An answer to the question of how to get the functions onto the executors without addPyFile() and --py-files:

It is necessary for the file with the functions to be present on the individual executors and reachable through the PATH env. Therefore, I would probably write a Python module, install it on the executors, and have it available in the environment.
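
For illustration, a minimal sketch of that approach with a hypothetical packaging layout (the package name, file layout and install step are assumptions, not part of the original answer): put udfs.py next to a small setup.py and install the resulting package on every executor node.

# setup.py - hypothetical packaging of udfs.py as an installable module
from setuptools import setup

setup(
    name='udfs',             # hypothetical distribution name
    version='0.1.0',
    py_modules=['udfs'],     # ships udfs.py as a top-level importable module
)

After installing it on each executor node (for example with pip, or by baking it into the cluster image), "from udfs import my_function" works both on the driver and inside the UDF, without addPyFile() or --py-files.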
