Pyspark 数据帧中的 Timedelta - TypeError [英] Timedelta in Pyspark Dataframes - TypeError
问题描述
我正在使用 pyspark 2.3.1 开发 Spark 2.3、Python 3.6
I am working on Spark 2.3, Python 3.6 with pyspark 2.3.1
我有一个 Spark DataFrame,其中每个条目都是一个工作步骤,我想将一些行合并到一个工作会话中.这应该在下面的函数 getSessions
中完成.我相信它有效.
I have a Spark DataFrame where each entry is a workstep, and I want to get some rows together into a work session. This should be done in the below function getSessions
. I believe it works.
我进一步创建了一个包含我想要的所有信息的 RDD - 每个条目都是一个带有所需列的 Row 对象,看起来类型很好(一些数据被伪装):
I further create an RDD that contains all information that I want - each entry is a Row object with the desired columns, it looks like the types are fine (some data disguised):
rddSessions_flattened.take(1)
# [Row(counter=1, end=datetime.datetime(2017, 11, 6, 9, 15, 20), end_id=2758327, no_of_entries=5, shortID=u'DISGUISED', start=datetime.datetime(2017, 11, 6, 9, 13, 59), start_id=INTEGERNUMBER, strDuration='0:01:21', tNumber=u'DISGUISED', timeDuration=datetime.timedelta(0, 81))]
如果我现在想让一个 DataFrame 成为我们的 RDD,我会得到一个 TypeError.
If I now want to make a DataFrame our of the RDD, I get a TypeError.
df = rddSessions_flattened.toDF()
df.show()
# TypeError: not supported type: type 'datetime.timedelta'
(最后贴出完整的错误信息)
(the full error message posted at the very end)
有没有想过哪里出了问题以及如何解决这个问题?
Any thoughts what went wrong and how to fix this?
- 我基本上依靠 Spark 来推断架构;我认为这是因为 spark.sql.types 模块有一个类 TimestampType
- 我将如何以编程方式定义它?我不清楚 Apache Spark 编程指南.
欣赏你的想法!
def getSessions(values, threshold=threshold):
"""
Create sessions for one person on one case
Arguments:
values: sorted list of tuples (datetime, id)
threshold: time delta object; max time of a session
Return:
sessions: list of sessions (SData)
"""
SData = Row(
'counter'
, 'start'
, 'start_id'
, 'end'
, 'end_id'
, 'strDuration'
, 'timeDuration'
, 'no_of_entries'
)
counter = 1
no_of_rows = 1
sessions = [] # list of sessions
session_start = values[0][0] # first entry of the first tuple in the list
start_row_id = values[0][1]
session_end = session_start
end_row_id = start_row_id
for row_time, row_id in values[1:]:
if row_time - session_start > threshold:
# new session found, so append previous session
sessions.append(SData(
counter
, session_start
, start_row_id
, session_end
, end_row_id
, str(session_end - session_start)
, session_end - session_start
, no_of_rows
)
)
# get the information for the next session
counter += 1
no_of_rows = 1
session_start = row_time
start_row_id = row_id
else:
no_of_rows +=1
# regardless if new session or not: session_end reset to current entry
session_end = row_time
end_row_id = row_id
# very last session has to be recorded as there is no "next" row
sessions.append(SData(
counter
, session_start
, start_row_id
, session_end
, end_row_id
, str(session_end - session_start)
, session_end - session_start
, no_of_rows
)
)
return sessions
<小时>
完整的错误信息:
Full error message:
not supported type: <type 'datetime.timedelta'>
Traceback (most recent call last):
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
return sparkSession.createDataFrame(self, schema, sampleRatio)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 687, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 384, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio, names=schema)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 364, in _inferSchema
schema = _infer_schema(first, names=names)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1096, in _infer_schema
fields = [StructField(k, _infer_type(v), True) for k, v in items]
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1070, in _infer_type
raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'datetime.timedelta'>
推荐答案
TimestampType
与 pd.timedelta
不同.前者类似于 pd.timestamp
第二种在 spark 中有一个类似的类型,它是 CalendarIntervalType
但看起来自动推理对你不起作用,它不是微不足道的与.如果可能,我建议您将 pd.timedelta
转换为秒或毫秒,现在具有(秒或毫秒)的 整数
,并在应用程序下游使用它,因为它更容易使用并且仍然代表您选择的单位的时间间隔.
TimestampType
is not the same as pd.timedelta
. The former is analogous to pd.timestamp
The second have a analogous type in spark which is CalendarIntervalType
but looks like the automatic inference does no work for you and it is not trivial to work with.
I would recommend, if possible, you to convert your pd.timedelta
to seconds or milliseconds having now an integer
of (seconds or milliseconds) and work with it downstream in application, as it is but easier to work with and still represents a time interval in your chosen units.
这篇关于Pyspark 数据帧中的 Timedelta - TypeError的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!