Kafka Quickstart: What Dependencies Do I Need?

Question

I am working through the Kafka quickstart:

http://kafka.apache.org/07/quickstart.html

and the basic Consumer Group example:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

I have coded up the Consumer and ConsumerThreadPool as in that example:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private KafkaStream m_stream;
    private Integer m_threadNumber;

    public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

A couple of other details: I am using Spring to manage my ZooKeeper configuration:

import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {

    @Bean
    @Named("consumerConfig")
    private static ConsumerConfig createConsumerConfig() {
        String zookeeperAddress = "127.0.0.1:2181";
        String groupId = "inventory";
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperAddress);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
}

I am compiling with Maven and the OneJar Maven plugin. However, when I compile and then run the resulting one-jar, I get the following error:

Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 27 more

Now, I know little about Kafka, and nothing about Scala. How do I fix this? What should I try next? Is this a known issue? Do I need other dependencies? Here is the Kafka version in my pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.0-beta1</version>
</dependency> 

Update: I contacted the Kafka dev mailing list, and they let me know some specific version requirements for the scala dependencies. However, there is also an undocumented log4j dependency, which results in another runtime, not compile time, exception.

Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)

Another update:

I found the correct log4j dependency:

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

But now I am met with an even more cryptic runtime exception:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

At this point I got the WTF kind of feeling. So I added another dependency:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.3</version>
    </dependency>

But this exposed yet another runtime exception:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

I am hoping to be able to get this baby example up and running, but maybe this is the price to pay for using beta products? Maybe I should switch to Apache Active MQ. But that sounds less fun. Am I missing something?

Recommended answer

The problem is that the Kafka beta was built in such a way that the POM published with the jar isn't valid. Maven cannot parse it properly and therefore does not fetch the transitive dependencies. We've managed to mitigate this problem by listing all of the dependencies from that POM (scala, zk, etc.) in our own POM. We're waiting for the next beta builds of Kafka, in which the problem should be fixed.
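To see which of those transitive dependencies are still missing from the packaged jar, without waiting for the next stack trace, a small classpath probe can help. The following is only a sketch: the class name ClasspathProbe and its methods are hypothetical, and the probed class names are simply the ones that appear in the stack traces in the question. A missing class elsewhere in Kafka's dependency tree would not be caught by it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka: reports which classes cannot be loaded.
public class ClasspathProbe {

    // Class names copied from the exceptions quoted in the question.
    public static final List<String> REQUIRED = Arrays.asList(
            "scala.ScalaObject",
            "org.apache.log4j.Category",
            "org.I0Itec.zkclient.IZkStateListener",
            "com.yammer.metrics.core.Gauge");

    // Returns true if the class can be located by this class's own class loader.
    public static boolean isPresent(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Returns the subset of classNames that could not be loaded, in input order.
    public static List<String> missing(List<String> classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            if (!isPresent(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> missing = missing(REQUIRED);
        if (missing.isEmpty()) {
            System.out.println("All probed classes are on the classpath.");
        } else {
            for (String name : missing) {
                System.out.println("MISSING: " + name);
            }
        }
    }
}
```

Running this as the main class of the same one-jar (or calling ClasspathProbe.missing from Starter.main before the Spring context is created) should list every probed class that the packaging left out in one pass.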

The full dependency list is below. Note that you have to change the Scala version of the scala-* dependencies to match the suffix of your Kafka artifact (for example, kafka_2.9.2 needs Scala 2.9.2 rather than the 2.8.0 shown here).

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>net.sf.jopt-simple</groupId>
            <artifactId>jopt-simple</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-annotation</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.easymock</groupId>
            <artifactId>easymock</artifactId>
            <version>3.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest</artifactId>
            <version>1.2</version>
            <scope>test</scope>
        </dependency>
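The note about matching the Scala version to the artifact suffix can be illustrated with a tiny helper. This is a sketch under the assumption that the Scala version is whatever follows the last underscore in the artifactId, as in kafka_2.9.2. The class and method names are hypothetical.

```java
// Hypothetical helper: extracts the Scala version from a suffixed artifactId.
public class ScalaSuffix {

    // Returns the text after the last underscore, or null if there is no suffix.
    public static String scalaVersionOf(String artifactId) {
        int idx = artifactId.lastIndexOf('_');
        if (idx < 0 || idx == artifactId.length() - 1) {
            return null;
        }
        return artifactId.substring(idx + 1);
    }

    public static void main(String[] args) {
        // For the artifact in the question this prints 2.9.2
        System.out.println(scalaVersionOf("kafka_2.9.2"));
    }
}
```

The value it returns is the version to use for scala-library and scala-compiler in the list above.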

As for

"Maybe I should switch to Apache Active MQ. But that sounds less fun. Am I missing something?"

Well, don't forget that this is a beta release. Some bad things do happen, indeed, but currently we're running Kafka 0.7 without any trouble.
