Timedelta in Pyspark Dataframes - TypeError


Question

I am working on Spark 2.3, Python 3.6 with pyspark 2.3.1

I have a Spark DataFrame where each entry is a workstep, and I want to group some rows together into a work session. This should be done in the function getSessions below; I believe it works.

I further create an RDD that contains all the information I want - each entry is a Row object with the desired columns, and the types look fine (some data disguised):

rddSessions_flattened.take(1)

# [Row(counter=1, end=datetime.datetime(2017, 11, 6, 9, 15, 20), end_id=2758327, no_of_entries=5, shortID=u'DISGUISED', start=datetime.datetime(2017, 11, 6, 9, 13, 59), start_id=INTEGERNUMBER, strDuration='0:01:21', tNumber=u'DISGUISED', timeDuration=datetime.timedelta(0, 81))]

If I now want to make a DataFrame out of the RDD, I get a TypeError.

df = rddSessions_flattened.toDF()

df.show()

# TypeError: not supported type: type 'datetime.timedelta'

(the full error message is posted at the very end)

Any thoughts on what went wrong and how to fix this?

  • I was basically relying on Spark to infer the schema; I thought this should work, since the spark.sql.types module has a TimestampType class.
  • How can I define the schema programmatically? The Apache Spark programming guide is not clear to me on this point (see the sketch after this list).
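
For reference, here is a minimal sketch of a programmatically defined schema, assuming the field names and order of the SData Row used below, and assuming the timedelta column has already been converted to whole seconds, since Spark 2.3 has no column type that datetime.timedelta maps to. The extra columns visible in the take(1) output, such as shortID and tNumber, would need their own StructFields; everything here is illustrative, not the original code.

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               LongType, StringType, TimestampType)

# Hypothetical explicit schema for the session rows; timeDuration is assumed
# to have been converted to whole seconds (LongType) before the DataFrame is
# built, because Spark 2.3 cannot represent datetime.timedelta directly.
session_schema = StructType([
    StructField('counter', IntegerType(), True),
    StructField('start', TimestampType(), True),
    StructField('start_id', LongType(), True),
    StructField('end', TimestampType(), True),
    StructField('end_id', LongType(), True),
    StructField('strDuration', StringType(), True),
    StructField('timeDuration', LongType(), True),   # seconds, not a timedelta
    StructField('no_of_entries', IntegerType(), True),
])

# df = spark.createDataFrame(rddSessions_flattened, schema=session_schema)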

Appreciate your thoughts!

from pyspark.sql import Row


def getSessions(values, threshold=threshold):
    """
    Create sessions for one person on one case

    Arguments:
        values:      sorted list of tuples (datetime, id)
        threshold:   timedelta object; max duration of a session

    Return:
        sessions:    list of sessions (SData)
    """
    SData = Row(
        'counter'
        , 'start'
        , 'start_id'
        , 'end'
        , 'end_id'
        , 'strDuration'
        , 'timeDuration'
        , 'no_of_entries'
    )

    counter = 1
    no_of_rows = 1
    sessions = []   # list of sessions

    session_start = values[0][0]   # first entry of the first tuple in the list
    start_row_id = values[0][1]
    session_end = session_start
    end_row_id = start_row_id

    for row_time, row_id in values[1:]:
        if row_time - session_start > threshold:
            # new session found, so append the previous session
            sessions.append(SData(
                counter
                , session_start
                , start_row_id
                , session_end
                , end_row_id
                , str(session_end - session_start)
                , session_end - session_start
                , no_of_rows
                )
            )
            # start collecting information for the next session
            counter += 1
            no_of_rows = 1
            session_start = row_time
            start_row_id = row_id
        else:
            no_of_rows += 1

        # regardless of whether a new session started: reset session_end to the current entry
        session_end = row_time
        end_row_id = row_id

    # the very last session has to be recorded as there is no "next" row
    sessions.append(SData(
        counter
        , session_start
        , start_row_id
        , session_end
        , end_row_id
        , str(session_end - session_start)
        , session_end - session_start
        , no_of_rows
        )
    )
    return sessions


Full error message:

not supported type: <type 'datetime.timedelta'>
Traceback (most recent call last):
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 687, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 384, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 364, in _inferSchema
    schema = _infer_schema(first, names=names)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1096, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'datetime.timedelta'>

Answer

TimestampType is not the same thing as a timedelta. The former is analogous to a timestamp. A timedelta does have an analogous type in Spark, CalendarIntervalType, but it looks like automatic schema inference does not work for it, and it is not trivial to work with. If possible, I would recommend converting your timedelta to seconds or milliseconds, so that you have a plain integer (of seconds or milliseconds) to work with downstream in your application; that is easier to handle and still represents a time interval in your chosen unit.
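
Following that suggestion, here is a minimal sketch of the conversion in plain Python. The timestamps are taken from the take(1) output above; total_seconds() is part of the standard datetime module, and replacing the timedelta field with an integer is an assumption about how getSessions would be adapted.

import datetime

start = datetime.datetime(2017, 11, 6, 9, 13, 59)
end = datetime.datetime(2017, 11, 6, 9, 15, 20)

duration = end - start                            # datetime.timedelta of 81 seconds
duration_seconds = int(duration.total_seconds())  # 81 -- a plain int that Spark can infer

# Inside getSessions, the timeDuration field could likewise be emitted as
#     int((session_end - session_start).total_seconds())
# instead of the raw timedelta; rddSessions_flattened.toDF() then no longer
# hits the "not supported type: datetime.timedelta" error.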
