Pyspark 数据帧中的 Timedelta - TypeError [英] Timedelta in Pyspark Dataframes - TypeError

查看:24
本文介绍了Pyspark 数据帧中的 Timedelta - TypeError的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 pyspark 2.3.1 开发 Spark 2.3、Python 3.6

I am working on Spark 2.3, Python 3.6 with pyspark 2.3.1

我有一个 Spark DataFrame,其中每个条目都是一个工作步骤,我想将一些行合并到一个工作会话中.这应该在下面的函数 getSessions 中完成.我相信它有效.

I have a Spark DataFrame where each entry is a workstep, and I want to get some rows together into a work session. This should be done in the below function getSessions. I believe it works.

我进一步创建了一个包含我想要的所有信息的 RDD - 每个条目都是一个带有所需列的 Row 对象,看起来类型很好(一些数据被伪装):

I further create an RDD that contains all information that I want - each entry is a Row object with the desired columns, it looks like the types are fine (some data disguised):

rddSessions_flattened.take(1)

# [Row(counter=1, end=datetime.datetime(2017, 11, 6, 9, 15, 20), end_id=2758327, no_of_entries=5, shortID=u'DISGUISED', start=datetime.datetime(2017, 11, 6, 9, 13, 59), start_id=INTEGERNUMBER, strDuration='0:01:21', tNumber=u'DISGUISED', timeDuration=datetime.timedelta(0, 81))]

如果我现在想让一个 DataFrame 成为我们的 RDD,我会得到一个 TypeError.

If I now want to make a DataFrame our of the RDD, I get a TypeError.

df = rddSessions_flattened.toDF()

df.show()

# TypeError: not supported type: type 'datetime.timedelta'

(最后贴出完整的错误信息)

(the full error message posted at the very end)

有没有想过哪里出了问题以及如何解决这个问题?

Any thoughts what went wrong and how to fix this?

  • 我基本上依靠 Spark 来推断架构;我认为这是因为 spark.sql.types 模块有一个类 TimestampType
  • 我将如何以编程方式定义它?我不清楚 Apache Spark 编程指南.

欣赏你的想法!

def getSessions(values, threshold=threshold):
"""
Create sessions for one person on one case

Arguments:
    values:      sorted list of tuples (datetime, id)
    threshold:   time delta object; max time of a session

Return:
    sessions:    list of sessions (SData)
"""

SData = Row(
    'counter'
    , 'start'
    , 'start_id'
    , 'end'
    , 'end_id'
    , 'strDuration'
    , 'timeDuration'
    , 'no_of_entries'
)

counter = 1
no_of_rows = 1
sessions = []   # list of sessions

session_start = values[0][0]   # first entry of the first tuple in the list
start_row_id = values[0][1]
session_end = session_start
end_row_id = start_row_id

for row_time, row_id in values[1:]:
    if row_time - session_start > threshold:
        # new session found, so append previous session
        sessions.append(SData(
            counter
            , session_start
            , start_row_id
            , session_end
            , end_row_id
            , str(session_end - session_start)
            , session_end - session_start
            , no_of_rows
            )
        )         
        # get the information for the next session
        counter += 1
        no_of_rows = 1
        session_start = row_time
        start_row_id = row_id
    else:
        no_of_rows +=1

    # regardless if new session or not: session_end reset to current entry
    session_end = row_time
    end_row_id = row_id            

# very last session has to be recorded as there is no "next" row
sessions.append(SData(
    counter
    , session_start
    , start_row_id
    , session_end
    , end_row_id
    , str(session_end - session_start)
    , session_end - session_start
    , no_of_rows
    )
)
return sessions

<小时>

完整的错误信息:


Full error message:

not supported type: <type 'datetime.timedelta'>
Traceback (most recent call last):
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 58, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 687, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 384, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio, names=schema)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/session.py", line 364, in _inferSchema
    schema = _infer_schema(first, names=names)
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1096, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/app/cdh/lib/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1070, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'datetime.timedelta'>

推荐答案

TimestampTypepd.timedelta 不同.前者类似于 pd.timestamp 第二种在 spark 中有一个类似的类型,它是 CalendarIntervalType 但看起来自动推理对你不起作用,它不是微不足道的与.如果可能,我建议您将 pd.timedelta 转换为秒或毫秒,现在具有(秒或毫秒)的 整数,并在应用程序下游使用它,因为它更容易使用并且仍然代表您选择的单位的时间间隔.

TimestampType is not the same as pd.timedelta. The former is analogous to pd.timestamp The second have a analogous type in spark which is CalendarIntervalType but looks like the automatic inference does no work for you and it is not trivial to work with. I would recommend, if possible, you to convert your pd.timedelta to seconds or milliseconds having now an integer of (seconds or milliseconds) and work with it downstream in application, as it is but easier to work with and still represents a time interval in your chosen units.

这篇关于Pyspark 数据帧中的 Timedelta - TypeError的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆