How to compute difference between timestamps with PySpark Structured Streaming

Question

I have the following problem with PySpark Structured Streaming.

Every line in my stream data has a user ID and a timestamp. Now, for every line and for every user, I want to add a column with the difference of the timestamps.

For example, suppose the first line that I receive says: "User A, 08:00:00". If the second line says "User A, 08:00:10" then I want to add a column in the second line called "Interval" saying "10 seconds".
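
For concreteness, here is roughly what the intended result looks like in batch Spark, using lag over a window partitioned by user (the column names and sample values are made up for illustration); non-time-based window functions like lag are not supported on streaming DataFrames, which is the crux of the problem:

```python
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.appName("interval-batch").getOrCreate()

# Static data with the same shape as the stream (made-up column names/values).
df = spark.createDataFrame(
    [("A", "2023-01-01 08:00:00"), ("A", "2023-01-01 08:00:10")],
    ["user", "ts"],
).withColumn("ts", F.to_timestamp("ts"))

# Difference (in seconds) between each row's timestamp and the previous one
# for the same user; null for the user's first row.
w = Window.partitionBy("user").orderBy("ts")
df.withColumn(
    "interval_seconds",
    F.col("ts").cast("long") - F.lag("ts").over(w).cast("long"),
).show()
# user=A: first row -> null, second row -> 10
```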

Is there anyone who knows how to achieve this? I tried to use the window function examples from the Structured Streaming documentation, but they didn't help.

Many thanks.

Answer

Since we're speaking about Structured Streaming, "every line and for every user" tells me that you should use a streaming query with some sort of streaming aggregation (groupBy or groupByKey).

For streaming aggregation you can only rely on micro-batch stream execution in Structured Streaming. That means records for a single user could be part of two different micro-batches, which in turn means you need state.

All of that together means you need a stateful streaming aggregation.

With that, I think you want one of the Arbitrary Stateful Operations, i.e. KeyValueGroupedDataset.mapGroupsWithState or KeyValueGroupedDataset.flatMapGroupsWithState (see KeyValueGroupedDataset):

Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger.

Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state.

The state would be kept per user, holding the last record seen. That looks doable.
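
Note that KeyValueGroupedDataset.mapGroupsWithState and flatMapGroupsWithState are the Scala / Java API; for PySpark, the closest counterpart is GroupedData.applyInPandasWithState, available since Spark 3.4. The following is a minimal sketch of the "last record per user" state, not a definitive implementation: the socket source, the column names user/ts, and emitting 0 as the interval of a user's very first record are all assumptions made for illustration.

```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
from pyspark.sql.types import LongType, StringType, StructField, StructType

spark = SparkSession.builder.appName("interval-per-user").getOrCreate()

# Hypothetical source producing lines like "A,2023-01-01 08:00:10"; any
# streaming DataFrame with (user string, ts timestamp) columns would do.
events = (spark.readStream
          .format("socket").option("host", "localhost").option("port", 9999)
          .load()
          .selectExpr("split(value, ',')[0] as user",
                      "to_timestamp(split(value, ',')[1]) as ts"))

# One output row per input record, with the gap to the user's previous record.
output_schema = StructType([
    StructField("user", StringType()),
    StructField("ts", LongType()),               # epoch seconds
    StructField("interval_seconds", LongType()),
])
# The state kept per user: the epoch-second timestamp of the last record seen.
state_schema = StructType([StructField("last_ts", LongType())])

def interval_per_user(key, pdf_iter, state: GroupState):
    (user,) = key
    last_ts = state.get[0] if state.exists else None
    for pdf in pdf_iter:
        # Assumes records arrive roughly in timestamp order; late or
        # out-of-order data would need extra handling in a real job.
        pdf = pdf.sort_values("ts").reset_index(drop=True)
        intervals = []
        for ts in pdf["ts"]:
            # 0 for the user's very first record (no previous timestamp).
            intervals.append(0 if last_ts is None else int(ts) - last_ts)
            last_ts = int(ts)
        yield pd.DataFrame({"user": user,
                            "ts": pdf["ts"],
                            "interval_seconds": intervals})
    if last_ts is not None:
        state.update((last_ts,))

result = (events
          .selectExpr("user", "cast(ts as long) as ts")
          .groupBy("user")
          .applyInPandasWithState(interval_per_user,
                                  outputStructType=output_schema,
                                  stateStructType=state_schema,
                                  outputMode="append",
                                  timeoutConf=GroupStateTimeout.NoTimeout))

query = result.writeStream.format("console").outputMode("append").start()
```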

My concerns would be:

1. How many users is this streaming query going to deal with? (the more users, the bigger the state)

2. When to clean up the state (of users that are no longer expected in the stream)? (this keeps the state at a reasonable size; see the timeout sketch below)
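
For the second concern, the state can be bounded with a group-state timeout: run the query with GroupStateTimeout.ProcessingTimeTimeout and drop the state of users that have been idle for a while. A sketch of how the function above could be wrapped; the 30-minute idle window is an arbitrary choice, and interval_per_user, events, output_schema, and state_schema refer to the previous sketch:

```python
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def interval_per_user_with_ttl(key, pdf_iter, state: GroupState):
    if state.hasTimedOut:
        # Spark invokes the function one last time for a timed-out group, with
        # no data; dropping the state keeps it bounded to recently active users.
        state.remove()
        return
    # Same per-user interval logic as in the previous sketch.
    yield from interval_per_user(key, pdf_iter, state)
    # Re-arm the timeout on every micro-batch that has data for this user.
    state.setTimeoutDuration(30 * 60 * 1000)  # 30 minutes, in milliseconds

result = (events
          .selectExpr("user", "cast(ts as long) as ts")
          .groupBy("user")
          .applyInPandasWithState(interval_per_user_with_ttl,
                                  outputStructType=output_schema,
                                  stateStructType=state_schema,
                                  outputMode="append",
                                  timeoutConf=GroupStateTimeout.ProcessingTimeTimeout))
```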
