Kafka KStream - using AbstractProcessor with a Window


Problem description

I'm hoping to group together windowed batches of output from a KStream and write them to a secondary store.

I was expecting to see .punctuate() get called roughly every 30 seconds. What I got instead is saved here.

(The original file is several thousand lines long.)

Summary - .punctuate() is getting called seemingly randomly and then repeatedly. It doesn't appear to adhere to the value set via ProcessorContext.schedule().

Another run of the same code produced calls to .punctuate() roughly every four minutes. I didn't see the crazy repeated values this time. No change in source - just a different result.

Using the following code:

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

The processor:

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;

        values = new ArrayList<>();
    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}

The processor supplier:

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException();
        }
    }
}


To make sure my debugger wasn't slowing this down, I built it and ran it on the same box as my Kafka process. This time it didn't even try to lag for 4 minutes or more - within seconds it was outputting spurious calls to .punctuate(), many (most) of them with no intervening calls to .process().

Answer

Update: this part of the answer applies to Kafka 0.11 or earlier (for Kafka 1.0 and later, see below).

In Kafka Streams, punctuations are based on stream-time and not system time (aka processing-time).

By default, stream-time is event-time, i.e., the timestamp embedded in the Kafka records themselves. As you do not set a non-default TimestampExtractor (see timestamp.extractor in http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters), the calls to punctuate depend only on the progress of event time over the records you process. Thus, if you need multiple minutes to process "30 seconds" (event time) of records, punctuate will be called less frequently than every 30 seconds (wall-clock time)...

This can also explain your irregular calling patterns (i.e., bursts and long delays). If your data's event time "jumps", and the data to be processed is already fully available in the topic, Kafka Streams also "jumps" with regard to the internally maintained stream-time.
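This behavior can be sketched with a toy model (plain Java, not Kafka code; the firing rule below is a simplification of the 0.10/0.11 internals): punctuations become due at multiples of the scheduled interval in stream-time, so a jump in record timestamps releases a burst of back-to-back punctuate() calls with no process() calls in between.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamTimePunctuationDemo {

    // Returns the stream-times at which punctuate() would fire, given a
    // schedule interval and the event-time of each incoming record.
    static List<Long> firedPunctuations(long intervalMs, long[] recordTimestamps) {
        List<Long> fired = new ArrayList<>();
        long next = intervalMs; // first punctuation due at stream-time == interval
        for (long ts : recordTimestamps) {
            // Stream-time advances to the record's timestamp; every due
            // punctuation up to that point fires immediately, back-to-back.
            while (ts >= next) {
                fired.add(next);
                next += intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Event-times (ms): steady for a while, then a big "jump" to 200s.
        long[] eventTimes = {1_000, 10_000, 35_000, 200_000};
        System.out.println(firedPunctuations(30_000, eventTimes));
        // -> [30000, 60000, 90000, 120000, 150000, 180000]
    }
}
```

The single record at event-time 200s triggers five punctuations (60s through 180s) in one burst, which matches the "seemingly random, then repeated" pattern in the question.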

I would assume that you can resolve your issue by using WallclockTimestampExtractor (see http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor).
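A minimal sketch of that configuration, assuming a Kafka version where the config key is timestamp.extractor (newer versions renamed it to default.timestamp.extractor). Plain string keys are used here instead of the StreamsConfig constants so the snippet stands alone:

```java
import java.util.Properties;

public class WallclockConfig {

    // Builds the streams configuration with a wall-clock timestamp extractor,
    // so stream-time tracks system time instead of record event-time.
    static Properties buildConfig() {
        Properties config = new Properties();
        config.setProperty("timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("timestamp.extractor"));
    }
}
```

This Properties object would then be passed to `new StreamsConfig(config)` as in the question's driver code.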

One more thing to mention: stream-time is only advanced if data is processed -- if your application reaches the end of the input topics and waits for data, punctuate will not be called. This applies even if you use WallclockTimestampExtractor.

Btw: there is currently a discussion about the punctuation behavior of Streams: https://github.com/apache/kafka/pull/1689

Answer for Kafka 1.0 and later

Since Kafka 1.0 it is possible to register punctuations based on wall-clock time or event-time: https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2
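A sketch of what this could look like in the processor's init() under the Kafka 1.0 API (assumes kafka-streams 1.0+ on the classpath; schedule() there takes an interval in milliseconds, a PunctuationType, and a Punctuator callback, and the lambda shape below relies on Punctuator being a single-method interface):

```java
@Override
public void init(ProcessorContext context) {
    super.init(context);
    // Fires roughly every 30 seconds of wall-clock time,
    // even when no new records arrive.
    context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        // drain the batched values to the secondary store here, then commit
        context.commit();
    });
}
```

Using PunctuationType.STREAM_TIME instead restores the pre-1.0, event-time-driven behavior described above.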
