如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件? [英] How to forward event downstream from a Punctuator instance in a ValueTransformer?

查看:23
本文介绍了如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在 KafkaStream 中,当实现 ValueTransformerValueTransformerWithKey 时,在 transform() 调用时,我安排了一个新的 Punctuator.当执行 Punctuator 的方法 punctuate() 时,我希望它使用上下文实例向下游转发事件.然而,当 DSL 拓扑的一部分时,上下文实例似乎没有定义.

In KafkaStream, when implementing a ValueTransformer or ValueTransformerWithKey, on transform() call, I schedule a new Punctuator. When method punctuate() of the Punctuator is executed I want it to forward an event downstream using the context instance. However, the context instance seems not defined when part of a DSL topology.

有关如何使用 Transformer 执行此操作的任何线索?

Any clue on how to do this with a Transformer ?

在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑.

Using the same logic in a Processor, implementing the low-level processor topology it works.

在 ValueTransformerWithKey 中:

In ValueTransformerWithKey:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}

在 MyPunctuator 中:

In MyPunctuator:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}

执行时

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));

我希望标点符号产生的 Alarm 事件在过期后被转发到 ALARM 主题.

I expect the Alarm event produced by the punctuator to be forwared to the ALARM topic once expired.

相反,我收到了以下异常:不支持 ProcessorContext.forward().

Instead I got the following exception: ProcessorContext.forward() not supported.

推荐答案

像往常一样,我在 javadoc 中找到了关于 ValueTransformerWithKey 接口的答案:https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

As usual, I found the answer in the javadoc about ValueTransformerWithKey interface: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

请注意,在转换中不允许使用 ProcessorContext.forward(Object, Object) 或 ProcessorContext.forward(Object, Object, To) 并会导致异常.

Note, that using ProcessorContext.forward(Object, Object) or ProcessorContext.forward(Object, Object, To) is not allowed within transform and will result in an exception.

但是,实现Transformer 接口反而允许使用context.forward().谢谢@Matthias J. Sax

However, implementing the Transformer interface instead allows the usage of context.forward(). Thanks @Matthias J. Sax

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

如果要向下游转发多个输出记录,则可以使用 ProcessorContext.forward(Object, Object) 和 ProcessorContext.forward(Object, Object, To).如果不应该向下游转发记录,则转换可以返回 null.

If more than one output record should be forwarded downstream ProcessorContext.forward(Object, Object) and ProcessorContext.forward(Object, Object, To) can be used. If record should not be forwarded downstream, transform can return null.

这篇关于如何从 ValueTransformer 中的 Punctuator 实例向下游转发事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆