KafkaSpout is not emitting messages

Question

I can run my Storm Kafka topology with a local cluster, but not when I submit it with StormSubmitter. My topology code is below.

Can anyone please help me solve this issue? :)

package com.org.kafka;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import kafka.api.OffsetRequest;

public class KafkaTopology {

    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException,
                   AuthorizationException {

        ZkHosts zkHosts = new ZkHosts("localhost:2181");

        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "secondTest", "", "id7");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        kafkaConfig.startOffsetTime = OffsetRequest.EarliestTime();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
        builder.setBolt("Sentence-bolt", new SentenceBolt(), 1).globalGrouping("KafkaSpout");
        builder.setBolt("PrinterBolt", new PrinterBolt(), 1).globalGrouping("SentenceBolt");
        LocalCluster cluster = new LocalCluster();

        Config conf = new Config();
        StormSubmitter.submitTopology("KafkaStormToplogy", conf, builder.createTopology());
        try {
            System.out.println("Waiting to consume from kafka");
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        cluster.killTopology("KafkaToplogy");
        cluster.shutdown();
    }
}

I found the exception below in the worker.log file, but the terminal shows "Finished submitting topology: KafkaStormToplogy".

2018-01-24 11:58:38.941 o.a.s.d.worker main [ERROR] Error on initialization of server mk-worker
java.lang.RuntimeException: java.io.InvalidClassException: org.apache.storm.kafka.SpoutConfig; local class incompatible: stream classdesc serialVersionUID = -1247769246497567352, local class serialVersionUID = 6814635004761021338
    at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:254) ~[storm-core-1.0.5.jar:1.0.5]
    at org.apache.storm.utils.Utils.getSetComponentObject(Utils.java:504) ~[storm-core-1.0.5.jar:1.0.5]
    at org.apache.storm.daemon.task$get_task_object.invoke(task.clj:74) ~[storm-core-1.0.5.jar:1.0.5]
    at org.apache.storm.daemon.task$mk_task_data$fn__4609.invoke(task.clj:177) ~[storm-core-1.0.5.jar:1.0.5]
    at org.apache.storm.util$assoc_apply_self.invoke(util.clj:931) ~[storm-core-1.0.5.jar:1.0.5]
    at org.apache.storm.daemon.task$mk_task_data.invoke(task.clj:170) ~[storm-core-1.0.5.jar:1.0.5]
    at org.apache.storm.daemon.task$mk_task.invoke(task.clj:181) ~[storm-core-1.0.5.jar:1.0.5]
    at org.apache.storm.daemon.executor$mk_executor$fn__4830.invoke(executor.clj:371) ~[storm-core-1.0.5.jar:1.0.5]
    at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
    at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
    at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
    at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
    at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) ~[clojure-1.7.0.jar:?]
    at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]

Recommended answer

I think this happens either because the storm-kafka version on your Nimbus classpath differs from the one on your worker classpath, or because Nimbus and the worker are running on different JDKs. SpoutConfig (https://github.com/apache/storm/blob/1.x-branch/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java) ought to declare a serialVersionUID, but it doesn't. For reference, see https://stackoverflow.com/a/285809/8845188. As I understand it, when a Serializable class does not declare a serialVersionUID, the JVM computes one at runtime from the structure of the class, so different builds of the class, or different JDKs, may end up with different numbers for what looks like the same class. That would explain the InvalidClassException in your log: the serialized SpoutConfig carries one serialVersionUID, and the class the worker loads has another.
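One way to check this is to print the serialVersionUID that a given JVM and classpath produce for a class, together with the location the class was loaded from, and then compare the output from the submitting side with the output from the worker side. The following is only a minimal sketch: SerialUidProbe, UndeclaredUidConfig, serialUidOf and loadedFrom are hypothetical names introduced for illustration, and the stand-in class is used only so the example runs without storm-kafka on the classpath.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.security.CodeSource;
import java.security.ProtectionDomain;

public class SerialUidProbe {

    // Hypothetical stand-in for a Serializable class that, like SpoutConfig,
    // does not declare a serialVersionUID of its own.
    static class UndeclaredUidConfig implements Serializable {
        String topic = "secondTest";
    }

    /** Returns the serialVersionUID this JVM uses for the given class (declared or computed). */
    static long serialUidOf(Class<?> type) {
        ObjectStreamClass descriptor = ObjectStreamClass.lookup(type);
        if (descriptor == null) {
            // lookup() returns null for classes that are not Serializable
            throw new IllegalArgumentException(type.getName() + " is not Serializable");
        }
        return descriptor.getSerialVersionUID();
    }

    /** Returns the jar or directory the class was loaded from, or a placeholder if unknown. */
    static String loadedFrom(Class<?> type) {
        ProtectionDomain domain = type.getProtectionDomain();
        CodeSource source = (domain == null) ? null : domain.getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(unknown location)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        Class<?> type = UndeclaredUidConfig.class;
        System.out.println(type.getName()
                + " serialVersionUID=" + serialUidOf(type)
                + " loadedFrom=" + loadedFrom(type));
    }
}
```

To apply this to the real case, you would pass org.apache.storm.kafka.SpoutConfig.class instead of the stand-in and run it once with the classpath used for submission and once with the worker's classpath. If the two numbers differ, or the jar locations point at different storm-kafka versions, that would be consistent with the explanation above.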

I would clone storm-kafka, add the missing serialVersionUID field to SpoutConfig, rebuild storm-kafka, and try again. I've raised https://issues.apache.org/jira/browse/STORM-2911 to track the fix. You're welcome to take a look at it.
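For illustration, declaring the field looks roughly like the sketch below. This is not the actual SpoutConfig source or the STORM-2911 patch: PinnedUidExample and PinnedConfig are hypothetical names, and the value 1L is an arbitrary choice made for the example. The round trip through a byte array only shows that a class with a declared serialVersionUID serializes and deserializes as usual.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class PinnedUidExample {

    // Hypothetical config class; the explicit field pins the UID so that it
    // no longer depends on a value computed by the JVM at runtime.
    static class PinnedConfig implements Serializable {
        private static final long serialVersionUID = 1L; // arbitrary value for this sketch
        final String topic;

        PinnedConfig(String topic) {
            this.topic = topic;
        }
    }

    /** Serializes an object to a byte array using Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes an object from a byte array; fails with InvalidClassException on a UID mismatch. */
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PinnedConfig original = new PinnedConfig("secondTest");
        PinnedConfig copy = (PinnedConfig) deserialize(serialize(original));
        System.out.println("Round-tripped topic: " + copy.topic);
    }
}
```

A declared serialVersionUID only removes the dependence on the computed default. If the two sides really run different storm-kafka versions, aligning the jars is still the safer fix, since the classes may differ in more than their UID.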
