如何使用Spring-Kafka通过Confluent Schema注册表读取AVRO消息? [英] How to use Spring-Kafka to read AVRO message with Confluent Schema registry?
本文介绍了如何使用Spring-Kafka通过Confluent Schema注册表读取AVRO消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
如何使用Spring-Kafka通过Confluent Schema注册表读取AVRO消息?有样品吗?我在官方参考文件中找不到它.
How to use Spring-Kafka to read AVRO message with Confluent Schema registry? Is there any sample? I can't find it in official reference document.
推荐答案
下面的代码可以读取customer-avro主题中的消息.这是我定义为的值的AVRO模式.
Below code can read the message from customer-avro topic. Here's the AVRO schema on value i have defined as.
{
"type": "record",
"namespace": "com.example",
"name": "Customer",
"version": "1",
"fields": [
{ "name": "first_name", "type": "string", "doc": "First Name of Customer" },
{ "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
{ "name": "age", "type": "int", "doc": "Age at the time of registration" },
{ "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
{ "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
{ "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
]
}
下面是一个完整的代码片段,可通过手动提交阅读此示例.
Below is a complete code snippet to read this example with manual commit.
import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Calendar;
import java.util.Collections;
import java.util.Properties;
public class KafkaAvroJavaConsumerV1Demo {
public static void main(String[] args) {
Properties properties = new Properties();
// normal consumer
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.put("group.id", "customer-consumer-group-v1");
properties.put("auto.commit.enable", "false");
properties.put("auto.offset.reset", "earliest");
// avro part (deserializer)
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
properties.setProperty("schema.registry.url", "http://127.0.0.1:8081");
properties.setProperty("specific.avro.reader", "true");
KafkaConsumer<String, Customer> kafkaConsumer = new KafkaConsumer<>(properties);
String topic = "customer-avro";
kafkaConsumer.subscribe(Collections.singleton(topic));
System.out.println("Waiting for data...");
while (true){
System.out.println("Polling at " + Calendar.getInstance().getTime().toString());
ConsumerRecords<String, Customer> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, Customer> record : records){
Customer customer = record.value();
System.out.println(customer);
}
kafkaConsumer.commitSync();
}
}
}
这篇关于如何使用Spring-Kafka通过Confluent Schema注册表读取AVRO消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文