我的Java使用者为什么不读取我创建的数据? [英] Why won't my Java consumer read the data that I have created?

查看:79
本文介绍了我的Java使用者为什么不读取我创建的数据?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从我制作的简单生成器中读取数据.由于某种原因,无论何时我运行使用者,它都不会看到/产生我产生的任何数据.谁能给我任何下一步的指导?

I am trying to read data from a simple producer that I have made. For some reason whenever I run the consumer, it does not see/produce any of the data I have produced. Can anyone possibly give me any guidance on what to do next?

我在下面包含了生产者和消费者的代码:

I have included code of my producer and consumer below:

制作人:

public class AvroProducer {

public static void main(String[] args) {

    String bootstrapServers = "localhost:9092";
    String topic = "trackingReportsReceived";

    //create Producer properties
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    properties.setProperty("schema.registry.url", "http://localhost:8081");

    //create the producer
    KafkaProducer<String, trackingReport> producer = new KafkaProducer<>(properties);

    //creating my own event
    trackingReport event = trackingReport.newBuilder()
            .setRemoteEventUID(2)
            .setReceivedPacketUID(2)
            .setRemoteUnitAID(2)
            .setEventTime(2)
            .setEventLocationStampUID(3)
            .setEventLatitude(2)
            .setEventLongitude(2)
            .setEventOdometer(3)
            .setEventSpeed(3)
            .setEventCourse(3)
            .build();


    //create a producer record
    ProducerRecord<String, trackingReport> eventRecord = new ProducerRecord<>(topic, event);

    //send data - asynchronous
    producer.send(eventRecord, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                    System.out.println("Success!");
                    System.out.println(recordMetadata.toString());
                } else {
                    e.printStackTrace();
                }
            }
        });


    //flush data
    producer.flush();
    //flush and close producer
    producer.close();

消费者:

public class AvroConsumer {

public static void main(String[] args) {

    final Logger logger = LoggerFactory.getLogger(AvroConsumer.class);

    String bootstrapServers = "localhost:9092";

    //create Consumer properties
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    properties.put("schema.registry.url", "http://localhost:8081");
    properties.put("specific.avro.reader", "true");

    //create the consumer
    KafkaConsumer<String, trackingReport> consumer = new KafkaConsumer<>(properties);
    String topic = "trackingReportsReceived";

    consumer.subscribe(Collections.singletonList(topic));

    System.out.println("Waiting for data...");

//        try {

        while (true) {
            ConsumerRecords<String, trackingReport> records = consumer.poll(100);
            for (ConsumerRecord<String, trackingReport> record : records) {
                trackingReport trackingrep = record.value();
                System.out.println(trackingrep);
            }
            consumer.commitSync();
        }

//        } catch (Exception e) {
//            logger.error("Exception occured while consuming messages...", e);
//        } finally {
//            consumer.close();
//        }


}
}

生产者在工作,而消费者却没有.

N.B. The producer works, however the consumer does not.

推荐答案

使用控制台使用者脚本,您是否使用了与Java使用者中的组ID相同的组ID?

With the console consumer script, did you use the same group id as the one in your java consumer?

如果您随后进行了验证,请尝试在代码中使用新的使用者组.

If you did then to validate, try with a new consumer group in your code.

如果它有效,则意味着控制台使用者使用该组ID的使用者读取最后一个当前偏移量,因此当您启动具有相同组ID的Java使用者时,他尝试读取以下内容:该偏移量是最后一个偏移量.因此没有要读取的消息.

If it works then it would mean that the console consumer read all data in the topic so the consumer with this group id commit the last current offsets and when you launch the java consumer with the same group id, he tried to read at that offset which is the last.. So no messages to read.

为验证您还可以先启动Java使用者,然后再启动生产者,如果看到消息,则表明控制台和Java使用者具有相同的组ID.

To validate that you could also start the java consumer first and after that start the producer, if you see the messages then it would be that the console and java consumer had the same group id.

这篇关于我的Java使用者为什么不读取我创建的数据?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆