如何在ValueTransformer中从Punctuator实例向下游转发事件? [英] How to forward event downstream from a Punctuator instance in a ValueTransformer?

查看:102
本文介绍了如何在ValueTransformer中从Punctuator实例向下游转发事件?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在KafkaStream中,当实现 ValueTransformer ValueTransformerWithKey 时,在 transform()调用中,我计划了一个新的Punctuator.当执行 Punctuator 的方法 punctuate()时,我希望它使用上下文实例将事件转发到下游.但是,当属于DSL拓扑的一部分时,上下文实例似乎未定义.

In KafkaStream, when implementing a ValueTransformer or ValueTransformerWithKey, on transform() call, I schedule a new Punctuator. When method punctuate() of the Punctuator is executed I want it to forward an event downstream using the context instance. However, the context instance seems not defined when part of a DSL topology.

关于如何使用变形金刚"做到这一点的任何线索?

Any clue on how to do this with a Transformer ?

在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑.

Using the same logic in a Processor, implementing the low-level processor topology it works.

在ValueTransformerWithKey中:

In ValueTransformerWithKey:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}

在MyPunctuator中:

In MyPunctuator:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}

执行时

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));

我希望一旦标点符号过期,就可以将由标点器产生的Alarm事件转发给ALARM主题.

I expect the Alarm event produced by the punctuator to be forwared to the ALARM topic once expired.

相反,我遇到了以下异常:不支持ProcessorContext.forward().

Instead I got the following exception: ProcessorContext.forward() not supported.

推荐答案

和往常一样,我在javadoc中找到了关于 ValueTransformerWithKey 接口的答案: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

As usual, I found the answer in the javadoc about ValueTransformerWithKey interface: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

请注意,在转换中不允许使用ProcessorContext.forward(Object,Object)或ProcessorContext.forward(Object,Object,To),这将导致异常.

Note, that using ProcessorContext.forward(Object, Object) or ProcessorContext.forward(Object, Object, To) is not allowed within transform and will result in an exception.

但是,通过实现 Transformer 接口,可以使用 context.forward().谢谢@Matthias J.Sax

However, implementing the Transformer interface instead allows the usage of context.forward(). Thanks @Matthias J. Sax

https://kafka.apache .org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

如果应转发多个输出记录,则可以使用下游ProcessorContext.forward(Object,Object)和ProcessorContext.forward(Object,Object,To).如果不应将记录转发到下游,则transform可以返回null.

If more than one output record should be forwarded downstream ProcessorContext.forward(Object, Object) and ProcessorContext.forward(Object, Object, To) can be used. If record should not be forwarded downstream, transform can return null.

这篇关于如何在ValueTransformer中从Punctuator实例向下游转发事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆