Kafka KStreams - processing timeouts


Question

I am attempting to use <KStream>.process() with a TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer timeout interval after which Kafka considers said consumer to be defunct and releases the partition.

I've tried upping the frequency of poll and commit interval to avoid this:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

Unfortunately these errors are still occurring:

(lots of them)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

Followed by these:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

Clearly I need to be sending heartbeats back to the server more often. How?

My topology is:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

// Aggregate values per key into 30-second windows
KTable<Windowed<String>, String> kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

// Forward each windowed aggregate to the custom processor
kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();

The KTable is grouping values by key every 30 seconds. In Processor.init() I call context.schedule(30000).

DBProcessorSupplier provides an instance of DBProcessor. This is an implementation of AbstractProcessor where all the overrides have been provided. All they do is LOG so I know when each is being hit.
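
For reference, a minimal sketch of what such a supplier and processor might look like against the old Processor API of this Kafka Streams version (the class names come from the question; the generic types, logger, and method bodies are my assumptions):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DBProcessorSupplier implements ProcessorSupplier<Windowed<String>, String> {

    @Override
    public Processor<Windowed<String>, String> get() {
        return new DBProcessor();
    }

    static class DBProcessor extends AbstractProcessor<Windowed<String>, String> {
        private static final Logger LOG = LoggerFactory.getLogger(DBProcessor.class);

        @Override
        public void init(ProcessorContext context) {
            super.init(context);
            context.schedule(30000);   // ask Kafka Streams to call punctuate() every 30 seconds
            LOG.info("init()");
        }

        @Override
        public void process(Windowed<String> key, String value) {
            LOG.info("process({}, {})", key, value);
        }

        @Override
        public void punctuate(long timestamp) {
            LOG.info("punctuate({})", timestamp);
        }

        @Override
        public void close() {
            LOG.info("close()");
        }
    }
}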

It's a pretty simple topology but it's clear I'm missing a step somewhere.

I get that I can adjust this on the server side but I'm hoping there is a client-side solution. I like the notion of partitions being made available pretty quickly when a client exits / dies.

In an attempt to simplify the problem I removed the aggregation step from the graph. It's now just consumer->processor(). (If I send the consumer directly to .print() it works very quickly, so I know it's OK.) (Similarly, if I output the aggregation (KTable) via .print() it seems OK too.)

What I found was that .process() - which should be calling .punctuate() every 30 seconds - is actually blocking for variable lengths of time and outputting somewhat randomly (if at all).

I set the debug level to 'debug' and reran. I'm seeing lots of messages:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

but a breakpoint in the .punctuate() function isn't getting hit. So it's doing lots of work but not giving me a chance to use it.

Answer

Some clarifications:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, i.e., after a commit, the next commit will not happen before this much time has passed. Basically, Kafka Streams tries to commit as soon as possible after this time has passed, but there is no guarantee how long it will actually take until the next commit.
  • StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify the maximum blocking time of that poll() call.

Thus, neither value helps you heartbeat more frequently.

Kafka Streams follows a "depth-first" strategy when processing records. This means that after a poll(), all operators of the topology are executed for each record in turn. Let's assume you have three consecutive maps; then all three maps will be called for the first record before the next/second record gets processed.

Thus, the next poll() call is only made after all records of the previous poll() have been fully processed. If you want to heartbeat more often, you need to make sure that a single poll() call fetches fewer records, so that processing all of them takes less time and the next poll() is triggered earlier.
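
To make the depth-first order concrete, here is a hypothetical chain of three maps (step1, step2 and step3 are placeholder methods, not from the question):

KStream<String, String> source = kStreamBuilder.stream("input");

// For each record returned by a poll(), step1, step2 and step3 all run
// before the next record is processed; only then does the next poll() happen.
source.mapValues(v -> step1(v))
      .mapValues(v -> step2(v))
      .mapValues(v -> step3(v))
      .to("output");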

You can use configuration parameters for KafkaConsumer that you can specify via StreamsConfig to get this done (see https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records: if you decrease this value, fewer records will be polled per poll() (see the example below)
  • session.timeout.ms: if you increase this value, there is more time for processing data (adding this for completeness because it is actually a client setting and not a server/broker side configuration -- even if you are aware of this solution and do not like it :))
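
For example, a possible way to set both via the streams configuration (the values 100 and 60000 are purely illustrative, not recommendations):

streamConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");     // fetch fewer records per poll()
streamConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000"); // allow more time between polls (capped by the broker's group.max.session.timeout.ms)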
Edit

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and producer configs within the streams config. This avoids parameter conflicts, as some parameter names are used for both consumer and producer and cannot be distinguished otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);
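
Concretely, a sketch using max.poll.records as the prefixed parameter (the value is again illustrative):

streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "100"); // applied to the consumer only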

One more thing to add: the scenario described in this question is a known issue, and there is already KIP-62, which introduces a background thread for KafkaConsumer that sends heartbeats, thus decoupling heartbeats from poll() calls. Kafka Streams will leverage this new feature in upcoming releases.
