Sorting a union of streams to identify user sessions in Apache Flink

Problem description

I have two streams of events:

  • L = (l1, l3, l8, ...) - is sparser and represents user logins to an IP
  • E = (e2, e4, e5, e9, ...) - is a stream of log entries for that particular IP

The index represents a timestamp (lower means earlier). If we joined the two streams together and sorted them by time, we would get:

  • l1, e2, l3, e4, e5, l8, e9, ...
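As a small illustration of the merged ordering, the sketch below unions two finite lists of events and sorts them by timestamp. The `Event` class and the `unionSorted` helper are hypothetical names used only for this example, not Flink types. A global sort like this only works on a finite batch; on an unbounded stream it is not possible, which is the difficulty the question runs into.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type: kind 'l' = login, 'e' = log entry;
// timestamp is the index used in the question's notation (l1, e2, ...).
final class Event {
    final char kind;
    final long timestamp;

    Event(char kind, long timestamp) {
        this.kind = kind;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "" + kind + timestamp;
    }
}

final class UnionSortDemo {
    // Union of both streams followed by a sort on timestamp (finite input only).
    static List<Event> unionSorted(List<Event> logins, List<Event> logs) {
        List<Event> all = new ArrayList<>(logins);
        all.addAll(logs);
        all.sort(Comparator.comparingLong(ev -> ev.timestamp));
        return all;
    }

    public static void main(String[] args) {
        // Both inputs arrive out of order.
        List<Event> logins = Arrays.asList(new Event('l', 8), new Event('l', 1), new Event('l', 3));
        List<Event> logs = Arrays.asList(
            new Event('e', 5), new Event('e', 2), new Event('e', 9), new Event('e', 4));
        System.out.println(unionSorted(logins, logs)); // [l1, e2, l3, e4, e5, l8, e9]
    }
}
```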

Would it be possible to implement custom Window / Trigger functions to group the events into sessions (the time between logins of different users):

  • l1 - l3 : e2
  • l3 - l8 : e4, e5
  • l8 - l14 : e9, e10, e11, e12, e13
  • ...
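Assuming the merged stream is already sorted by time, the grouping itself is a single pass: each login closes the session opened by the previous login and starts a new one. The sketch below is hypothetical plain Java (`Event`, `Session` and `sessionize` are illustrative names, not Flink API); a GlobalWindow with a custom Trigger would have to reproduce the same behaviour. Log entries before the first login are ignored, and the last session is not emitted until a later login closes it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical event type: kind 'l' = login, 'e' = log entry.
final class Event {
    final char kind;
    final long timestamp;
    Event(char kind, long timestamp) { this.kind = kind; this.timestamp = timestamp; }
}

// One session: the log entries between two consecutive logins.
final class Session {
    final long start;      // timestamp of the login that opened the session
    final long end;        // timestamp of the next login, which closed it
    final List<Long> logs; // timestamps of the log entries inside the session

    Session(long start, long end, List<Long> logs) {
        this.start = start;
        this.end = end;
        this.logs = logs;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("l" + start + " - l" + end + " : ");
        for (int i = 0; i < logs.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append('e').append(logs.get(i));
        }
        return sb.toString();
    }
}

final class Sessionizer {
    // Requires input sorted by timestamp; returns only sessions already closed by a login.
    static List<Session> sessionize(List<Event> sorted) {
        List<Session> closed = new ArrayList<>();
        Long openedAt = null;                 // login that opened the current session
        List<Long> pending = new ArrayList<>();
        for (Event ev : sorted) {
            if (ev.kind == 'l') {
                if (openedAt != null) {
                    closed.add(new Session(openedAt, ev.timestamp, pending));
                }
                openedAt = ev.timestamp;
                pending = new ArrayList<>();
            } else if (openedAt != null) {
                pending.add(ev.timestamp);    // log entry belongs to the open session
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        List<Event> sorted = Arrays.asList(
            new Event('l', 1), new Event('e', 2), new Event('l', 3), new Event('e', 4),
            new Event('e', 5), new Event('l', 8), new Event('e', 9));
        for (Session s : sessionize(sorted)) {
            System.out.println(s);
        }
        // l1 - l3 : e2
        // l3 - l8 : e4, e5
    }
}
```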

The problem I see is that the two streams are not necessarily sorted. I thought about sorting the input stream by timestamp; then it would be easy to implement the windowing using a GlobalWindow and a custom Trigger, yet that does not seem to be possible.

Am I missing something, or is it definitely not possible to do this in the current Flink version (v1.3.2)?

Thanks

Recommended answer

Question: shouldn't E3 come before L4?

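Sorting is pretty straightforward using a ProcessFunction on a keyed stream: buffer each incoming event in keyed state, register an event-time timer for its timestamp, and when a timer fires, emit every buffered event that the watermark has passed, in timestamp order. Something like this: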

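import java.util.PriorityQueue;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Buffers events in keyed state and emits them in timestamp order once the watermark
// has passed them. Keyed state and timers require a keyed stream (e.g. the union of
// both streams keyed by IP). Assumes Event has a public long field `timestamp` and
// implements Comparable<Event> ordered by that field, so the PriorityQueue can sort it.
public static class SortFunction extends ProcessFunction<Event, Event> {
  private ValueState<PriorityQueue<Event>> queueState = null;

  @Override
  public void open(Configuration config) {
    ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
        // state name
        "sorted-events",
        // type information of state
        TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
        }));
    queueState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
    TimerService timerService = context.timerService();

    // Events at or behind the current watermark are late and are silently dropped.
    if (context.timestamp() > timerService.currentWatermark()) {
      PriorityQueue<Event> queue = queueState.value();
      if (queue == null) {
        queue = new PriorityQueue<>(10);
      }
      queue.add(event);
      queueState.update(queue);
      // The timer fires once the watermark reaches this event's timestamp.
      timerService.registerEventTimeTimer(event.timestamp);
    }
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
    PriorityQueue<Event> queue = queueState.value();
    if (queue == null) {
      return;
    }
    long watermark = context.timerService().currentWatermark();
    // Emit every buffered event the watermark has passed, smallest timestamp first.
    Event head = queue.peek();
    while (head != null && head.timestamp <= watermark) {
      out.collect(head);
      queue.poll();
      head = queue.peek();
    }
    // Write the modified queue back (or clear it) so the change persists with any state backend.
    if (queue.isEmpty()) {
      queueState.clear();
    } else {
      queueState.update(queue);
    }
  }
}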
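The buffering logic of the SortFunction can be checked without a Flink cluster. The following is a hypothetical plain-Java simulation, not Flink API: `WatermarkSorter`, `onEvent` and `onWatermark` are names invented for illustration, and bare timestamps stand in for whole events. It keeps a min-heap, drops events that are not newer than the current watermark, and releases everything at or below the watermark whenever the watermark advances, which is what `processElement` and `onTimer` do together (draining on each watermark advance rather than per registered timer, which yields the same output order).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

final class WatermarkSorter {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // min-heap by timestamp
    private final List<Long> emitted = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    // Counterpart of processElement: buffer the event unless it is already late.
    void onEvent(long timestamp) {
        if (timestamp > watermark) {
            buffer.add(timestamp);
        }
    }

    // Counterpart of onTimer: once the watermark advances, emit everything it has passed.
    void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            emitted.add(buffer.poll());
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkSorter sorter = new WatermarkSorter();
        for (long t : Arrays.asList(3L, 1L, 2L)) sorter.onEvent(t);
        sorter.onWatermark(2);                    // releases 1, 2
        for (long t : Arrays.asList(5L, 1L, 4L)) sorter.onEvent(t); // 1 is late: dropped
        sorter.onWatermark(10);                   // releases 3, 4, 5
        System.out.println(sorter.emitted());    // [1, 2, 3, 4, 5]
    }
}
```

How long events wait in the buffer, and how late an event may arrive before it is dropped, both depend on how far the watermark lags behind the event timestamps.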

Update: see "How to sort an out-of-order event time stream using Flink" for a description of a generally better approach.
