Failing to write offset data to zookeeper in kafka-storm


Problem description

I was setting up a Storm cluster to calculate real-time trending and other statistics, but I ran into some problems introducing the "recovery" feature into this project, which should allow the offset last read by the kafka-spout (the source code for kafka-spout comes from https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) to be remembered. I start my kafka-spout in this way:

BrokerHosts zkHost = new ZkHosts("localhost:2181");
// SpoutConfig(hosts, topic, zkRoot, id): topic "test", empty zkRoot, spout id "test"
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false; // resume from the committed offset instead of the beginning
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);

The default settings should be doing this, but I think they are not in my case: every time I start my project, the PartitionManager tries to read the stored partition/offset information and finds nothing:

2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO  PartitionManager:86 - No partition information found, using configuration to determine offset

Then it starts reading from the latest possible offset, which is okay if my project never fails, but is not exactly what I wanted.
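For what it's worth, the fallback in that second log line is governed by two KafkaConfig fields. A minimal sketch of the relevant knobs (the startOffsetTime value shown here is an assumption for illustration, not something from my actual config):

// When no committed offset is found in ZooKeeper, the spout falls back to startOffsetTime.
// LatestTime() tails the topic; EarliestTime() would replay it from the beginning.
kafkaConfig.forceFromStart = false;
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();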

I also looked a bit further into the PartitionManager class, which uses the ZkState class to write the offsets. Here are the relevant snippets:

PartitionManager

public void commit() {
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    } else {
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    }
}
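For reference, committedPath() is not shown above; as far as I can tell from the same source tree, it just derives the znode path from the spout configuration, roughly like this (paraphrased, so take the exact form with a grain of salt):

private String committedPath() {
    // zkRoot + "/" + spout id + "/" + "partition_<n>"
    return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
}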

ZkState

public void writeBytes(String path, byte[] bytes) {
    try {
        if (_curator.checkExists().forPath(path) == null) {
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
        } else {
            _curator.setData().forPath(path, bytes);
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

I could see that for the first message, the writeBytes method enters the if block and creates the path, and for the second message it goes into the else block, which seems fine. But when I start the project again, the same message as above shows up: no partition information can be found.
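For completeness, the read side that produces the "--> null" in the log is the counterpart of writeBytes in ZkState; roughly (again paraphrasing the same source tree), it returns null simply because the path does not exist in whichever ZooKeeper the spout is connected to:

public byte[] readBytes(String path) {
    try {
        if (_curator.checkExists().forPath(path) != null) {
            return _curator.getData().forPath(path);
        } else {
            // The znode is missing in this ensemble -> PartitionManager logs "--> null"
            return null;
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}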

Answer

I had the same problem. It turned out I was running in local mode, which uses an in-memory ZooKeeper, not the ZooKeeper that Kafka is using.

To make sure that the KafkaSpout doesn't use Storm's ZooKeeper for the ZkState that stores the offsets, you need to set SpoutConfig.zkServers, SpoutConfig.zkPort, and SpoutConfig.zkRoot in addition to the ZkHosts. For example:

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.KeyValueSchemeAsMultiScheme;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

...

    // Parse the ZooKeeper connect string ("host1:2181,host2:2181,...") into host names and a port
    final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
    final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
    final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
    final Integer zkPort = serverInetAddresses.get(0).getPort();
    for (InetSocketAddress serverInetAddress : serverInetAddresses) {
        serverAddresses.add(serverInetAddress.getHostName());
    }

    // Point the spout at Kafka's broker metadata under the Kafka chroot (e.g. "/kafka")
    final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
    zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;

    final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);

    // Store the consumer offsets in Kafka's ZooKeeper rather than Storm's
    spoutConfig.zkServers = serverAddresses;
    spoutConfig.zkPort = zkPort;
    spoutConfig.zkRoot = kafkaZnode;
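
With this in place, the committed offsets survive a topology restart. A quick way to sanity-check is to read the offset znode back from Kafka's ZooKeeper, for example with a small Curator client like the sketch below (the values of zkConnectStr, kafkaZnode, and kafkaConsumerGroup are placeholders matching the snippet above, and partition_0 assumes the zkRoot + "/" + id + "/partition_<n>" layout):

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;

public class OffsetCheck {
    public static void main(String[] args) throws Exception {
        final String zkConnectStr = "localhost:2181"; // Kafka's ZooKeeper
        final String kafkaZnode = "/kafka";           // the chroot used as zkRoot above
        final String kafkaConsumerGroup = "test";     // the spout id / consumer group

        final CuratorFramework curator =
                CuratorFrameworkFactory.newClient(zkConnectStr, new RetryOneTime(1000));
        curator.start();
        // PartitionManager.commit() writes its JSON under zkRoot + "/" + id + "/partition_<n>"
        final byte[] data =
                curator.getData().forPath(kafkaZnode + "/" + kafkaConsumerGroup + "/partition_0");
        System.out.println(new String(data, StandardCharsets.UTF_8));
        curator.close();
    }
}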

