Cannot see message while sinking kafka stream and cannot see print message in flink 1.2


Question

My goal is to use Kafka to read in a string in JSON format, apply a filter to the string, and then sink the message out (still in JSON string format).

For testing purposes, my input string message looks like:

{"a":1,"b":2}
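For reference, a test message like this is typically produced without a message key, e.g. via the console producer — a sketch, assuming a local broker and a hypothetical topic name `test-input`:

```shell
# Send a keyless JSON record to the input topic (broker address and
# topic name are assumptions; --broker-list is the Kafka 0.10-era flag)
echo '{"a":1,"b":2}' | kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic test-input
```

The console producer sends records with a null key by default, which matters for the answer below.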

My implementation code is:

def main(args: Array[String]): Unit = {

  // parse input arguments
  val params = ParameterTool.fromArgs(args)

  if (params.getNumberOfParameters < 4) {
    println("Missing parameters!\n"
      + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
      + "--bootstrap.servers <kafka brokers> "
      + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
    return
  }

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.getConfig.disableSysoutLogging
  env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
  // create a checkpoint every 5 seconds
  env.enableCheckpointing(5000)
  // make parameters available in the web interface
  env.getConfig.setGlobalJobParameters(params)

  // create a Kafka streaming source consumer for Kafka 0.10.x
  val kafkaConsumer = new FlinkKafkaConsumer010(
    params.getRequired("input-topic"),
    new JSONKeyValueDeserializationSchema(false),
    params.getProperties)

  val messageStream = env.addSource(kafkaConsumer)

  val filteredStream: DataStream[ObjectNode] = messageStream.filter(node =>
    node.get("a").asText.equals("1") && node.get("b").asText.equals("2"))

  messageStream.print()
  // Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
  filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
    params.getRequired("output-topic"),
    new SerializationSchema[ObjectNode] {
      override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
    }, params.getProperties
  ))

  env.execute("Kafka 0.10 Example")
}

As can be seen, I want to print the message stream to the console and sink the filtered messages to Kafka. However, I can see neither of them.

The interesting thing is, if I change the schema of the KafkaConsumer from JSONKeyValueDeserializationSchema to SimpleStringSchema, I can see the messageStream printed to the console. Code as shown below:

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)
messageStream.print()

This makes me think that if I use JSONKeyValueDeserializationSchema, my input message is actually not accepted by Kafka. But that seems very odd and quite different from the online documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html).

Hope someone can help me out!

Answer

JSONKeyValueDeserializationSchema() expects a message key with each Kafka message, and I am assuming that no key is supplied when the JSON messages are produced and sent over the Kafka topic.

Thus, to solve the issue, try using JSONDeserializationSchema(), which expects only the message and creates an object node based on the message received.
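A minimal sketch of the consumer with the suggested schema (a fragment of the job above, not a definitive fix — JSONDeserializationSchema lives in `org.apache.flink.streaming.util.serialization` in the Flink 1.2-era Kafka connector and deserializes the message value alone into an ObjectNode, so the fields `a` and `b` sit at the top level and the existing filter works unchanged):

```scala
// Sketch: same source as in the question, but using JSONDeserializationSchema,
// which parses only the record value — keyless records deserialize fine.
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONDeserializationSchema(),
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)
messageStream.print()
```

Note also that if you stay with JSONKeyValueDeserializationSchema, it nests the payload inside the resulting ObjectNode under "key" and "value" fields, so the filter would need `node.get("value").get("a")` rather than `node.get("a")`.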
