How to calculate difference between dates excluding weekends in Pyspark 2.2.0


Question

I have the below PySpark df, which can be recreated by this code:

df = spark.createDataFrame([
    (1, "John Doe", "2020-11-30"),
    (2, "John Doe", "2020-11-27"),
    (3, "John Doe", "2020-11-29")],
    ("id", "name", "date")
)

+---+--------+----------+
| id|    name|      date|
+---+--------+----------+
|  1|John Doe|2020-11-30|
|  2|John Doe|2020-11-27|
|  3|John Doe|2020-11-29|
+---+--------+----------+

I am looking to create a UDF to calculate the difference between two rows of dates (using the lag function), excluding weekends, as PySpark 2.2.0 does not have a built-in function to do so. E.g. the difference between 2020-11-30 and 2020-11-27 should give 1, as they are a Monday and a Friday respectively.
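For reference, the expected semantics can be checked locally with NumPy before wiring anything into Spark. A minimal sanity check, run on the driver:

import datetime
import numpy as np

# 2020-11-27 is a Friday (weekday 4) and 2020-11-30 is a Monday (weekday 0)
print(datetime.date(2020, 11, 27).weekday())  # 4
print(datetime.date(2020, 11, 30).weekday())  # 0

# np.busday_count counts business days from the first date (inclusive)
# up to the second date (exclusive), so only the Friday is counted here
print(np.busday_count("2020-11-27", "2020-11-30"))  # 1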

I have tried creating the below with the help of Difference between two dates excluding weekends in python:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def workdays(z):
    # collect the first two dates from df and count the business days between them
    date1 = df.select(F.col('date')).collect()[1][0]
    date2 = df.select(F.col('date')).collect()[0][0]
    date_diff = np.busday_count(date1, date2)
    return date_diff

workdaysUDF = udf(lambda z: workdays(z), IntegerType())

df.withColumn("date_dif", workdaysUDF(F.col("date"))).show(truncate=False)

But I am getting the below error:

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Any help on how I can make this work on each row of my dataframe would be really helpful.

PS: My date1 and date2 variables need to be dynamic, depending on the value of the date the function is being applied to. Also, due to the dataframe size, I cannot use pandas, for which I found multiple solutions.

Thanks.

Answer

You can't call collect in the UDF. You can only pass columns into the UDF, so you should pass in the date column and the lag date column, as shown below:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

df = spark.createDataFrame([
    (1, "John Doe", "2020-11-30"),
    (2, "John Doe", "2020-11-27"),
    (3, "John Doe", "2020-11-29")],
    ("id", "name", "date")
) 

workdaysUDF = F.udf(lambda date1, date2: int(np.busday_count(date2, date1)) if (date1 is not None and date2 is not None) else None, IntegerType())
df = df.withColumn("date_dif", workdaysUDF(F.col('date'), F.lag(F.col('date')).over(Window.partitionBy('name').orderBy('id'))))
df.show()

+---+--------+----------+--------+
| id|    name|      date|date_dif|
+---+--------+----------+--------+
|  1|John Doe|2020-11-30|    null|
|  2|John Doe|2020-11-27|      -1|
|  3|John Doe|2020-11-29|       1|
+---+--------+----------+--------+
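The null and the -1 in the output follow from how the UDF is wired: the first row in each window has no lag value, so the UDF returns None, and np.busday_count returns a negative count when its second argument is earlier than its first. A minimal illustration of that sign convention, using the dates from the example above:

import numpy as np

# np.busday_count(begin, end) counts business days in [begin, end);
# swapping the arguments negates the result
print(np.busday_count("2020-11-30", "2020-11-27"))  # -1 (id=2: date is before the lagged date)
print(np.busday_count("2020-11-27", "2020-11-29"))  #  1 (id=3: Sunday end date, only the Friday is counted)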
