Pyspark数据帧中的Timedelta-TypeError [英] Timedelta in Pyspark Dataframes - TypeError
问题描述
我正在使用pyspark 2.3.1开发Spark 2.3,Python 3.6
I am working on Spark 2.3, Python 3.6 with pyspark 2.3.1
我有一个Spark DataFrame,其中的每个条目都是一个工作步骤,我想将一些行放在一起进入一个工作会话.这应该在下面的功能getSessions
中完成.我相信它有效.
I have a Spark DataFrame where each entry is a workstep, and I want to get some rows together into a work session. This should be done in the below function getSessions
. I believe it works.
我进一步创建了一个RDD,其中包含我想要的所有信息-每个条目都是带有所需列的Row对象,看起来类型很好(某些数据被伪装了):
I further create an RDD that contains all information that I want - each entry is a Row object with the desired columns, it looks like the types are fine (some data disguised):
rddSessions_flattened.take(1)
# [Row(counter=1, end=datetime.datetime(2017, 11, 6, 9, 15, 20), end_id=2758327, no_of_entries=5, shortID=u'DISGUISED', start=datetime.datetime(2017, 11, 6, 9, 13, 59), start_id=INTEGERNUMBER, strDuration='0:01:21', tNumber=u'DISGUISED', timeDuration=datetime.timedelta(0, 81))]
如果现在我想将DataFrame用作RDD,则会收到TypeError.
If I now want to make a DataFrame our of the RDD, I get a TypeError.
df = rddSessions_flattened.toDF()
df.show()
# TypeError: not supported type: type 'datetime.timedelta'
(完整的错误消息张贴在最后)
(the full error message posted at the very end)
有什么想法出了什么问题以及如何解决这个问题?
Any thoughts what went wrong and how to fix this?
- 我基本上是依靠Spark来推断模式的;我认为这可以作为spark.sql.types模块具有TimestampType类
- 我如何以编程方式定义它?我不清楚Apache Spark编程指南.
欣赏您的想法!
def getSessions(values, threshold=threshold):
"""
Create sessions for one person on one case
Arguments:
values: sorted list of tuples (datetime, id)
threshold: time delta object; max time of a session
Return:
sessions: list of sessions (SData)
"""
SData = Row(
'counter'
, 'start'
, 'start_id'
, 'end'
, 'end_id'
, 'strDuration'
, 'timeDuration'
, 'no_of_entries'
)
counter = 1
no_of_rows = 1
sessions = [] # list of sessions
session_start = values[0][0] # first entry of the first tuple in the list
start_row_id = values[0][1]
session_end = session_start
end_row_id = start_row_id
for row_time, row_id in values[1:]:
if row_time - session_start > threshold:
# new session found, so append previous session
sessions.append(SData(
counter
, session_start
, start_row_id
, session_end
, end_row_id
, str(session_end - session_start)
, session_end - session_start
, no_of_rows
)
)
# get the information for the next session
counter += 1
no_of_rows = 1
session_start = row_time
start_row_id = row_id
else:
no_of_rows +=1
# regardless if new session or not: session_end reset to current entry
session_end = row_time
end_row_id = row_id
# very last session has to be recorded as there is no "next" row
sessions.append(SData(
counter
, session_start
, start_row_id
, session_end
, end_row_id
, str(session_end - session_start)
, session_end - session_start
, no_of_rows
)
)
return sessions
完整错误消息:
Full error message:
not supported type: <type 'datetime.timedelta'>
Traceback (most recent call last):
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
return sparkSession.createDataFrame(self, schema, sampleRatio)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 687, in createDataFrame
rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 384, in _createFromRDD
struct = self._inferSchema(rdd, samplingRatio, names=schema)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 364, in _inferSchema
schema = _infer_schema(first, names=names)
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1096, in _infer_schema
fields = [StructField(k, _infer_type(v), True) for k, v in items]
File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1070, in _infer_type
raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'datetime.timedelta'>
推荐答案
TimestampType
与pd.timedelta
不同.前者类似于pd.timestamp
,第二种在spark中具有类似的类型,即CalendarIntervalType
,但看起来自动推断对您不起作用,并且使用起来也不是一件容易的事.
如果可能,我建议您将pd.timedelta
转换为秒或毫秒,现在integer
为(秒或毫秒),并在下游应用程序中使用它,因为它更容易使用,但仍代表时间间隔(以您选择的单位为单位).
TimestampType
is not the same as pd.timedelta
. The former is analogous to pd.timestamp
The second have a analogous type in spark which is CalendarIntervalType
but looks like the automatic inference does no work for you and it is not trivial to work with.
I would recommend, if possible, you to convert your pd.timedelta
to seconds or milliseconds having now an integer
of (seconds or milliseconds) and work with it downstream in application, as it is but easier to work with and still represents a time interval in your chosen units.
这篇关于Pyspark数据帧中的Timedelta-TypeError的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!