Cannot see messages when sinking a Kafka stream, and cannot see print output, in Flink 1.2

Question

My goal is to use Kafka to read in a JSON-formatted string, filter it, and then sink the message back out (still as a JSON string).

For testing purposes, my input message looks like this:

{"a":1,"b":2}

My implementation:

import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONKeyValueDeserializationSchema, SerializationSchema}

object Kafka {

  def main(args: Array[String]): Unit = {

    // parse input arguments
    val params = ParameterTool.fromArgs(args)

    // five parameters are required (--prefix is optional)
    if (params.getNumberOfParameters < 5) {
      println("Missing parameters!\n"
        + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
        + "--bootstrap.servers <kafka brokers> "
        + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
      return
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging
    // restart up to 4 times, waiting 10 s between attempts
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    // create a checkpoint every 5 seconds
    env.enableCheckpointing(5000)
    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // create a Kafka streaming source consumer for Kafka 0.10.x
    val kafkaConsumer = new FlinkKafkaConsumer010(
      params.getRequired("input-topic"),
      new JSONKeyValueDeserializationSchema(false),
      params.getProperties)

    val messageStream = env.addSource(kafkaConsumer)

    // keep only messages with a == 1 and b == 2
    val filteredStream: DataStream[ObjectNode] = messageStream.filter(node =>
      node.get("a").asText.equals("1") && node.get("b").asText.equals("2"))

    messageStream.print()
    // Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
    filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
      params.getRequired("output-topic"),
      new SerializationSchema[ObjectNode] {
        override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
      },
      params.getProperties))

    env.execute("Kafka 0.10 Example")
  }
}

As you can see, I want to print the message stream to the console and sink the filtered messages to Kafka. However, I see neither.

Interestingly, if I change the consumer's schema from JSONKeyValueDeserializationSchema to SimpleStringSchema, messageStream does get printed to the console. The code looks like this:

import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new SimpleStringSchema(),
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)
messageStream.print()

This makes me think that, with JSONKeyValueDeserializationSchema, my input messages are somehow not accepted by Kafka. But that seems very strange, and it does not match the online documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html).

I hope someone can help!

Answer

JSONKeyValueDeserializationSchema expects every Kafka message to carry a key, and I assume that no key is set when your JSON messages are produced and sent to the Kafka topic.
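The Flink 1.2 Kafka connector documentation describes the ObjectNode produced by JSONKeyValueDeserializationSchema as having a "key" and a "value" field (plus an optional "metadata" field), rather than exposing the message's own fields at the top level. The rough sketch below imitates that documented shape with plain Jackson, assuming jackson-databind is on the classpath; it is not Flink's own implementation, and `KeyValueShape`, `wrap` and `matches` are hypothetical names. It illustrates that a filter over this schema's output has to go through "value": on the wrapper itself, `node.get("a")` returns null, so a filter like the question's `node.get("a").asText` would hit a null even for keyed messages.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

// Hypothetical illustration of the documented "key"/"value" wrapper; not Flink's code.
object KeyValueShape {
  private val mapper = new ObjectMapper()

  // Build a node shaped like the documented output: {"key": ..., "value": ...}
  def wrap(keyJson: String, valueJson: String): ObjectNode = {
    val node = mapper.createObjectNode()
    node.replace("key", mapper.readTree(keyJson))
    node.replace("value", mapper.readTree(valueJson))
    node
  }

  // Field lookups must go through "value"; path() yields a missing node instead of null
  def matches(node: ObjectNode): Boolean = {
    val value = node.get("value")
    value != null && value.path("a").asText == "1" && value.path("b").asText == "2"
  }
}
```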

To solve the issue, try JSONDeserializationSchema instead: it reads only the message value and creates an ObjectNode from it.
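A minimal sketch of that change applied to the question's job, assuming Flink 1.2 with the Kafka 0.10 connector, where `JSONDeserializationSchema` and `SerializationSchema` live in `org.apache.flink.streaming.util.serialization` and the nodes are unshaded `com.fasterxml.jackson` ObjectNodes (later Flink versions moved these classes). The restart strategy, checkpointing and argument check are omitted for brevity; `FilteredJsonJob` and `matches` are hypothetical names. The null guard in `matches` keeps a malformed message from failing the job with a NullPointerException.

```scala
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema, SerializationSchema}

// Hypothetical job name; assumes Flink 1.2 package names
object FilteredJsonJob {

  // True only for {"a":1,"b":2}-style messages; absent fields make get() return null
  def matches(node: ObjectNode): Boolean = {
    val a = node.get("a")
    val b = node.get("b")
    a != null && b != null && a.asText == "1" && b.asText == "2"
  }

  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // JSONDeserializationSchema parses only the message value, so unkeyed
    // records become ObjectNodes with top-level "a" and "b" fields
    val messageStream: DataStream[ObjectNode] = env.addSource(
      new FlinkKafkaConsumer010[ObjectNode](
        params.getRequired("input-topic"),
        new JSONDeserializationSchema(),
        params.getProperties))

    messageStream.print()

    messageStream
      .filter(matches _)
      .addSink(new FlinkKafkaProducer010[ObjectNode](
        params.getRequired("output-topic"),
        new SerializationSchema[ObjectNode] {
          // write each node back out as a JSON string
          override def serialize(element: ObjectNode): Array[Byte] =
            element.toString.getBytes("UTF-8")
        },
        params.getProperties))

    env.execute("Kafka 0.10 Example")
  }
}
```

It is run with the same `--input-topic`, `--output-topic` and Kafka connection arguments as the original job.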
