How to assign groups of messages to windows by detecting the first message of a group?

Problem description

I have the following problem: I receive messages which have to be grouped, and each group of messages has to be processed. I can only detect the first message of each group. After that first message, the following messages belong to that group until the first message of the next group is detected.
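
The grouping rule above can be sketched in plain Java, independent of Flink, to make the intended result concrete. This is a minimal sketch under assumptions: GroupSplitter and the isFirst predicate are hypothetical names, isFirst stands in for whatever check detects the first message of a group, and messages that arrive before any first message are simply dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class GroupSplitter {
    // Splits messages into groups: each group starts at a message for which
    // isFirst returns true and runs until the next such message.
    // Messages seen before the first group start are dropped (assumption).
    public static <T> List<List<T>> split(List<T> messages, Predicate<T> isFirst) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = null;
        for (T msg : messages) {
            if (isFirst.test(msg)) {
                // A new group begins with this message.
                current = new ArrayList<>();
                groups.add(current);
            }
            if (current != null) {
                current.add(msg);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical convention: upper-case letters mark the first message of a group.
        List<String> msgs = List.of("x", "A", "a", "a", "B", "b", "C");
        System.out.println(split(msgs, s -> Character.isUpperCase(s.charAt(0))));
    }
}
```

Because this batch version sees the whole input, it can return the last group too; a streaming job only knows a group is complete once the next group's first message arrives.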

My approach to solving that problem was to write a custom trigger that returns FIRE_AND_PURGE when it detects the first message of a group (by overriding onElement). My goal was to assign all messages of one group to one window.

The problem with that approach is that the first message of each group is always assigned to the window of the preceding group.

What I get is: [aaaaaaab], [bbbbbbbbc] ...
What I want is: [aaaaaaa], [bbbbbbbb] ...
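
The off-by-one comes from the order in which Flink's window operator works: it adds an incoming element to the window's contents first and only then calls the trigger's onElement, so a FIRE_AND_PURGE returned for the first message of group b emits that message together with group a. The plain-Java model below is not Flink code (TriggerOrderModel and its methods are hypothetical); it contrasts that "add, then decide" order with a "decide, then add" order that produces the desired groups. In both models the last group stays buffered because nothing marks its end.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerOrderModel {
    // Models a window whose trigger runs AFTER the element is buffered:
    // the boundary check fires and purges a buffer that already contains
    // the first message of the next group.
    public static List<String> addThenDecide(String input) {
        List<String> out = new ArrayList<>();
        StringBuilder window = new StringBuilder();
        char current = 0; // 0 means no group seen yet
        for (char c : input.toCharArray()) {
            window.append(c);                   // element is added first
            if (c != current) {                 // then the "trigger" inspects it
                if (current != 0) {
                    out.add(window.toString()); // like FIRE_AND_PURGE
                    window.setLength(0);
                }
                current = c;
            }
        }
        return out;
    }

    // Checks for a group boundary BEFORE buffering the element, so the
    // first message of a group always starts a fresh buffer.
    public static List<String> decideThenAdd(String input) {
        List<String> out = new ArrayList<>();
        StringBuilder window = new StringBuilder();
        char current = 0;
        for (char c : input.toCharArray()) {
            if (c != current) {
                if (window.length() > 0) {
                    out.add(window.toString()); // emit the finished group
                    window.setLength(0);
                }
                current = c;
            }
            window.append(c);                   // element is added afterwards
        }
        return out;
    }

    public static void main(String[] args) {
        String input = "aaabbc";
        System.out.println("add, then decide: " + addThenDecide(input));
        System.out.println("decide, then add: " + decideThenAdd(input));
    }
}
```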

Relevant code from the main function:

            esRawInputStream.filter(new FilterFunction<JsonNode>() {
                @Override
                public boolean filter(JsonNode doc) throws Exception {
                    return // some condition
                }
            }).keyBy(new KeySelector<JsonNode, String>() {
                @Override
                public String getKey(JsonNode doc) throws Exception {
                    return doc.findValue("meta_charge_point_id").asText();
                }
            }).window(GlobalWindows.create())
                    .trigger(new CustomEventTrigger<JsonNode, GlobalWindow>())
                    .fold(new SessionBucket(), new FoldFunction<JsonNode, SessionBucket>() {
                        @Override
                        public SessionBucket fold(SessionBucket b, JsonNode msg) throws Exception {
                            b.addMessage(msg);
                            return b;
                        }
                    }).addSink(new FileSink<SessionBucket>());

Trigger:

public class CustomEventTrigger<T, W extends Window> extends Trigger<T, W> {
    // NOTE: a plain field is shared by all keys handled by this trigger instance
    // and is not checkpointed; per-key state would go through ctx.getPartitionedState(...).
    private String currentSessionId = "foo";

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        if (!(element instanceof JsonNode)) {
            throw new IllegalArgumentException("Expected a JsonNode but got: " + element);
        }
        JsonNode jsonElement = (JsonNode) element;

        // Flink has already added this element to the window when onElement runs,
        // so FIRE_AND_PURGE here emits it together with the preceding group.
        TriggerResult res = TriggerResult.CONTINUE;
        String elementSessionId = jsonElement.findValue("ocpp_session_id").asText();
        if (!elementSessionId.equals(currentSessionId)) {
            currentSessionId = elementSessionId;
            res = TriggerResult.FIRE_AND_PURGE;
        }
        return res;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        // No timers are registered, so timer callbacks never fire or purge.
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        // No trigger state to clean up.
    }
}

Recommended answer

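This use case isn't very well suited to Flink's window API: a trigger's onElement is only called after the element has already been added to the window, so returning FIRE_AND_PURGE for the first message of a new group always emits that message together with the preceding group. Let me suggest an alternative, which is to do this with a stateful flatmap function.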

Here's an example of what that might look like:

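import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Segmenting {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A single parallel instance keeps the elements in their original order.
        env.setParallelism(1);

        env.fromElements(1, 2, 2, 3, 3, 3, 1, 4, 4, 4, 4, 2, 2)
            // key the stream so we can use keyed state (every element gets the same key)
            .keyBy(event -> 1)
            .flatMap(new RichFlatMapFunction<Integer, List<Integer>>() {
                // value of the group currently being collected
                private transient ValueState<Integer> currentValue;
                // elements of the group currently being collected
                private transient ListState<Integer> list;

                @Override
                public void open(Configuration parameters) throws Exception {
                    currentValue = getRuntimeContext().getState(new ValueStateDescriptor<>("currentValue", Integer.class));
                    list = getRuntimeContext().getListState(new ListStateDescriptor<>("list", Integer.class));
                }

                @Override
                public void flatMap(Integer event, Collector<List<Integer>> collector) throws Exception {
                    Integer value = currentValue.value();

                    // compare with equals(): == on Integer compares object references
                    if (Objects.equals(value, event)) {
                        // same group: keep buffering
                        list.add(event);
                    } else {
                        // a new group starts: emit the buffered group, if any ...
                        if (value != null) {
                            List<Integer> result = new ArrayList<>();
                            list.get().forEach(result::add);
                            collector.collect(result);
                        }
                        // ... and start buffering the new group with this element
                        currentValue.update(event);
                        list.clear();
                        list.add(event);
                    }
                }
            })
            .print();

        env.execute();
    }
}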

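The output is as follows. Note that the last group, [2, 2], is not printed: a group is only emitted when the first message of the next group arrives, and no message follows it.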

[1]
[2, 2]
[3, 3, 3]
[1]
[4, 4, 4, 4]
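
The same logic can be exercised without a Flink cluster. The sketch below is a hypothetical plain-Java RunSegmenter that mirrors the flatMap above (emit the buffered group when a different value arrives) and adds a flush() method, which is an assumption not present in the answer, to emit the last, still-open group when the input ends. In a Flink job, that final step would need some extra signal, for example a timer in a KeyedProcessFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class RunSegmenter {
    private Integer currentValue;                      // value of the open group, null before the first event
    private final List<Integer> buffer = new ArrayList<>();

    // Mirrors the flatMap: returns the finished group when a different value arrives.
    public Optional<List<Integer>> accept(Integer event) {
        Optional<List<Integer>> finished = Optional.empty();
        if (!Objects.equals(currentValue, event)) {
            if (currentValue != null) {
                finished = Optional.of(new ArrayList<>(buffer));
            }
            currentValue = event;
            buffer.clear();
        }
        buffer.add(event);
        return finished;
    }

    // Emits the last, still-open group (the flatMap has no equivalent).
    public Optional<List<Integer>> flush() {
        if (buffer.isEmpty()) {
            return Optional.empty();
        }
        List<Integer> last = new ArrayList<>(buffer);
        buffer.clear();
        currentValue = null;
        return Optional.of(last);
    }

    public static void main(String[] args) {
        RunSegmenter seg = new RunSegmenter();
        List<List<Integer>> out = new ArrayList<>();
        for (int e : new int[] {1, 2, 2, 3, 3, 3, 1, 4, 4, 4, 4, 2, 2}) {
            seg.accept(e).ifPresent(out::add);
        }
        seg.flush().ifPresent(out::add);
        out.forEach(System.out::println);
    }
}
```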

By the way, I'm assuming the data is in order, and am avoiding parallel processing so as to keep it in order. For most stream processing applications that would be an unrealistic assumption. If your data will be out-of-order, you can use this as a starting point, but the final solution will be more complex.
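
For out-of-order input, one common first step is to buffer messages and release them in event-time order once a watermark indicates that no earlier message is still expected; the sorted output could then feed the segmenting logic. The sketch below is a plain-Java illustration under those assumptions: the ReorderBuffer and Msg types and the caller-supplied watermark are hypothetical, and a real Flink job would instead rely on Flink's own timestamps, watermarks and timers.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class ReorderBuffer {
    // Hypothetical message type: event-time timestamp plus the group value.
    public static final class Msg {
        public final long timestamp;
        public final int value;

        public Msg(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Buffered messages, ordered by timestamp (smallest first).
    private final PriorityQueue<Msg> pending =
            new PriorityQueue<>(Comparator.comparingLong((Msg m) -> m.timestamp));

    public void add(Msg msg) {
        pending.add(msg);
    }

    // Releases, in timestamp order, every buffered message whose timestamp
    // is <= watermark; later messages stay buffered.
    public List<Msg> advanceWatermark(long watermark) {
        List<Msg> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            ready.add(pending.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        ReorderBuffer buf = new ReorderBuffer();
        buf.add(new Msg(3, 7));
        buf.add(new Msg(1, 7));
        buf.add(new Msg(2, 8));
        for (Msg m : buf.advanceWatermark(2)) {
            System.out.println(m.timestamp + " -> " + m.value);
        }
    }
}
```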
