PySpark 2.1: Importing module with UDFs breaks Hive connectivity
Question
I'm currently working with Spark 2.1 and have a main script that calls a helper module that contains all my transformation methods. In other words:
main.py
helper.py
At the top of my helper.py file I have several custom UDFs that I have defined in the following manner:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def reformat(s):
    return reformat_logic(s)  # reformat_logic holds the actual transformation

reformat_udf = udf(reformat, StringType())
Before I broke off all the UDFs into the helper file, I was able to connect to my Hive metastore through my SparkSession object using spark.sql('sql statement'). However, after I moved the UDFs to the helper file and imported that file at the top of my main script, the SparkSession object could no longer connect to Hive and fell back to the default Derby database. I also got errors when trying to query my Hive tables, such as Hive support is required to insert into the following tables...
I've been able to solve my issue by moving my UDFs into a completely separate file and only running the import statements for that module inside the functions that need them (not sure if this is good practice, but it works). Anyway, does anyone understand why I'm seeing such peculiar behavior when it comes to Spark and UDFs? And does anyone know a good way to share UDFs across applications?
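The deferred-import workaround described above can be sketched as follows. The file name udfs.py and the transform function are hypothetical stand-ins; the point is only where the import statement lives:

```python
# udfs.py -- hypothetical module holding only the UDF definitions.
# Importing it is what triggers UDF creation (and, on Spark < 2.2.0,
# context initialization), so it is shown here as a comment:
#
#     from pyspark.sql.functions import udf
#     from pyspark.sql.types import StringType
#
#     def reformat(s):
#         return reformat_logic(s)
#
#     reformat_udf = udf(reformat, StringType())

# main.py -- defer the import until a SparkSession already exists
def transform(df):
    from udfs import reformat_udf  # import runs here, not at module load
    return df.withColumn("clean", reformat_udf(df["raw"]))
```

Because udfs is imported inside transform, the UDF object is only created on the first call, after the session has already been configured.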
Answer
Prior to Spark 2.2.0, UserDefinedFunction eagerly creates a UserDefinedPythonFunction object, which represents the Python UDF on the JVM. This process requires access to a SparkContext and SparkSession. If there are no active instances when UserDefinedFunction.__init__ is called, Spark will automatically initialize the contexts for you.
When you call SparkSession.Builder.getOrCreate after importing a UserDefinedFunction object, it returns the existing SparkSession instance, and only some configuration changes can be applied (enableHiveSupport is not among them).
To address this problem, you should initialize the SparkSession before you import the UDFs:
from pyspark.sql.session import SparkSession

# Create the session with Hive support first...
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# ...and only then import the module that defines UDFs
from helper import reformat_udf
This behavior is described in SPARK-19163 and fixed in Spark 2.2.0. Other API improvements include decorator syntax (SPARK-19160) and improved docstring handling (SPARK-19161).