Deprecation Errors with Kafka Consumer for Twitter Streaming

Question

I've been working with Kafka on Twitter streaming feed data.

I'm following the sample at this link: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html

I'm able to use the Producer code and it works fine: it fetches the Twitter feed and sends it to Kafka.

I'm not able to use the Consumer code, because it reports deprecation errors for many of the APIs it uses.

Here is the consumer code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;


//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        this.topic = topic;
    }

    public void testConsumer() {

     System.out.println("Test Con called");

        Map<String, Integer> topicCount = new HashMap<>();

        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);

        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");

        for (final KafkaStream stream : streams) {

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            System.out.println("Size"+it.length());

            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {

     System.out.println("Started");
     String topic="twittertopic";
     KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
     simpleTWConsumer.testConsumer();
     System.out.println("End");
    }    
}

It reports this error: ConsumerConnector, ConsumerIterator and KafkaStream are deprecated.

ConsumerConfig is not visible.

Is there a fixed version of this sample code (Kafka consumer for Twitter)?

Recommended answer

The tutorial you are following is very old. It uses the old Scala Kafka clients, which have been deprecated; see http://kafka.apache.org/documentation/#legacyapis

The classes that have been deprecated are:

  • kafka.consumer.* and kafka.javaapi.consumer: use the newer Java Consumer under org.apache.kafka.clients.consumer.* instead.
  • kafka.producer.* and kafka.javaapi.producer: use the newer Java Producer under org.apache.kafka.clients.producer.* instead.

Apart from using deprecated classes, your code was mostly correct; I only had to fix a few imports. A fixed version follows. With it, I was able to consume messages I was producing to a topic called twittertopic.

package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {

        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}
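One detail in the loop above is worth a small sketch: new String(byte[]) decodes with the JVM's default charset, which can garble non-ASCII tweet text on some machines. The helper below is hypothetical (MessageDecoder and decode are not part of Kafka), and it assumes the producer writes UTF-8, which is the default for Kafka's StringSerializer.

```java
import java.nio.charset.StandardCharsets;

public class MessageDecoder {

    // Hypothetical helper: decodes a raw message payload as UTF-8
    // instead of relying on the platform default charset.
    // Assumption: the producer encoded the text as UTF-8.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the byte[] returned by it.next().message()
        byte[] payload = "caf\u00e9".getBytes(StandardCharsets.UTF_8);
        System.out.println("Message from Single Topic: " + decode(payload));
    }
}
```

In the consumer loop, this would replace new String(it.next().message()) with decode(it.next().message()).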

While the code above can be used, the next major Kafka release is likely to remove the classes that are currently deprecated, so you should not write new logic against them.

Instead, you should get started with the Java clients. You can use the examples provided on GitHub: https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples

Using the new Java Consumer, your logic would look like:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);) {
            consumer.subscribe(Arrays.asList(TOPIC));

            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }

}
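The main configuration difference between the two versions is that the Java consumer connects to the brokers through bootstrap.servers rather than to ZooKeeper, and it needs explicit deserializers. A minimal sketch of that configuration in isolation, using only java.util so it runs without the Kafka client on the classpath; the class name NewConsumerProps and the method build are hypothetical, while the keys and values are the ones used in the example above:

```java
import java.util.Properties;

public class NewConsumerProps {

    // Hypothetical helper: builds the settings used by the Java consumer example.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Takes the place of "zookeeper.connect" from the old consumer:
        // the Java consumer talks to the brokers directly.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.commit.interval.ms", "1000");
        // The old connector handed back raw byte[]; the Java consumer
        // requires deserializers for both key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "testgroup");
        // Print the settings in a stable order for inspection
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The resulting Properties object can be passed to new KafkaConsumer<>(props) as in the example above. The zookeeper.* settings from the old consumer have no counterpart here.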
