如何在ValueTransformer中从Punctuator实例向下游转发事件? [英] How to forward event downstream from a Punctuator instance in a ValueTransformer?
问题描述
在KafkaStream中,当实现 ValueTransformer 或 ValueTransformerWithKey 时,在 transform()调用中,我计划了一个新的Punctuator.当执行 Punctuator 的方法 punctuate()时,我希望它使用上下文实例将事件转发到下游.但是,当属于DSL拓扑的一部分时,上下文实例似乎未定义.
In KafkaStream, when implementing a ValueTransformer or ValueTransformerWithKey, on transform() call, I schedule a new Punctuator. When method punctuate() of the Punctuator is executed I want it to forward an event downstream using the context instance. However, the context instance seems not defined when part of a DSL topology.
关于如何使用变形金刚"做到这一点的任何线索?
Any clue on how to do this with a Transformer ?
在处理器中使用相同的逻辑,实现其工作的低级处理器拓扑.
Using the same logic in a Processor, implementing the low-level processor topology it works.
在ValueTransformerWithKey中:
In ValueTransformerWithKey:
@Override
public Event transform(final String key, final Event event) {
this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
return null;
}
在MyPunctuator中:
In MyPunctuator:
private class MytPunctuator implements Punctuator {
private String key;
private ProcessorContext context;
private Event event;
MyPunctuator(ProcessorContext context, String key, Event event)
{
this.context = context;
this.key = key;
this.event = event;
}
@Override
public void punctuate(final long timestamp) {
context.forward(key, AlertEvent.builder().withSource(event).build());
context.commit();
}
}
执行时
myStream
.groupByKey(Serialized.with(Serdes.String(), Event.serde()))
.reduce((k, v) -> v)
.transformValues(() -> valueTransformerWithKey)
.toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));
我希望一旦标点符号过期,就可以将由标点器产生的Alarm事件转发给ALARM主题.
I expect the Alarm event produced by the punctuator to be forwared to the ALARM topic once expired.
相反,我遇到了以下异常:不支持ProcessorContext.forward().
Instead I got the following exception: ProcessorContext.forward() not supported.
推荐答案
和往常一样,我在javadoc中找到了关于 ValueTransformerWithKey 接口的答案: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html
As usual, I found the answer in the javadoc about ValueTransformerWithKey interface: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html
请注意,在转换中不允许使用ProcessorContext.forward(Object,Object)或ProcessorContext.forward(Object,Object,To),这将导致异常.
Note, that using ProcessorContext.forward(Object, Object) or ProcessorContext.forward(Object, Object, To) is not allowed within transform and will result in an exception.
但是,通过实现 Transformer 接口,可以使用 context.forward().谢谢@Matthias J.Sax
However, implementing the Transformer interface instead allows the usage of context.forward(). Thanks @Matthias J. Sax
https://kafka.apache .org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html
如果应转发多个输出记录,则可以使用下游ProcessorContext.forward(Object,Object)和ProcessorContext.forward(Object,Object,To).如果不应将记录转发到下游,则transform可以返回null.
If more than one output record should be forwarded downstream ProcessorContext.forward(Object, Object) and ProcessorContext.forward(Object, Object, To) can be used. If record should not be forwarded downstream, transform can return null.
这篇关于如何在ValueTransformer中从Punctuator实例向下游转发事件?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!