Kafka KStreams - processing timeouts

Problem description

I am attempting to use <KStream>.process() with a TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer timeout interval after which Kafka considers said consumer to be defunct and releases the partition.

I've tried upping the frequency of poll and commit interval to avoid this:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

Unfortunately these errors are still occurring:

(lots of these)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

Followed by these:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

Clearly I need to be sending heartbeats back to the server more often. How?

My topology is:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

The KTable is grouping values by key every 30 seconds. In Processor.init() I call context.schedule(30000).

DBProcessorSupplier provides an instance of DBProcessor. This is an implementation of AbstractProcessor where all the overrides have been provided. All they do is LOG so I know when each is being hit.
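For reference, a hypothetical sketch of what the DBProcessorSupplier / DBProcessor pair described above might look like, against the 0.10.0-era Processor API the question is using (only the class names come from the question; the logging bodies and generics are assumptions):

```java
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class DBProcessorSupplier implements ProcessorSupplier<Windowed<String>, String> {
    @Override
    public Processor<Windowed<String>, String> get() {
        return new DBProcessor();
    }
}

class DBProcessor extends AbstractProcessor<Windowed<String>, String> {
    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        context.schedule(30000);  // request punctuate() roughly every 30 seconds
        System.out.println("DBProcessor.init");
    }

    @Override
    public void process(Windowed<String> key, String value) {
        // this override is hit for every record, per the debug log above
        System.out.println("DBProcessor.process: " + key);
    }

    @Override
    public void punctuate(long timestamp) {
        // this is the callback that never seems to fire
        System.out.println("DBProcessor.punctuate at " + timestamp);
    }
}
```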

It's a pretty simple topology but it's clear I'm missing a step somewhere.


Edit:

I get that I can adjust this on the server side but I'm hoping there is a client-side solution. I like the notion of partitions being made available pretty quickly when a client exits / dies.


Edit:

In an attempt to simplify the problem I removed the aggregation step from the graph. It's now just consumer->processor(). (If I send the consumer directly to .print() it works very quickly, so I know it's ok.) (Similarly, if I output the aggregation (KTable) via .print() it seems ok too.)

What I found was that .process(), which should be calling .punctuate() every 30 seconds, is actually blocking for variable lengths of time and outputting somewhat randomly (if at all).

Further:

I set the debug level to 'debug' and reran. I'm seeing lots of messages:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

but a breakpoint in the .punctuate() function isn't getting hit. So it's doing lots of work but not giving me a chance to use it.

Solution

A few clarifications:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, i.e., after a commit, the next commit will not happen before this time has passed. Basically, Kafka Streams tries to commit as soon as possible after this time has passed, but there is no guarantee how long it will actually take until the next commit.
  • StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify its maximum blocking time.

Thus, neither value helps you heartbeat more often.

Kafka Streams follows a "depth-first" strategy when processing records. This means that, after a poll(), all operators of the topology are executed for each record in turn. Assume you have three consecutive maps: all three maps will be called for the first record before the next/second record gets processed.

Thus, the next poll() call is only made after all records from the first poll() have been fully processed. If you want to heartbeat more often, you need to make sure that a single poll() call fetches fewer records, so that processing all of them takes less time and the next poll() is triggered earlier.

You can do this with configuration parameters for KafkaConsumer that you can specify via StreamsConfig (see https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records: if you decrease this value, fewer records will be polled per call
  • session.timeout.ms: if you increase this value, there is more time for processing data (added for completeness, because it is actually a client setting and not a server/broker-side configuration, even if you are already aware of this option and do not like it :))
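Putting both suggestions together, a minimal client-side sketch might look like this. The concrete values (100 records, 30 seconds) are illustrative assumptions, not recommendations; tune them for your workload:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TunedStreamsConfig {
    public static Properties build() {
        Properties streamsConfig = new Properties();
        // fewer records per poll() => each poll/process cycle finishes sooner,
        // so the next poll() (and its heartbeat) happens earlier
        streamsConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        // more headroom before the coordinator considers the consumer dead
        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        return streamsConfig;
    }
}
```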

EDIT

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and producer configs within the streams config. This avoids parameter conflicts, as some parameter names are used by both consumer and producer and cannot be distinguished otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example:

streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

One more thing to add: the scenario described in this question is a known issue, and there is already KIP-62, which introduces a background thread for KafkaConsumer that sends heartbeats, thus decoupling heartbeats from poll() calls. Kafka Streams will leverage this new feature in upcoming releases.
