Kafka KStream - using AbstractProcessor with a Window


Question

Im hoping to group together windowed batches of output from a KStream and write them to a secondary store.

I was expecting to see .punctuate() get called roughly every 30 seconds. What I got instead is saved here.

(The original file was several thousand lines long)

Summary - .punctuate() is getting called seemingly randomly and then repeatedly. It doesn't appear to adhere to the value set via ProcessorContext.schedule().

Another run of the same code produced calls to .punctuate() roughly every four minutes. I didn't see the crazy repeated values this time. No change in source - just different result.

Using this code:

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();



Processor

import java.util.ArrayList;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;               // punctuation interval in milliseconds
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;
        values = new ArrayList<>();
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");
        super.init(context);
        values.clear();
        this.context = context;
        context.schedule(delay);            // expect punctuate() roughly every `delay` ms
    }

    @Override
    public void process(String key, String value) {
        LOGGER.debug("batched processor key:{}   value:{}", key, value);
        values.add(value);                  // buffer values until the next punctuation
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);
        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());
        context().commit();
    }
}



ProcessorSupplier

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);          // schedule punctuation every 30 seconds
        } catch (Exception exception) {
            LOGGER.error("Unable to instantiate BP2()", exception);
            throw new RuntimeException("Unable to instantiate BP2()", exception);
        }
    }
}






Edit:


To make sure my debugger wasn't slowing this down I built it and ran it on the same box as my kafka process. This time it didn't even try to lag for 4 minutes or more - within seconds it was outputting spurious calls to .punctuate(). Many (most) of these with no intervening calls to .process().

Answer

Update: this part of the answer applies to Kafka 0.11 or earlier (for Kafka 1.0 and later, see below).

In Kafka Streams, punctuations are based on stream-time and not system time (aka processing-time).

Per default, stream-time is event-time, i.e., the timestamp embedded in the Kafka records themselves. As you do not set a non-default TimestampExtractor (see timestamp.extractor in http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters), the calls to punctuate depend only on the progress of event time over the records you process. Thus, if you need multiple minutes to process "30 seconds" (event time) of records, punctuate will be called less frequently than every 30 seconds (wall-clock time).

This can also explain your irregular calling patterns (i.e., bursts and long delays). If your data's event time "jumps", and the data to be processed is already completely available in your topic, Kafka Streams also "jumps" with regard to the internally maintained stream-time.
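The effect can be illustrated with a toy model (plain Java, not Kafka code) of stream-time punctuation: stream-time is the maximum record timestamp seen so far, and a punctuation fires each time stream-time crosses the next schedule boundary. A jump in event time therefore produces a burst of back-to-back punctuations, which matches the behavior described in the question:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of stream-time punctuation (an illustration, not Kafka internals):
// punctuate fires whenever stream-time crosses the next schedule boundary,
// regardless of how much wall-clock time has passed.
public class StreamTimePunctuation {
    static List<Long> punctuationTimes(long[] recordTimestamps, long intervalMs) {
        List<Long> fired = new ArrayList<>();
        long streamTime = 0;
        long nextFire = intervalMs;
        for (long ts : recordTimestamps) {
            streamTime = Math.max(streamTime, ts);   // stream-time only moves forward
            while (streamTime >= nextFire) {         // an event-time "jump" fires many
                fired.add(nextFire);                 // punctuations back-to-back
                nextFire += intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Record timestamps that jump from 20s to 140s: with a 30s schedule,
        // the jump produces a burst of four punctuations at once.
        long[] ts = {10_000, 20_000, 140_000};
        System.out.println(punctuationTimes(ts, 30_000));
        // → [30000, 60000, 90000, 120000]
    }
}
```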

I would assume that you can resolve your issue by using WallclockTimestampExtractor (see http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor).
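A sketch of the configuration (a config fragment, assuming the kafka-streams dependency; note the key is `timestamp.extractor` in pre-1.0 releases and `default.timestamp.extractor` from Kafka 1.0 onward):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Properties config = new Properties();
// Use wall-clock time instead of the record-embedded event time.
// Pre-1.0 this constant maps to "timestamp.extractor"; from 1.0 use
// StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG instead.
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
           WallclockTimestampExtractor.class.getName());
```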

One more thing to mention: stream-time is only advanced if data is processed -- if your application reaches the end of the input topics and waits for data, punctuate will not be called. This applies even if you use WallclockTimestampExtractor.

Btw: there is currently a discussion about the punctuation behavior of Streams: https://github.com/apache/kafka/pull/1689

Answer for Kafka 1.0 and later

Since Kafka 1.0 it is possible to register punctuations based on wall-clock time or event-time: https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2
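A minimal sketch of the Kafka 1.0 API inside a Processor's init() (the flushBatch and logProgress callbacks are hypothetical placeholders for your own batch-flushing logic):

```java
// Sketch against the Kafka 1.0 Processor API:
// ProcessorContext.schedule(intervalMs, PunctuationType, Punctuator)
// returns a Cancellable that can be used to stop the punctuation.
@Override
public void init(ProcessorContext context) {
    super.init(context);

    // Fires every 30s of wall-clock time, even if no records arrive:
    context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME,
            timestamp -> flushBatch(timestamp));     // hypothetical callback

    // Fires every 30s of stream-time, i.e. only as record timestamps advance:
    context.schedule(30_000L, PunctuationType.STREAM_TIME,
            timestamp -> logProgress(timestamp));    // hypothetical callback
}
```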
