Using Kinesis Analytics to construct real time sessions


Question

Is there an example somewhere, or can someone explain how to use Kinesis Analytics to construct real time sessions (i.e. sessionization)?

It is mentioned that this is possible here: https://aws.amazon.com/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/ in the discussion of custom windows, but no example is given.

Typically this is done in SQL using the LAG function, so you can compute the time difference between consecutive rows. This post: https://blog.modeanalytics.com/finding-user-sessions-sql/ describes how to do it with conventional (non-streaming) SQL. However, I don't see support for the LAG function in Kinesis Analytics.
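For reference, the conventional LAG-based approach can be sketched against an ordinary (non-streaming) database. The following is a minimal sketch using Python's built-in sqlite3 (assuming SQLite ≥ 3.25 for window functions); the table name, users, and timestamps are hypothetical sample data, not from the question:

```python
import sqlite3

# Toy events table of (user_id, ts) pairs; session gap threshold = 300 s (5 min).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE events (user_id TEXT, ts INTEGER)")
con.executemany("INSERT INTO events VALUES (?, ?)",
                [("u1", 0), ("u1", 60), ("u1", 120),   # first session
                 ("u1", 1000), ("u1", 1030)])          # gap > 300 s: second session

# Classic LAG-based sessionization: mark a new session whenever the gap from the
# previous event of the same user exceeds the threshold (or there is no previous
# event), then take a running sum of those markers to number the sessions.
rows = con.execute("""
    SELECT user_id, ts,
           SUM(new_session) OVER (PARTITION BY user_id ORDER BY ts) AS session_no
    FROM (
        SELECT user_id, ts,
               CASE WHEN LAG(ts) OVER (PARTITION BY user_id ORDER BY ts) IS NULL
                      OR ts - LAG(ts) OVER (PARTITION BY user_id ORDER BY ts) > 300
                    THEN 1 ELSE 0 END AS new_session
        FROM events)
    ORDER BY user_id, ts
""").fetchall()
for r in rows:
    print(r)
```

The running sum turns the per-row "new session" flags into a session number, which is the standard trick the Mode Analytics post describes.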

In particular, I would love two examples. Assume that both take as input a stream consisting of a user_id and a timestamp, and define a session as a sequence of events from the same user separated by less than 5 minutes.

1) The first outputs a stream with the additional columns event_count and session_start_timestamp. Every time an event comes in, this should output an event with these two additional columns.

2) The second example would be a stream that outputs a single event per session once the session has ended (i.e. 5 minutes have passed with no data from a user). This event would have userId, start_timestamp, end_timestamp, and event_count.

Is this possible with Kinesis Analytics?

Here is an example of doing this with Apache Spark: https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/Applications/01%20Sessionization.html

But I would love to do this with one (or two) Kinesis Analytics streams.

Answer

With the help of an AWS Solution Architect I was able to sessionize with this strategy:

Source stream sample:  

epoc_time: INTEGER
uuid: CHAR(6)

epoc_time   uuid
1530000000  myuuid
1530000001  myuuid
1530000002  myuuid
1530000003  myuuid
1530002000  myuuid
1530002001  myuuid
1530002002  myuuid
1530002003  myuuid

Step 1: Get the time difference between the current and preceding row, and if that difference is greater than your session inactivity requirement (in my case I'll choose 15 min / 900 seconds), stamp it.

CASE WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) > 900 THEN epoc_time
     WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) IS NULL THEN epoc_time
     ELSE NULL
END as session


epoc_time   uuid    session
1530000000  myuuid  1530000000
1530000001  myuuid  
1530000002  myuuid  
1530000003  myuuid  
1530002000  myuuid  1530002000
1530002001  myuuid  
1530002002  myuuid  
1530002003  myuuid
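The Step 1 logic can be checked against the sample rows above with a plain-Python sketch (this is an illustration of the gap-stamping rule, not Kinesis Analytics itself):

```python
# Stamp the row's epoc_time into a "session" column when the gap from the
# previous event of the same uuid exceeds 900 s, or when there is no previous
# event; otherwise emit None (the SQL NULL).
GAP = 900
events = [(1530000000, "myuuid"), (1530000001, "myuuid"),
          (1530000002, "myuuid"), (1530000003, "myuuid"),
          (1530002000, "myuuid"), (1530002001, "myuuid"),
          (1530002002, "myuuid"), (1530002003, "myuuid")]

last_seen = {}          # uuid -> epoc_time of that uuid's previous event
stamped = []
for epoc_time, uuid in events:
    prev = last_seen.get(uuid)
    session = epoc_time if prev is None or epoc_time - prev > GAP else None
    stamped.append((epoc_time, uuid, session))
    last_seen[uuid] = epoc_time

print(stamped)
```

Only the first event and the event after the 2000-second gap get a session stamp, matching the table above.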

Step 2: Grab the last value in the session column, windowed by the uuid, and combine it with the uuid to create a unique session ID. I chose the range as the default retention period for Kinesis (24 hours).

CAST(LAST_VALUE(session) IGNORE NULLS OVER (PARTITION BY uuid RANGE INTERVAL '24' HOUR PRECEDING) as CHAR(10)) 
|| uuid as sessionId,

epoc_time   uuid    session     sessionId
1530000000  myuuid  1530000000  1530000000myuuid
1530000001  myuuid              1530000000myuuid
1530000002  myuuid              1530000000myuuid
1530000003  myuuid              1530000000myuuid
1530002000  myuuid  1530002000  1530002000myuuid
1530002001  myuuid              1530002000myuuid
1530002002  myuuid              1530002000myuuid
1530002003  myuuid              1530002000myuuid
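What LAST_VALUE(...) IGNORE NULLS does over that window is a per-uuid forward fill of the last non-NULL stamp. A minimal plain-Python sketch of the same idea, applied to the stamped rows from Step 1 (again an illustration, not Kinesis SQL):

```python
# Forward-fill the last non-None "session" stamp per uuid and concatenate it
# with the uuid to form the sessionId column.
stamped = [(1530000000, "myuuid", 1530000000), (1530000001, "myuuid", None),
           (1530000002, "myuuid", None),       (1530000003, "myuuid", None),
           (1530002000, "myuuid", 1530002000), (1530002001, "myuuid", None),
           (1530002002, "myuuid", None),       (1530002003, "myuuid", None)]

last_session = {}       # uuid -> last non-None session stamp seen so far
out = []
for epoc_time, uuid, session in stamped:
    if session is not None:
        last_session[uuid] = session
    out.append((epoc_time, uuid, str(last_session[uuid]) + uuid))

print(out)
```

Every event now carries a sessionId such as 1530000000myuuid, matching the table above.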

Final SQL could look something like this:

CREATE OR REPLACE STREAM "INTERMEDIATE_SQL_STREAM" (
    epoc_time INTEGER,
    uuid CHAR(6),
    session INTEGER
    );


CREATE OR REPLACE STREAM "DESTINATION_STREAM" (
    epoc_time INTEGER,
    uuid CHAR(6),
    sessionId CHAR(16)
    );


CREATE OR REPLACE  PUMP "STREAM_PUMP" AS INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM
            epoc_time,
            uuid,
            CASE WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) > 900 THEN epoc_time
                 WHEN (epoc_time - lag(epoc_time,1) OVER (PARTITION BY uuid ROWS 1 PRECEDING)) IS NULL THEN epoc_time
                 ELSE NULL 
            END as session    
FROM "SOURCE_SQL_STREAM_001";


CREATE OR REPLACE  PUMP "STREAM_PUMP2" AS INSERT INTO "DESTINATION_STREAM"
SELECT STREAM
            epoc_time,
            uuid,
            CAST(LAST_VALUE(session) IGNORE NULLS OVER (PARTITION BY uuid RANGE INTERVAL '24' HOUR PRECEDING) as CHAR(10)) || uuid as sessionId    
FROM "INTERMEDIATE_SQL_STREAM";
