Storm KafkaSpout stopped consuming messages from Kafka topic


Problem description


My problem is that Storm KafkaSpout stopped consuming messages from the Kafka topic after a period of time. With debug logging enabled in Storm, the log file looks like this:


2016-07-05 03:58:26.097 o.a.s.d.task [INFO] Emitting: packet_spout __metrics [#object[org.apache.storm.metric.api.IMetricsConsumer$TaskInfo 0x2c35b34f "org.apache.storm.metric.api.IMetricsConsumer$TaskInfo@2c35b34f"] [#object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x798f1e35 "[__ack-count = {default=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x230867ec "[__sendqueue = {sojourn_time_ms=0.0, write_pos=5411461, read_pos=5411461, overflow=0, arrival_rate_secs=0.0, capacity=1024, population=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x7cdec8eb "[__complete-latency = {default=0.0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x658fc59 "[__skipped-max-spout = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3c1f3a50 "[__receive = {sojourn_time_ms=4790.5, write_pos=2468305, read_pos=2468304, overflow=0, arrival_rate_secs=0.20874647740319383, capacity=1024, population=1}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x262d7906 "[__skipped-inactive = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x73648c7e "[kafkaPartition = {Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPICallCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMax=null, Partition{host=slave103:9092, topic=packet, partition=12}/lostMessageCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMean=null, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPIMessageCount=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x4e43df61 "[kafkaOffset = {packet/totalLatestCompletedOffset=154305947, packet/partition_12/spoutLag=82472754, packet/totalEarliestTimeOffset=233919465, packet/partition_12/earliestTimeOffset=233919465, packet/partition_12/latestEmittedOffset=154307691, packet/partition_12/latestTimeOffset=236778701, 
packet/totalLatestEmittedOffset=154307691, packet/partition_12/latestCompletedOffset=154305947, packet/totalLatestTimeOffset=236778701, packet/totalSpoutLag=82472754}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x49fe816b "[__transfer-count = {__ack_init=0, default=0, __metrics=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x63e2bdc0 "[__fail-count = {}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3b17bb7b "[__skipped-throttle = 1086120]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x1315a68c "[__emit-count = {__ack_init=0, default=0, __metrics=0}]"]]]


2016-07-05 03:58:55.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]


2016-07-05 03:59:25.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]


2016-07-05 03:59:25.946 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]


My test topology is really simple: one KafkaSpout and a Counter bolt. When the topology works fine, the value between FOR and TUPLE is a positive number; when the topology stops consuming messages, the value becomes negative. So I am curious what causes the "Processing received message FOR -2 TUPLE" problem, and how to fix it?


By the way, my experiment environment is:


OS: Red Hat Enterprise Linux Server release 7.0 (Maipo)
Kafka: 0.10.0.0
Storm: 1.0.1

Recommended answer


With help from the Storm mailing list I was able to tune the KafkaSpout and resolve the issue. The following settings work for me.

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);             // cap on un-acked tuples per spout task
config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);          // disable automatic backpressure
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // executor queue sizes; must be powers of 2
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
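The same knobs can also be set cluster-wide in storm.yaml instead of per-topology in code. A sketch of the equivalent entries, assuming the standard Storm 1.x key names behind those Config constants:

```yaml
# storm.yaml — cluster-wide equivalents of the Config.put calls above
topology.max.spout.pending: 2048
topology.backpressure.enable: false
topology.executor.receive.buffer.size: 16384
topology.executor.send.buffer.size: 16384
```

Values set via config.put at submission time override these cluster defaults for that topology.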


I tested by sending batches of 20k-50k messages with a 1-second pause between bursts. Each message was 2048 bytes.


I am running a 3-node cluster; my topology has 4 spouts and the topic has 64 partitions.


After 200M messages it is still working.
