How to extract part of a string in JSON format from Kafka in Flink 1.2

Question

My goal is to use Kafka to read in a string in JSON format, filter it, select part of the message, and sink the message out (still as a JSON string).

For testing purposes, my input message looks like this:

{"a":1,"b":2,"c":"3"}

My implementation is:

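import java.nio.charset.StandardCharsets
import java.util.Properties

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.{JSONDeserializationSchema, SerializationSchema}

// The object name is illustrative; the original snippet showed only main().
object KafkaJsonFilter {
  def main(args: Array[String]): Unit = {
    val inputProperties = new Properties()
    inputProperties.setProperty("bootstrap.servers", "localhost:9092")
    inputProperties.setProperty("group.id", "myTest2")
    val inputTopic = "test"

    val outputProperties = new Properties()
    outputProperties.setProperty("bootstrap.servers", "localhost:9092")
    val outputTopic = "test2"

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    // create a checkpoint every 5 seconds
    env.enableCheckpointing(5000)

    // create a Kafka streaming source consumer for Kafka 0.10.x;
    // JSONDeserializationSchema parses each record into a Jackson ObjectNode
    val kafkaConsumer = new FlinkKafkaConsumer010(
      inputTopic,
      new JSONDeserializationSchema(),
      inputProperties)

    val messageStream: DataStream[ObjectNode] = env
      .addSource(kafkaConsumer).rebalance

    // keep only messages where a == 1 and b == 2
    val filteredStream: DataStream[ObjectNode] = messageStream.filter(node =>
      node.get("a").asText.equals("1") && node.get("b").asText.equals("2"))

    // Need help in this part: how to extract, for instance, a and c and
    // get something like {"a":1, "c":"3"}?
    // As written, this map only returns the value of "a".
    val testStream: DataStream[JsonNode] = filteredStream.map(node => node.get("a"))

    testStream.addSink(new FlinkKafkaProducer010[JsonNode](
      outputTopic,
      new SerializationSchema[JsonNode] {
        override def serialize(element: JsonNode): Array[Byte] =
          element.toString.getBytes(StandardCharsets.UTF_8)
      },
      outputProperties))

    env.execute("Kafka 0.10 Example")
  }
}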

As the comment in the code shows, I am not sure how to properly select part of the message. I use map, but I don't know how to assemble the whole message: what I did in the code only gives me "1" as the result, but what I want is {"a":1, "c":"3"}.

Or maybe there is a completely different way to solve this problem. In Spark Streaming there is a "select" API, but I cannot find an equivalent in Flink.
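The "select" step being asked about is a projection: keep a chosen subset of fields from each record and serialize the result as a new JSON object. The sketch below shows only that logic, in plain Java with no Flink or Jackson dependency, so it is an illustration rather than the Flink API. It assumes flat records modelled as an insertion-ordered `Map<String, Object>`, and the names `JsonProjection`, `select`, and `toJson` are hypothetical. In an actual job the equivalent step would run inside `filteredStream.map(...)`, building a new Jackson `ObjectNode` from the selected fields instead of a map.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free sketch of "select some fields, re-serialize as JSON".
// Assumes flat records (no nested objects or arrays) held in an insertion-ordered map.
final class JsonProjection {

    // Keep only the requested keys, in the order they are listed; missing keys are skipped.
    static Map<String, Object> select(Map<String, Object> record, List<String> keys) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String key : keys) {
            if (record.containsKey(key)) {
                out.put(key, record.get(key));
            }
        }
        return out;
    }

    // Serialize a flat map: numbers and booleans unquoted, everything else as a quoted string.
    static String toJson(Map<String, Object> record) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : record.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append(quote(e.getKey())).append(':');
            Object v = e.getValue();
            if (v instanceof Number || v instanceof Boolean) sb.append(v);
            else sb.append(quote(String.valueOf(v)));
        }
        return sb.append('}').toString();
    }

    // Minimal escaping: backslashes and double quotes only (control characters not handled).
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // The test message from the question: {"a":1,"b":2,"c":"3"}
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("a", 1);
        input.put("b", 2);
        input.put("c", "3");
        System.out.println(toJson(select(input, Arrays.asList("a", "c"))));
    }
}
```

Running `main` prints `{"a":1,"c":"3"}`. Because the projection works on each record independently, it needs no state; every filtered message yields its own reduced JSON object.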

Thanks a lot for the Flink community's help! This is the last feature I would like to implement in this small project.

Recommended answer

A Flink streaming job processes each input once and either emits it to the next task or writes it to external storage.

One way is to save all the outputs to external storage, such as HDFS. After the streaming job finishes, a batch job can combine them into a single JSON object.

Another way is to use state and a RichMapFunction to build up a JSON object containing all the key-value pairs.

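import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

// Give every record the same key ("mock") so one keyed state accumulates all of them.
stream.map(new MapFunction<String, Tuple2<String, String>>() {
    public Tuple2<String, String> map(String value) throws Exception {
        return new Tuple2<>("mock", value);
    }
}).keyBy(0).map(new RichMapFunction<Tuple2<String, String>, String>() {
    private transient ValueState<String> old;
    @Override
    public void open(Configuration parameters) {
        // obtain the state handle once instead of on every record
        old = getRuntimeContext().getState(new ValueStateDescriptor<>("test", String.class));
    }
    @Override
    public String map(Tuple2<String, String> value) throws Exception {
        String newVal = old.value();
        // makeJSON (not shown) merges two JSON strings; its result must be kept
        if (newVal != null) newVal = makeJSON(newVal, value.f1);
        else newVal = value.f1;
        old.update(newVal);
        return newVal;
    }
}).print();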

Then apply this mapping to your filtered stream, e.g. filteredStream.map(function);

Note that because the state accumulates across records, you will see intermediate outputs such as {"a": 1}, then {"a": 1, "c": 3}. The last output is the one you want.
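To see why the output grows record by record, the stateful map in the answer can be simulated in plain Java. This is a sketch under stated assumptions, not Flink code: because every record carries the key `"mock"`, Flink keeps a single `ValueState<String>`, modelled here as one local variable. The answer does not show `makeJSON`, so it is replaced by a hypothetical naive version that assumes both inputs are flat JSON objects with distinct keys and simply splices their bodies together (no parsing, no duplicate-key handling). The class name `RunningJsonMerge` is also hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of the keyed, stateful map: read state, merge, update state, emit.
final class RunningJsonMerge {

    // Hypothetical naive stand-in for makeJSON: assumes flat objects with distinct keys.
    static String makeJSON(String acc, String next) {
        String leftBody = body(acc);
        String rightBody = body(next);
        if (leftBody.isEmpty()) return "{" + rightBody + "}";
        if (rightBody.isEmpty()) return "{" + leftBody + "}";
        return "{" + leftBody + "," + rightBody + "}";
    }

    // Strip the surrounding braces of a flat JSON object, e.g. {"a":1} -> "a":1
    private static String body(String json) {
        String t = json.trim();
        return t.substring(1, t.length() - 1).trim();
    }

    // One state slot, because every record shares the key "mock".
    static List<String> run(List<String> inputs) {
        String state = null;                  // ValueState.value() is null before the first update
        List<String> emitted = new ArrayList<>();
        for (String value : inputs) {
            String newVal = (state != null) ? makeJSON(state, value) : value;
            state = newVal;                   // corresponds to old.update(newVal)
            emitted.add(newVal);              // corresponds to return newVal
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (String out : run(Arrays.asList("{\"a\":1}", "{\"c\":3}"))) {
            System.out.println(out);
        }
    }
}
```

For the inputs `{"a":1}` and `{"c":3}`, `main` prints `{"a":1}` followed by `{"a":1,"c":3}`, matching the progression described above. A real merge would parse both strings with a JSON library such as Jackson rather than splicing text.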
