Using broadcasted dataframe in pyspark UDF


Problem description

Is it possible to use a broadcast DataFrame in the UDF of a PySpark SQL application?

My code references the broadcast DataFrame inside a PySpark UDF, like below:

fact_ent_df_data = sparkSession.sparkContext.broadcast(fact_ent_df.collect())

def generate_lookup_code(col1, col2, col3):
    fact_ent_df_count = fact_ent_df_data.select(
        fact_ent_df_br.TheDate.between(col1, col2),
        fact_ent_df_br.Ent.isin('col3')).count()
    return fact_ent_df_count

sparkSession.udf.register("generate_lookup_code", generate_lookup_code)
sparkSession.sql('select sample4, generate_lookup_code(sample1, sample2, sample3) as count_hol from table_t')

I am getting a "local variable used before assignment" error when I use the broadcast df_bc. Any help is appreciated. The error I am getting is:

Traceback (most recent call last):
  File "C:/Users/Vignesh/PycharmProjects/gettingstarted/aramex_transit/spark_driver.py", line 46, in <module>
    sparkSession.udf.register("generate_lookup_code" , generate_lookup_code )
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 323, in register
    self.sparkSession._jsparkSession.udf().registerPython(name, register_udf._judf)
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 148, in _judf
    self._judf_placeholder = self._create_judf()
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 157, in _create_judf
    wrapped_func = _wrap_function(sc, self.func, self.returnType)
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\sql\udf.py", line 33, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\rdd.py", line 2391, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\serializers.py", line 575, in dumps
    return cloudpickle.dumps(obj, 2)
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\cloudpickle.py", line 918, in dumps
    cp.dump(obj)
  File "D:\spark-2.3.2-bin-hadoop2.6\spark-2.3.2-bin-hadoop2.6\python\pyspark\cloudpickle.py", line 249, in dump
    raise pickle.PicklingError(msg)
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o24.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Answer

Think of a Spark broadcast variable as a simple Python data type, like a list; the problem then becomes how to pass that variable into the UDF. Here is an example: suppose we have an ages list l and a DataFrame with columns name and age, and we want to check whether each person's age is in the list.

from pyspark.sql.functions import udf, col

l = [13, 21, 34]                  # ages list
d = [('Alice', 10), ('bob', 21)]  # data frame rows

rdd = sc.parallelize(l)
b_rdd = sc.broadcast(rdd.collect())  # define broadcast variable
df = spark.createDataFrame(d, ["name", "age"])

def check_age(age, age_list):
    # Compare against the list passed in as an argument,
    # not against a captured global
    if age in age_list:
        return "true"
    return "false"

def udf_check_age(age_list):
    # Close over the plain Python list, so only the list
    # (not a DataFrame) gets serialized with the UDF
    return udf(lambda x: check_age(x, age_list))

df.withColumn("is_age_in_list", udf_check_age(b_rdd.value)(col("age"))).show()

Output:

+-----+---+--------------+
| name|age|is_age_in_list|
+-----+---+--------------+
|Alice| 10|         false|
|  bob| 21|          true|
+-----+---+--------------+
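Applying the same idea to the original question: broadcast the collected rows (plain Python data, not the DataFrame itself) and have the UDF iterate over that list instead of calling DataFrame methods, which cannot be serialized into a UDF. A minimal sketch, assuming fact_ent_df has columns TheDate (ISO-formatted date strings) and Ent; the sample rows and column values are hypothetical, and the Spark-specific lines are shown as comments so the counting logic itself is plain Python:

```python
# Rows as they would come back from fact_ent_df.collect();
# in Spark: fact_ent_bc = sparkSession.sparkContext.broadcast(fact_ent_df.collect())
fact_rows = [
    ("2019-01-05", "E1"),
    ("2019-01-10", "E2"),
    ("2019-02-01", "E1"),
]

def generate_lookup_code(start_date, end_date, ent):
    # Count rows whose TheDate falls between the bounds and whose Ent matches.
    # ISO-formatted date strings compare correctly as plain strings.
    return sum(1 for the_date, row_ent in fact_rows
               if start_date <= the_date <= end_date and row_ent == ent)

# In Spark, register it for SQL use (needs: from pyspark.sql.types import IntegerType):
# sparkSession.udf.register("generate_lookup_code", generate_lookup_code, IntegerType())

print(generate_lookup_code("2019-01-01", "2019-01-31", "E1"))  # → 1
```

Because the UDF only touches a plain list, nothing un-picklable is captured, so the registration that failed in the question should succeed.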
