在Flink 1.2中下沉Kafka流时出错 [英] Error in sinking kafka stream in flink 1.2
问题描述
我所做的是读取来自kafka的json格式的消息.例如
What I did was to read in a message from kafka in json format. E.g.
{"a":1,"b":2}
然后,我对此消息应用了一个过滤器,以确保与a对应的值为1,b的值为2.最后,我想将结果流输出到下游的kafka.但是,我不知道为什么编译器会说类型不匹配.
Then I applied a filter to this message to make sure the value corresponding to a is 1, the value of b is 2. Finally, I want to output the result stream to a downstream kafka. However, I don't know why the compiler says type mismatch.
我的代码如下:
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONDeserializationSchema(),
params.getProperties)
val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1")
&& jsonNode.get("b").asText.equals("2"))
filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties))
我得到的错误如下图所示:
The error I got is shown in the image below:
我参考flink kafka连接器文档来编写kafka流出代码: https://ci.apache. org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
I refer to the flink kafka connector document to write the kafka outstream code: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
推荐答案
您有类型为ObjectNode
的流DataStream
,因此您需要提供FlinkKafkaProducer010[ObjectNode]
例如:
You have a stream DataStream
of type ObjectNode
, so you need to provide FlinkKafkaProducer010[ObjectNode]
e.g:
stream1.addSink(new FlinkKafkaProducer010[ObjectNode](params.getRequired("output-topic"), new SerializationSchema[ObjectNode] {
override def serialize(element: ObjectNode): Array[Byte] = ???
} ), params.getProperties)
java中所有泛型类型的类型都是不变的,这就是为什么不能仅传递FlinkKafkaProducer010[Object]
的原因.
All generic types in java are invariant in type, that is why you cannot just pass FlinkKafkaProducer010[Object]
.
您可能会遇到的另一个问题是,您还需要提供SerializationSchema[ObjectNode]
,而SimpleStringSchema
实现SerializationSchema[String]
.
Another problem you may encounter further is that you also need to provide SerializationSchema[ObjectNode]
whereas the SimpleStringSchema
implements SerializationSchema[String]
.
这篇关于在Flink 1.2中下沉Kafka流时出错的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!