Kafka consumer in Flink
Question
I am working with Kafka and Apache Flink, and I am trying to consume records (which are in Avro format) from a Kafka topic in Apache Flink. Below is the code I am trying with.
I am using a custom deserializer to deserialize the Avro records from the topic.
The Avro schema for the data I am sending to the topic "test-topic" is as below:
{
    "namespace": "com.example.flink.avro",
    "type": "record",
    "name": "UserInfo",
    "fields": [
        {"name": "name", "type": "string"}
    ]
}
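For context on the wire format: with this schema, Avro's raw binary encoding of a record is just its single string field, i.e. a zig-zag varint length followed by the UTF-8 bytes, with no header or schema ID in front. A stdlib-only sketch of that encoding (the class and helper names here are made up for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroStringEncoding {

    // Avro writes lengths as zig-zag-encoded varints (hypothetical helper).
    static void writeZigZagVarInt(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: 0,-1,1,-2,... -> 0,1,2,3,...
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Encode a UserInfo record: its one string field = varint length + UTF-8 bytes.
    static byte[] encodeUserInfo(String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
        writeZigZagVarInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] encoded = encodeUserInfo("sumit");
        // zig-zag(5) = 10, then the five UTF-8 bytes of "sumit"
        System.out.println(encoded.length); // 6
        System.out.println(encoded[0]);     // 10
    }
}
```

One thing worth checking against this: if the producer prepends a framing header (for example, Confluent's Schema Registry serializers write a 0x00 magic byte plus a 4-byte schema ID before the Avro payload), a plain binary decoder would read that leading 0x00 as a zero-length string, which would produce exactly an empty "name" field.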
The custom deserializer I am using is as below:
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}
And this is how my Flink app is written:
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class FlinkKafkaApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        kafkaProperties.put("group.id", "test");

        AvroDeserializationSchema<UserInfo> schema = new AvroDeserializationSchema<UserInfo>(UserInfo.class);
        FlinkKafkaConsumer011<UserInfo> consumer = new FlinkKafkaConsumer011<UserInfo>("test-topic", schema, kafkaProperties);

        DataStreamSource<UserInfo> userStream = env.addSource(consumer);
        userStream.map(new MapFunction<UserInfo, UserInfo>() {
            @Override
            public UserInfo map(UserInfo userInfo) {
                return userInfo;
            }
        }).print();

        env.execute("Test Kafka");
    }
}
I am trying to print the record sent to the topic, which is as below: {"name":"sumit"}
Output:
The output I am getting is {"name":""}
Can anyone help figure out what the issue is here and why I am not getting {"name":"sumit"} as output?
Answer
The Flink documentation says: Flink's Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc., or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It provides access to one or more Kafka topics.
We do not have to write a custom deserializer to consume Avro messages from Kafka: Flink ships its own AvroDeserializationSchema (in the flink-avro module), with factory methods for both specific and generic records.
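The built-in AvroDeserializationSchema lives in the flink-avro module, which has to be on the classpath. A sketch of the Maven dependency, assuming a Maven build (the version is a placeholder and should match your other Flink dependencies):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <!-- placeholder: use the same version as your other Flink artifacts -->
    <version>${flink.version}</version>
</dependency>
```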
To read SpecificRecords:
DataStreamSource<UserInfo> stream = streamExecutionEnvironment.addSource(
        new FlinkKafkaConsumer<>("test_topic",
                AvroDeserializationSchema.forSpecific(UserInfo.class),
                properties).setStartFromEarliest());
To read GenericRecords:
Schema schema = new Schema.Parser().parse(
        "{\"namespace\": \"com.example.flink.avro\", \"type\": \"record\", \"name\": \"UserInfo\","
        + " \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}");
DataStreamSource<GenericRecord> stream = streamExecutionEnvironment.addSource(
        new FlinkKafkaConsumer<>("test_topic",
                AvroDeserializationSchema.forGeneric(schema),
                properties).setStartFromEarliest());
More details: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumer