pyspark change day in datetime column


Question

What is wrong with this code, which tries to change the day of a datetime column?

import pyspark
import pyspark.sql.functions as sf
import pyspark.sql.types as sparktypes
import datetime

sc = pyspark.SparkContext(appName="test")
sqlcontext = pyspark.SQLContext(sc)

rdd = sc.parallelize([('a',datetime.datetime(2014, 1, 9, 0, 0)),
                      ('b',datetime.datetime(2014, 1, 27, 0, 0)),
                      ('c',datetime.datetime(2014, 1, 31, 0, 0))])
testdf = sqlcontext.createDataFrame(rdd, ["id", "date"])

print(testdf.show())
print(testdf.printSchema())

which gives a test dataframe:

+---+--------------------+
| id|                date|
+---+--------------------+
|  a|2014-01-09 00:00:...|
|  b|2014-01-27 00:00:...|
|  c|2014-01-31 00:00:...|
+---+--------------------+


root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)

Then I define a udf to change the day of the date column:

def change_day_(date, day):
    return date.replace(day=day)

change_day = sf.udf(change_day_, sparktypes.TimestampType())
testdf.withColumn("PaidMonth", change_day(testdf.date, 1)).show(1)

This raises an error:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.lang.Integer]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

Answer

A udf which receives multiple arguments is assumed to receive multiple columns. The "1" is not a column.

This means you can do one of the following. Either make it a column, as suggested in the comments:

testdf.withColumn("PaidMonth", change_day(testdf.date, sf.lit(1))).show(1)

lit(1) is a column containing the literal 1 in every row (note that lit must be referenced through the imported module, e.g. sf.lit, since it was not imported by name above).

or make the original function return a higher-order function:

def change_day_(day):
    return lambda date: date.replace(day=day)

change_day = sf.udf(change_day_(1), sparktypes.TimestampType())
testdf.withColumn("PaidMonth", change_day(testdf.date)).show(1)

This basically creates a function which replaces the day with 1, and can therefore receive an integer at definition time. The resulting udf then applies to a single column.
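The currying step can be checked without Spark at all, since the wrapped function is plain Python (a minimal sketch of the same logic):

```python
import datetime

def change_day_(day):
    # returns a closure that replaces the day component of a datetime
    return lambda date: date.replace(day=day)

set_first = change_day_(1)
result = set_first(datetime.datetime(2014, 1, 27, 0, 0))
print(result)  # 2014-01-01 00:00:00
```

As an aside, if the goal is specifically to snap every timestamp to the first of its month, the built-in sf.trunc(testdf.date, 'month') may avoid the udf entirely, though it returns a DateType rather than a TimestampType column.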
