Deprecation Errors with Kafka Consumer for twitter streaming


Problem Description

I've been working on Kafka twitter streaming feed data.

I'm following the sample from the link below: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html

I'm able to use the Producer code and it is working fine: it can get the twitter feed and send it to the Kafka Producer.

I'm not able to use the Consumer code, since it throws deprecation errors for many APIs.

Here is the consumer code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        this.topic = topic;
    }

    public void testConsumer() {

     System.out.println("Test Con called");

        Map<String, Integer> topicCount = new HashMap<>();

        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");

        for (final KafkaStream stream : streams) {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            System.out.println("Size"+it.length());

            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {

     System.out.println("Started");
     String topic="twittertopic";
     KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
     simpleTWConsumer.testConsumer();
     System.out.println("End");
    }    
}

It throws errors: ConsumerConnector, ConsumerIterator, KafkaStream are deprecated.

ConsumerConfig is not visible.

Is there a fixed version of this sample code (Kafka consumer for twitter)?

Answer

The tutorial you are following is very old and it's using the old Scala Kafka clients that have been deprecated, see http://kafka.apache.org/documentation/#legacyapis

The deprecated classes are:

  • kafka.consumer.* and kafka.javaapi.consumer.*: use the newer Java Consumer under org.apache.kafka.clients.consumer.* instead

  • kafka.producer.* and kafka.javaapi.producer.*: use the newer Java Producer under org.apache.kafka.clients.producer.* instead
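To compile against the newer Java clients, the project needs the kafka-clients artifact on the classpath. A minimal Maven sketch is shown below; the version number is only an example, so pick one compatible with your broker:

```xml
<!-- kafka-clients provides org.apache.kafka.clients.consumer.* and .producer.* -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- example version; align with your broker version -->
    <version>2.8.2</version>
</dependency>
```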

Apart from using deprecated classes, your code was mostly correct, I only had to fix a few imports. See below a fixed version. Using it, I was able to consume messages I was producing to a topic called twittertopic.

package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {

        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}

While the code above can be used, the next major Kafka release is likely to remove classes that are currently deprecated, so you should not write new logic using these.

Instead you should get started with the Java clients; you can use the examples provided on GitHub: https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples

Using the new Java Consumer, your logic would look like:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));

            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }

}
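To check the consumer end-to-end, you can produce a few test messages with the console producer that ships with Kafka. The script paths and broker address below are assumptions for a local single-node setup, and flag names vary across Kafka versions (older distributions use --zookeeper for kafka-topics.sh and --broker-list for the console producer instead of --bootstrap-server):

```shell
# Create the topic if it does not exist yet (flags vary across Kafka versions)
bin/kafka-topics.sh --create --topic twittertopic \
    --bootstrap-server localhost:9092 \
    --partitions 1 --replication-factor 1

# Type a few lines; each line becomes one record on twittertopic,
# which the consumer above should print as it polls
bin/kafka-console-producer.sh --topic twittertopic \
    --bootstrap-server localhost:9092
```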

