Failing to write offset data to zookeeper in kafka-storm
Problem description
I was setting up a Storm cluster to calculate real-time trending and other statistics, but I am having some problems introducing the "recovery" feature into this project: I want the offset that was last read by the kafka-spout (the source code for the kafka-spout comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) to be remembered. I start my kafka-spout this way:
BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);
The default settings should take care of this, but it does not seem to happen in my case: every time I start the project, the PartitionManager tries to look up the node holding the offsets and finds nothing:
2014-06-25 11:57:08 INFO PartitionManager:73 - Read partition information from: /storm/partition_1 --> null
2014-06-25 11:57:08 INFO PartitionManager:86 - No partition information found, using configuration to determine offset
It then starts reading from the latest possible offset, which would be fine if my project never failed, but is not exactly what I want.
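For reference, the node the PartitionManager looks up is derived from the spout configuration. A minimal sketch of that path construction (a standalone reconstruction assuming the zkRoot + "/" + id + "/" + partition layout used by storm-kafka's committedPath(); not the actual class):

```java
// Sketch: how storm-kafka derives the ZooKeeper node that holds a partition's
// committed offset. With an empty zkRoot (as in the question's SpoutConfig),
// the node lands directly under "/".
public class CommittedPathSketch {
    static String committedPath(String zkRoot, String spoutId, String partitionId) {
        return zkRoot + "/" + spoutId + "/" + partitionId;
    }

    public static void main(String[] args) {
        System.out.println(committedPath("", "test", "partition_1"));
        System.out.println(committedPath("/kafkastorm", "test", "partition_1"));
    }
}
```

If the topology writes offsets under one root but reads them back under another (or from a different ZooKeeper entirely), the lookup returns null exactly as in the log above.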
I also looked a bit further into the PartitionManager class, which uses the ZkState class to write the offsets. Here are the relevant snippets:
PartitionManager
public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);
        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
ZkState
public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
I could see that for the first message the writeBytes method enters the if block and creates the path, and for the second message it goes into the else block, which seems fine. But when I start the project again, the same message as above shows up: no partition information can be found.
Recommended answer
I had the same problem. It turned out I was running in local mode, which uses an in-memory ZooKeeper rather than the ZooKeeper that Kafka is using.
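For context, here is a sketch of the two submission modes (the class names are Storm's API; the topology name, conf, and builder variables are placeholders from the question, and this fragment is illustrative rather than a complete program). In local mode Storm starts its own in-process ZooKeeper, so anything the spout writes there disappears when the process exits:

```java
// Local mode: spins up an in-process mini-cluster with its own in-memory
// ZooKeeper; offsets written to it do not survive a restart.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test-topology", conf, builder.createTopology());

// Cluster mode: submits to a real Nimbus, whose ZooKeeper outlives the process.
StormSubmitter.submitTopology("test-topology", conf, builder.createTopology());
```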
To make sure that the KafkaSpout doesn't use Storm's ZooKeeper for the ZkState that stores the offsets, you need to set SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to ZkHosts. For example:
import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;
...
final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
final Integer zkPort = serverInetAddresses.get(0).getPort();
for (InetSocketAddress serverInetAddress : serverInetAddresses) {
    serverAddresses.add(serverInetAddress.getHostName());
}
final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;
final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);
spoutConfig.zkServers = serverAddresses;
spoutConfig.zkPort = zkPort;
spoutConfig.zkRoot = kafkaZnode;
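The ConnectStringParser used above lives in ZooKeeper's internal client package, so you may prefer not to depend on it. The same host/port extraction can be done by hand; a minimal dependency-free sketch (an illustrative helper, assuming a plain host:port,host:port connect string with no chroot suffix):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: extract the host list and the first entry's port from a ZooKeeper
// connect string such as "zk1:2181,zk2:2181", without ConnectStringParser.
public class ZkConnectParse {
    static List<String> hosts(String connect) {
        List<String> out = new ArrayList<>();
        for (String part : connect.split(",")) {
            out.add(part.split(":")[0]);   // keep only the host portion
        }
        return out;
    }

    static int port(String connect) {
        String first = connect.split(",")[0];
        int i = first.indexOf(':');
        return i < 0 ? 2181 : Integer.parseInt(first.substring(i + 1)); // 2181 = ZK default
    }

    public static void main(String[] args) {
        System.out.println(hosts("zk1:2181,zk2:2181"));
        System.out.println(port("zk1:2181,zk2:2181"));
    }
}
```

Like the original code, this takes the port of the first entry only, mirroring serverInetAddresses.get(0).getPort().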