Failing to write offset data to zookeeper in kafka-storm
Problem description
I was setting up a Storm cluster to calculate real-time trending and other statistics, but I am having trouble introducing the "recovery" feature into this project, i.e. having the offset last read by the kafka-spout (whose source code comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) be remembered. I start my kafka-spout this way:
BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);
The default settings should do this, but I think that is not happening in my case: every time I start my project, the PartitionManager tries to look up the file with the offsets and finds nothing:
2014-06-25 11:57:08 INFO PartitionManager:73 - Read partition information from: /storm/partition_1 --> null
2014-06-25 11:57:08 INFO PartitionManager:86 - No partition information found, using configuration to determine offset
It then starts reading from the latest possible offset, which would be fine if my project never failed, but is not exactly what I wanted.
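For context, the ZooKeeper node that PartitionManager reads on startup is derived from the zkRoot and id arguments passed to SpoutConfig, so the stored offset is only found again if the spout restarts with the same values and connects to the same ZooKeeper ensemble. A minimal sketch of how that path is composed (committedPath below is an illustration of the storm-kafka logic, not the exact source):

```java
// Illustration of how storm-kafka's PartitionManager composes the ZooKeeper
// node holding the committed offset: zkRoot + "/" + id + "/" + partitionId.
// With SpoutConfig(zkHost, "test", "", "test"), zkRoot is empty and id is
// "test", so offsets would land under "/test/<partition>" of whichever
// ZooKeeper the ZkState client is actually connected to.
public class CommittedPathSketch {
    static String committedPath(String zkRoot, String spoutId, String partitionId) {
        return zkRoot + "/" + spoutId + "/" + partitionId;
    }

    public static void main(String[] args) {
        System.out.println(committedPath("", "test", "partition_1"));
        // -> /test/partition_1
        System.out.println(committedPath("/kafkastorm", "test", "partition_1"));
        // -> /kafkastorm/test/partition_1
    }
}
```

If either the path components or the ZooKeeper connection change between runs, the lookup returns null and the spout falls back to the configured starting offset, which matches the log lines above.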
I also looked a bit more into the PartitionManager class, which uses the ZkState class to write the offsets, in this code snippet:

PartitionManager
public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);
        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
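For reference, the map built above is what gets serialized as JSON under the committed path. A stdlib-only sketch of the same record shape (plain HashMap instead of Guava's ImmutableMap; all field values below are invented examples, not values from the original topology):

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib sketch of the offset record PartitionManager writes to ZooKeeper.
// Field values here are invented examples; the real ones come from the
// running topology and its broker assignment.
public class OffsetRecordSketch {
    static Map<String, Object> offsetRecord(long offset, int partition, String topic) {
        Map<String, Object> topology = new HashMap<>();
        topology.put("id", "topology-instance-id"); // example value
        topology.put("name", "trending-topology");  // example value

        Map<String, Object> broker = new HashMap<>();
        broker.put("host", "localhost"); // example value
        broker.put("port", 9092);        // example value

        Map<String, Object> data = new HashMap<>();
        data.put("topology", topology);
        data.put("offset", offset);
        data.put("partition", partition);
        data.put("broker", broker);
        data.put("topic", topic);
        return data;
    }

    public static void main(String[] args) {
        System.out.println(offsetRecord(42L, 1, "test"));
    }
}
```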
ZkState
public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
I could see that for the first message, the writeBytes method goes into the if block and tries to create the path, and for the second message it goes into the else block, which seems fine. But when I start the project again, the same message as above shows up: no partition information can be found.
Recommended answer
I had the same problem. It turned out I was running in local mode, which uses an in-memory ZooKeeper, not the ZooKeeper that Kafka is using.
To make sure that KafkaSpout doesn't use Storm's ZooKeeper for the ZkState that stores the offsets, you need to set SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to the ZkHosts. For example:
import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;
...
final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
final Integer zkPort = serverInetAddresses.get(0).getPort();
for (InetSocketAddress serverInetAddress : serverInetAddresses) {
    serverAddresses.add(serverInetAddress.getHostName());
}
final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;
final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);
spoutConfig.zkServers = serverAddresses;
spoutConfig.zkPort = zkPort;
spoutConfig.zkRoot = kafkaZnode;
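If you'd rather not depend on ZooKeeper's ConnectStringParser (it lives in an internal package and its API may differ across versions), the same host/port split can be done with plain string handling. A minimal sketch, assuming a comma-separated host:port connect string with a uniform port:

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only split of a ZooKeeper connect string such as
// "zk1:2181,zk2:2181,zk3:2181" into the host list and port that
// SpoutConfig.zkServers and SpoutConfig.zkPort expect.
public class ConnectStringSplit {
    static List<String> hosts(String connectStr) {
        List<String> result = new ArrayList<>();
        for (String part : connectStr.split(",")) {
            result.add(part.split(":")[0]);
        }
        return result;
    }

    static int port(String connectStr) {
        String[] hostPort = connectStr.split(",")[0].split(":");
        // Fall back to 2181, ZooKeeper's default client port, when none is given.
        return hostPort.length > 1 ? Integer.parseInt(hostPort[1]) : 2181;
    }

    public static void main(String[] args) {
        String zkConnectStr = "zk1:2181,zk2:2181";
        System.out.println(hosts(zkConnectStr)); // [zk1, zk2]
        System.out.println(port(zkConnectStr));  // 2181
    }
}
```

This mirrors what the answer's code extracts via ConnectStringParser; like that code, it assumes every server in the string uses the same port.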