Error in sinking kafka stream in flink 1.2


Problem description

What I did was to read in a message from kafka in json format, e.g.:

{"a":1,"b":2}

Then I applied a filter to this message to make sure the value corresponding to a is 1 and the value of b is 2. Finally, I want to output the result stream to a downstream kafka topic. However, I don't know why the compiler says the types mismatch.
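The filter just checks two fields of each parsed message. A minimal sketch of that predicate, with a plain `Map` standing in for Jackson's `ObjectNode` (the Flink and Jackson dependencies are assumed and not needed to illustrate the logic; the names here are hypothetical):

```scala
object FilterSketch {
  // Stand-in for a parsed JSON object; in the question this is a Jackson ObjectNode.
  type Json = Map[String, Int]

  // Same predicate as the question's filter: keep messages where a == 1 and b == 2.
  def keep(msg: Json): Boolean =
    msg.get("a").contains(1) && msg.get("b").contains(2)

  def main(args: Array[String]): Unit = {
    val messages = List(Map("a" -> 1, "b" -> 2), Map("a" -> 3, "b" -> 2))
    // Only the first message survives the filter.
    println(messages.filter(keep))
  }
}
```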

My code is as follows:

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONDeserializationSchema(),
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode =>
  jsonNode.get("a").asText.equals("1")
    && jsonNode.get("b").asText.equals("2"))

filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties))

The error I got is shown in the image below:

I refer to the flink kafka connector document to write the kafka outstream code: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

Answer

You have a DataStream of type ObjectNode, so you need to provide a FlinkKafkaProducer010[ObjectNode], e.g.:

stream1.addSink(new FlinkKafkaProducer010[ObjectNode](
  params.getRequired("output-topic"),
  new SerializationSchema[ObjectNode] {
    // One possible implementation: emit the JSON node as UTF-8 text
    // (requires java.nio.charset.StandardCharsets).
    override def serialize(element: ObjectNode): Array[Byte] =
      element.toString.getBytes(StandardCharsets.UTF_8)
  },
  params.getProperties))

All generic types in Java are invariant, which is why you cannot just pass a FlinkKafkaProducer010[Object].
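Invariance means a sink of one type parameter is never accepted where a sink of another is expected, even when the element types are related. A minimal sketch with a hypothetical `Sink[T]` standing in for FlinkKafkaProducer010[T]:

```scala
// Sink[T] is invariant in T, mirroring FlinkKafkaProducer010[T] (hypothetical name).
class Sink[T] {
  def accept(t: T): String = s"sinking $t"
}

object InvarianceDemo {
  // Compiles only when the element type and the sink's type parameter line up exactly.
  def addSink[T](value: T, sink: Sink[T]): String = sink.accept(value)

  def main(args: Array[String]): Unit = {
    val stringSink = new Sink[String]
    println(addSink("payload", stringSink)) // types line up: Sink[String] for a String
    // val objectSink: Sink[Object] = stringSink // would NOT compile: Sink is invariant
  }
}
```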

Another problem you may encounter is that you also need to provide a SerializationSchema[ObjectNode], whereas SimpleStringSchema implements SerializationSchema[String].

