Sorting union of streams to identify user sessions in Apache Flink
Question
I have two streams of events:
- L = (l1, l3, l8, ...) - is sparser and represents user logins to an IP
- E = (e2, e4, e5, e9, ...) - is a stream of logs from that particular IP
The index represents a timestamp. If we joined the two streams together and sorted them by time, we would get:
- l1, e2, l3, e4, e5, l8, e9, ...
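For bounded inputs that are fully known up front, "join and sort" is just a union followed by a sort on timestamp. The sketch below shows only that batch case, using a hypothetical `Ev` type (not part of the question) where `kind` is `'l'` or `'e'` and the index is used as the timestamp. The unbounded, out-of-order case is what makes this harder in Flink.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class UnionSortDemo {
    // Hypothetical event type: kind is 'l' (login) or 'e' (log entry); ts is the timestamp.
    static final class Ev {
        final char kind;
        final long ts;
        Ev(char kind, long ts) { this.kind = kind; this.ts = ts; }
        @Override public String toString() { return "" + kind + ts; }
    }

    // Union of two finite inputs, then a stable sort by timestamp.
    static List<Ev> unionSorted(List<Ev> logins, List<Ev> logs) {
        List<Ev> all = new ArrayList<>(logins);
        all.addAll(logs);
        all.sort(Comparator.comparingLong(ev -> ev.ts));
        return all;
    }

    public static void main(String[] args) {
        List<Ev> l = List.of(new Ev('l', 1), new Ev('l', 3), new Ev('l', 8));
        List<Ev> e = List.of(new Ev('e', 2), new Ev('e', 4), new Ev('e', 5), new Ev('e', 9));
        System.out.println(unionSorted(l, e)); // prints [l1, e2, l3, e4, e5, l8, e9]
    }
}
```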
Would it be possible to implement custom Window / Trigger functions to group the events into sessions (the time between logins of different users):
- l1 - l3 : e2
- l3 - l8 : e4, e5
- l8 - l14 : e9, e10, e11, e12, e13
- ...
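Once the merged stream is in timestamp order, the grouping itself is simple: each login opens a new session and every following log entry belongs to it until the next login arrives. Below is a plain-Java sketch of that rule (no Flink), assuming the input is already sorted; the `SessionizeDemo` class and its `Ev` type are hypothetical. Log entries seen before the first login are ignored here, which is an assumption the question does not settle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SessionizeDemo {
    // Hypothetical merged event: isLogin distinguishes L from E; ts is the timestamp.
    static final class Ev {
        final boolean isLogin;
        final long ts;
        Ev(boolean isLogin, long ts) { this.isLogin = isLogin; this.ts = ts; }
    }

    // Assumes `sorted` is ordered by ts. Sessions are keyed by the login timestamp and
    // hold the timestamps of the log entries that follow it, up to the next login.
    static Map<Long, List<Long>> sessionize(List<Ev> sorted) {
        Map<Long, List<Long>> sessions = new LinkedHashMap<>();
        List<Long> current = null;
        for (Ev ev : sorted) {
            if (ev.isLogin) {
                current = new ArrayList<>();
                sessions.put(ev.ts, current);
            } else if (current != null) {
                current.add(ev.ts); // log entry belongs to the most recent login
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<Ev> merged = List.of(
                new Ev(true, 1), new Ev(false, 2), new Ev(true, 3),
                new Ev(false, 4), new Ev(false, 5), new Ev(true, 8), new Ev(false, 9));
        System.out.println(sessionize(merged)); // prints {1=[2], 3=[4, 5], 8=[9]}
    }
}
```

In a streaming job the last session (starting at l8 here) stays open until the next login arrives, which is why a Trigger that fires on each login, or a keyed function that emits the previous session when a new login is seen, fits this shape only if events reach it in order.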
The problem I see is that the two streams are not necessarily sorted. I thought about sorting the input stream by timestamp; then it would be easy to implement the windowing using a GlobalWindow and a custom Trigger. Yet it seems that this is not possible.
Am I missing something, or is it definitely not possible to do this in the current Flink (v1.3.2)?
Thanks

Answer
Question: shouldn't E3 come before L4?
Sorting is pretty straightforward using a ProcessFunction. Something like this:
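```java
import java.util.PriorityQueue;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Must be applied to a KeyedStream (it uses keyed state and event-time timers).
// Assumes Event has a public long field `timestamp` and implements Comparable<Event>
// ordered by that timestamp, because the PriorityQueue is built without a Comparator.
public static class SortFunction extends ProcessFunction<Event, Event> {
    private ValueState<PriorityQueue<Event>> queueState = null;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
                // state name
                "sorted-events",
                // type information of state
                TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {}));
        queueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
        TimerService timerService = context.timerService();
        // Events at or behind the current watermark are late and are silently dropped.
        if (context.timestamp() > timerService.currentWatermark()) {
            PriorityQueue<Event> queue = queueState.value();
            if (queue == null) {
                queue = new PriorityQueue<>(10);
            }
            queue.add(event);
            queueState.update(queue);
            // Fires once the watermark passes this event's timestamp.
            timerService.registerEventTimeTimer(event.timestamp);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
        PriorityQueue<Event> queue = queueState.value();
        if (queue == null) {
            return;
        }
        long watermark = context.timerService().currentWatermark();
        // Emit, in timestamp order, every buffered event the watermark has passed.
        Event head = queue.peek();
        while (head != null && head.timestamp <= watermark) {
            out.collect(head);
            queue.poll();
            head = queue.peek();
        }
        // Write the modified queue back; backends that copy state (e.g. RocksDB) need this.
        queueState.update(queue);
    }
}
```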
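To see the buffering rule in isolation, here is a plain-Java model of what the SortFunction does, with no Flink state or timers: events are held in a priority queue, each watermark advance releases every buffered event at or below it in timestamp order, and an event arriving at or behind the current watermark is dropped as late. The class and method names (`WatermarkSortDemo`, `onEvent`, `onWatermark`) are hypothetical, and timestamps stand in for whole events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class WatermarkSortDemo {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long watermark = Long.MIN_VALUE; // Flink's initial watermark value
    final List<Long> emitted = new ArrayList<>();
    final List<Long> dropped = new ArrayList<>();

    // Mirrors processElement: only buffer events that are ahead of the watermark.
    void onEvent(long ts) {
        if (ts > watermark) {
            buffer.add(ts);
        } else {
            dropped.add(ts);
        }
    }

    // Mirrors onTimer after the watermark advances: release everything up to it, in order.
    void onWatermark(long wm) {
        watermark = wm;
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            emitted.add(buffer.poll());
        }
    }

    public static void main(String[] args) {
        WatermarkSortDemo s = new WatermarkSortDemo();
        s.onEvent(3); s.onEvent(1); s.onEvent(2);
        s.onWatermark(2);            // releases 1 and 2; 3 stays buffered
        s.onEvent(5); s.onEvent(2);  // this 2 is not ahead of watermark 2, so it is dropped
        s.onEvent(4);
        s.onWatermark(10);           // releases 3, 4, 5
        System.out.println(s.emitted + " dropped=" + s.dropped); // prints [1, 2, 3, 4, 5] dropped=[2]
    }
}
```

The output is only as sorted as the watermark allows: events later than the watermark strategy tolerates are lost rather than emitted out of order, so the watermark's out-of-orderness bound is the knob that trades latency against completeness.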
Update: see How to sort an out-of-order event time stream using Flink for a description of a generally better approach.