Flink: join file with kafka stream

Problem description

I have a problem I can't really figure out. So I have a kafka stream that contains some data like this:

{"adId":"9001", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

And I want to replace 'adId' with another value 'bookingId'. This value is located in a csv file, but I can't really figure out how to get it working.

Here is my mapping csv file:

9001;8
9002;10

So my output would ideally be something like

{"bookingId":"8", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

This file can get refreshed at least once every hour, so the job should pick up changes to it.

I currently have this code which doesn't work for me:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // create a checkpoint every 30 seconds
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

DataStream<String> adToBookingMapping = env.readTextFile(parameters.get("adToBookingMapping"));

DataStream<Tuple2<Integer,Integer>> input = adToBookingMapping.flatMap(new Tokenizer());

//Kafka Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameters.get("bootstrap.servers"));
properties.setProperty("group.id", parameters.get("group.id"));

FlinkKafkaConsumer010<ObjectNode> consumer = new FlinkKafkaConsumer010<>(parameters.get("inbound_topic"), new JSONDeserializationSchema(), properties);

consumer.setStartFromGroupOffsets();

consumer.setCommitOffsetsOnCheckpoints(true);

DataStream<ObjectNode> logs = env.addSource(consumer);

DataStream<Tuple4<Integer,String,Integer,Float>> parsed = logs.flatMap(new Parser());

// output -> bookingId, action, impressions, sum
DataStream<Tuple4<Integer, String,Integer,Float>> joined = runWindowJoin(parsed, input, 3);


public static DataStream<Tuple4<Integer, String, Integer, Float>> runWindowJoin(DataStream<Tuple4<Integer, String, Integer, Float>> parsed,
      DataStream<Tuple2<Integer, Integer>> input,long windowSize) {

  return parsed.join(input)
          .where(new ParsedKey())
          .equalTo(new InputKey())
          .window(TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)))
          //.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
          .apply(new JoinFunction<Tuple4<Integer, String, Integer, Float>, Tuple2<Integer, Integer>, Tuple4<Integer, String, Integer, Float>>() {

              private static final long serialVersionUID = 4874139139788915879L;

              @Override
              public Tuple4<Integer, String, Integer, Float> join(
                              Tuple4<Integer, String, Integer, Float> first,
                              Tuple2<Integer, Integer> second) {
                  return new Tuple4<Integer, String, Integer, Float>(second.f1, first.f1, first.f2, first.f3);
              }
          });
}

The code only runs once and then stops, so it doesn't convert new entries in Kafka using the csv file. Any ideas on how I could process the stream from Kafka with the latest values from my csv file?

Kind regards,

黑化

Recommended answer

Your goal appears to be to join streaming data with a slow-changing catalog (i.e. a side-input). I don't think the join operation is useful here because it doesn't store the catalog entries across windows. Also, the text file is a bounded input whose lines are read once.

Consider using connect to create a connected stream, and store the catalog data as managed state to perform lookups against. The operator's parallelism would need to be 1.
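
As an illustration (not code from the original answer), here is a minimal sketch of that idea. It reuses the logs and input streams from the question, but keys both by adId so the mapping can live in Flink's managed keyed state; this keyBy variant and the JSON field access (adId as a string) are assumptions. To keep picking up hourly changes to the csv, the mapping stream could also be produced with env.readFile(..., FileProcessingMode.PROCESS_CONTINUOUSLY, interval) instead of readTextFile, which reads the file only once.

// Sketch only; needs the usual imports, e.g. ValueState, ValueStateDescriptor,
// RichCoFlatMapFunction, Configuration and Collector from the Flink API.
DataStream<ObjectNode> enriched = logs
        .keyBy(node -> node.get("adId").asText())              // assumed JSON field name
        .connect(input.keyBy(mapping -> mapping.f0.toString()))
        .flatMap(new RichCoFlatMapFunction<ObjectNode, Tuple2<Integer, Integer>, ObjectNode>() {

            // adId -> bookingId, held per key in managed state so it survives checkpoints
            private transient ValueState<Integer> bookingId;

            @Override
            public void open(Configuration conf) {
                bookingId = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("bookingId", Integer.class));
            }

            @Override
            public void flatMap1(ObjectNode event, Collector<ObjectNode> out) throws Exception {
                Integer id = bookingId.value();
                if (id != null) {
                    event.remove("adId");
                    event.put("bookingId", id.toString());      // replace adId with bookingId
                    out.collect(event);
                }
                // else: no mapping seen yet for this adId -- drop or buffer as needed
            }

            @Override
            public void flatMap2(Tuple2<Integer, Integer> mapping, Collector<ObjectNode> out) throws Exception {
                bookingId.update(mapping.f1);                   // refresh the catalog entry
            }
        });

Because the mapping is kept in keyed state here, each adId is routed to the same subtask on both inputs, so this variant can also run with parallelism greater than 1; with non-keyed operator state, as described above, the parallelism would need to stay at 1.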

You may find a better solution by researching 'side inputs' and looking at the solutions people use today. See FLIP-17 and Dean Wampler's talk at Flink Forward.
