Storm KafkaSpout stopped consuming messages from Kafka topic

Problem description

My problem is that the Storm KafkaSpout stops consuming messages from the Kafka topic after a period of time. With debug enabled in Storm, the log file looks like this:


2016-07-05 03:58:26.097 o.a.s.d.task [INFO] Emitting: packet_spout __metrics [#object[org.apache.storm.metric.api.IMetricsConsumer$TaskInfo 0x2c35b34f "org.apache.storm.metric.api.IMetricsConsumer$TaskInfo@2c35b34f"] [#object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x798f1e35 "[__ack-count = {default=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x230867ec "[__sendqueue = {sojourn_time_ms=0.0, write_pos=5411461, read_pos=5411461, overflow=0, arrival_rate_secs=0.0, capacity=1024, population=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x7cdec8eb "[__complete-latency = {default=0.0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x658fc59 "[__skipped-max-spout = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3c1f3a50 "[__receive = {sojourn_time_ms=4790.5, write_pos=2468305, read_pos=2468304, overflow=0, arrival_rate_secs=0.20874647740319383, capacity=1024, population=1}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x262d7906 "[__skipped-inactive = 0]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x73648c7e "[kafkaPartition = {Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPICallCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMax=null, Partition{host=slave103:9092, topic=packet, partition=12}/lostMessageCount=0, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPILatencyMean=null, Partition{host=slave103:9092, topic=packet, partition=12}/fetchAPIMessageCount=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x4e43df61 "[kafkaOffset = {packet/totalLatestCompletedOffset=154305947, packet/partition_12/spoutLag=82472754, packet/totalEarliestTimeOffset=233919465, packet/partition_12/earliestTimeOffset=233919465, packet/partition_12/latestEmittedOffset=154307691, packet/partition_12/latestTimeOffset=236778701, packet/totalLatestEmittedOffset=154307691, packet/partition_12/latestCompletedOffset=154305947, packet/totalLatestTimeOffset=236778701, packet/totalSpoutLag=82472754}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x49fe816b "[__transfer-count = {__ack_init=0, default=0, __metrics=0}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x63e2bdc0 "[__fail-count = {}]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x3b17bb7b "[__skipped-throttle = 1086120]"] #object[org.apache.storm.metric.api.IMetricsConsumer$DataPoint 0x1315a68c "[__emit-count = {__ack_init=0, default=0, __metrics=0}]"]]]

2016-07-05 03:58:55.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]

2016-07-05 03:59:25.042 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __tick, id: {}, [30]

2016-07-05 03:59:25.946 o.a.s.d.executor [INFO] Processing received message FOR -2 TUPLE: source: __system:-1, stream: __metrics_tick, id: {}, [60]

My test topology is really simple: one KafkaSpout and one counter bolt. When the topology works fine, the value between FOR and TUPLE is a positive number; when the topology stops consuming messages, the value becomes negative. So I'm curious what causes the Processing received message FOR -2 TUPLE problem, and how can I fix it?
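
Roughly, the wiring looks like the following sketch, built on the storm-kafka SpoutConfig/KafkaSpout API that ships alongside Storm 1.0.1 (the ZooKeeper address, zkRoot, spout id, and the counter bolt body here are simplified placeholders rather than the exact code):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class PacketCountTopology {

    // Placeholder counter bolt: counts incoming tuples and emits nothing downstream.
    public static class CounterBolt extends BaseBasicBolt {
        private long count;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            count++;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output fields
        }
    }

    public static void main(String[] args) throws Exception {
        // ZooKeeper connect string and zkRoot are placeholders; the topic is "packet" as in the log.
        ZkHosts hosts = new ZkHosts("zookeeper-host:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "packet", "/kafka-spout", "packet_spout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("packet_spout", new KafkaSpout(spoutConfig));
        builder.setBolt("counter", new CounterBolt()).shuffleGrouping("packet_spout");

        Config config = new Config();
        config.setDebug(true); // debug logging enabled, as described above

        StormSubmitter.submitTopology("packet-count", config, builder.createTopology());
    }
}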

By the way, my test environment is:


OS: Red Hat Enterprise Linux Server release 7.0 (Maipo)
Kafka: 0.10.0.0
Storm: 1.0.1


Recommended answer

With help from the Storm mailing list I was able to tune the KafkaSpout and resolve the issue. The following settings work for me.

// Maximum tuples pending (emitted but not yet acked/failed) per spout task
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2048);
// Disable automatic backpressure throttling (the __skipped-throttle metric in the log above counts spout iterations skipped by it)
config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
// Enlarge the per-executor receive/send disruptor queues (sizes must be powers of two)
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
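
For context, these four puts go on the same org.apache.storm.Config instance that is handed to StormSubmitter when the topology is submitted; a minimal, hedged sketch (the topology name and the builder variable are placeholders, not part of the original answer):

// `config` is the Config instance carrying the four settings above;
// `builder` is the TopologyBuilder holding the KafkaSpout and the counter bolt.
StormSubmitter.submitTopology("packet-topology", config, builder.createTopology());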

I tested by sending batches of 20k-50k messages, with a 1-second pause between bursts. Each message was 2048 bytes.
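
As an illustration of that kind of load generator, here is a rough sketch using the plain Kafka 0.10 Java producer; the broker address and topic come from the log in the question, while the burst size and the rest are assumptions, since the actual test harness was not posted:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BurstProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "slave103:9092");   // broker seen in the spout metrics above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        byte[] payload = new byte[2048];                    // 2048-byte message body, as in the test
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            while (true) {
                for (int i = 0; i < 20_000; i++) {          // one burst of 20k messages
                    producer.send(new ProducerRecord<>("packet", payload));
                }
                producer.flush();
                Thread.sleep(1000);                         // 1-second pause between bursts
            }
        }
    }
}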

I am running a 3-node cluster, my topology has 4 spouts, and the topic has 64 partitions.
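
In Storm terms that spout parallelism is just the executor count passed to setSpout, with the worker count set on the Config; a small hedged sketch (variable names are placeholders, and the worker count is an assumption, since the answer does not state it):

builder.setSpout("packet_spout", new KafkaSpout(spoutConfig), 4); // 4 spout executors, as described above
config.setNumWorkers(3);                                          // e.g. one worker per node on the 3-node cluster (an assumption)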

After 200M messages it's still working...
