Using a class method inside of a pyspark UDF


Question

Hello data engineers!

I am trying to write a pyspark udf using a method from a class called Astral

Here is the udf:

import astral
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

def time_from_solar_noon(d, y):
    noon = astral.Astral().solar_noon_utc  # a new Astral instance per call, i.e. per row
    time = noon(d, y)
    return time

solarNoon = F.udf(lambda d, y: time_from_solar_noon(d, y), TimestampType())

Now the way I understand it, the class will be instantiated for every single line in my dataframe, resulting in a very slow job.

If I take the class instantiation out of my function:

import astral

noon = astral.Astral().solar_noon_utc  # instantiated once, on the driver

def time_from_solar_noon(d, y):
    time = noon(d, y)
    return time

I get the following error message:

  [Previous line repeated 326 more times]
    RecursionError: maximum recursion depth exceeded while calling a Python object

So here is my question: I think it should be possible to have at least one class instantiation per executor/thread, instead of one per line in my dataframe. How would I do that?

Thank you for your help!

Answer

(The RecursionError most likely occurs when Spark tries to pickle the noon closure, together with the Astral instance it references, in order to ship it to the executors.)

Just like with database connections, you can instantiate only a limited number of these class instances by using mapPartitions:

from datetime import date
from astral import Astral

df = spark.createDataFrame(
    ((date(2019, 10, 4), 0),
     (date(2019, 10, 4), 19)),
    schema=("date", "longitude"))


def solar_noon(rows):
    a = Astral()  # initialize the class once per partition
    return ((a.solar_noon_utc(date=r.date, longitude=r.longitude), *r)
            for r in rows)  # reuse the same Astral instance for all rows in this partition


(df.rdd
 .mapPartitions(solar_noon)
 .toDF(schema=("solar_noon_utc", *df.columns))
 .show())
+-------------------+----------+---------+                                      
|     solar_noon_utc|      date|longitude|
+-------------------+----------+---------+
|2019-10-04 13:48:58|2019-10-04|        0|
|2019-10-04 12:32:58|2019-10-04|       19|
+-------------------+----------+---------+

This is fairly efficient, as the function (solar_noon) is shipped to each worker and the class is initialized only once per partition, which can hold many rows.
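
If you would rather stay with the DataFrame/UDF API instead of dropping to the RDD, another common pattern is to initialize the instance lazily in a module-level variable, so each Python worker process creates it at most once. Here is a minimal sketch, assuming astral v1's Astral class; the helper names (_astral, _get_astral, solar_noon_udf) are hypothetical:

from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

_astral = None  # at most one instance per Python worker process


def _get_astral():
    # Create the Astral instance lazily on the worker the first time
    # it is needed; nothing has to be pickled on the driver.
    global _astral
    if _astral is None:
        import astral
        _astral = astral.Astral()
    return _astral


@F.udf(TimestampType())
def solar_noon_udf(d, y):
    return _get_astral().solar_noon_utc(d, y)


# Hypothetical usage, with the df from above:
# df.withColumn("solar_noon_utc", solar_noon_udf("date", "longitude")).show()

Because the instance is created inside the worker, no Astral object is captured in the closure that Spark has to serialize, which also sidesteps the RecursionError above.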

