Flink: join file with kafka stream


Question

I have a problem I can't really figure out. I have a Kafka stream that contains data like this:

{"adId":"9001", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

I want to replace 'adId' with another value, 'bookingId'. This value is located in a CSV file, but I can't really figure out how to get it working.

Here is my mapping CSV file:

9001;8
9002;10

So my output would ideally be something like:

{"bookingId":"8", "eventAction":"start", "eventType":"track", "eventValue":"", "timestamp":"1498118549550"}

This file can get refreshed at least once every hour, so the job should pick up changes to it.

I currently have this code, which doesn't work for me:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // create a checkpoint every 30 seconds
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

DataStream<String> adToBookingMapping = env.readTextFile(parameters.get("adToBookingMapping"));

DataStream<Tuple2<Integer,Integer>> input = adToBookingMapping.flatMap(new Tokenizer());

//Kafka Consumer
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameters.get("bootstrap.servers"));
properties.setProperty("group.id", parameters.get("group.id"));

FlinkKafkaConsumer010<ObjectNode> consumer = new FlinkKafkaConsumer010<>(parameters.get("inbound_topic"), new JSONDeserializationSchema(), properties);

consumer.setStartFromGroupOffsets();

consumer.setCommitOffsetsOnCheckpoints(true);

DataStream<ObjectNode> logs = env.addSource(consumer);

DataStream<Tuple4<Integer,String,Integer,Float>> parsed = logs.flatMap(new Parser());

// output -> bookingId, action, impressions, sum
DataStream<Tuple4<Integer, String,Integer,Float>> joined = runWindowJoin(parsed, input, 3);


public static DataStream<Tuple4<Integer, String, Integer, Float>> runWindowJoin(DataStream<Tuple4<Integer, String, Integer, Float>> parsed,
      DataStream<Tuple2<Integer, Integer>> input,long windowSize) {

  return parsed.join(input)
          .where(new ParsedKey())
          .equalTo(new InputKey())
          .window(TumblingProcessingTimeWindows.of(Time.of(windowSize, TimeUnit.SECONDS)))
          //.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
          .apply(new JoinFunction<Tuple4<Integer, String, Integer, Float>, Tuple2<Integer, Integer>, Tuple4<Integer, String, Integer, Float>>() {

              private static final long serialVersionUID = 4874139139788915879L;

              @Override
              public Tuple4<Integer, String, Integer, Float> join(
                              Tuple4<Integer, String, Integer, Float> first,
                              Tuple2<Integer, Integer> second) {
                  return new Tuple4<Integer, String, Integer, Float>(second.f1, first.f1, first.f2, first.f3);
              }
          });
}

The code only runs once and then stops, so it doesn't convert new entries in Kafka using the CSV file. Any ideas on how I could process the stream from Kafka with the latest values from my CSV file?

Kind regards,

Answer

Your goal appears to be to join streaming data with a slow-changing catalog (i.e. a side input). I don't think the join operation is useful here because it doesn't store the catalog entries across windows. Also, the text file is a bounded input whose lines are read only once.

Consider using connect to create a connected stream, and store the catalog data in managed state to perform lookups against it. The operator's parallelism would need to be 1.
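A rough sketch of that approach, assuming the ObjectNode stream from the Kafka consumer (logs) and the Tuple2&lt;Integer, Integer&gt; mapping stream (input) from the question. The class name AdToBookingEnricher and the choice to drop events with an unknown adId are illustrative assumptions, not from the original post:

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

// Connects the mapping stream with the event stream and rewrites adId -> bookingId.
public class AdToBookingEnricher
        implements CoFlatMapFunction<Tuple2<Integer, Integer>, ObjectNode, ObjectNode> {

    // adId -> bookingId lookup table. With parallelism 1 a plain map works;
    // it could also be kept as Flink managed state for fault tolerance.
    private final Map<Integer, Integer> adToBooking = new HashMap<>();

    @Override
    public void flatMap1(Tuple2<Integer, Integer> mapping, Collector<ObjectNode> out) {
        // Catalog side: insert or overwrite the mapping entry.
        adToBooking.put(mapping.f0, mapping.f1);
    }

    @Override
    public void flatMap2(ObjectNode event, Collector<ObjectNode> out) {
        // Event side: look up the bookingId and rewrite the record.
        Integer bookingId = adToBooking.get(Integer.parseInt(event.get("adId").asText()));
        if (bookingId != null) {
            event.remove("adId");
            event.put("bookingId", bookingId.toString());
            out.collect(event);
        }
        // Events with an unknown adId are dropped here; they could also be buffered or forwarded unchanged.
    }
}

Wired up against the streams from the question, with parallelism 1 so every event sees the full map:

DataStream<ObjectNode> enriched = input
        .connect(logs)
        .flatMap(new AdToBookingEnricher())
        .setParallelism(1);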

You may find a better solution by researching 'side inputs' and looking at the solutions people use today. See FLIP-17 and Dean Wampler's talk at Flink Forward.
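For the hourly refresh mentioned in the question, one option is a simple source that feeds the mapping side of the connected stream by periodically re-reading the file. This is only a sketch under the assumption that re-reading the whole CSV and re-emitting every row is acceptable; the class name CsvMappingSource is made up for illustration. StreamExecutionEnvironment.readFile with FileProcessingMode.PROCESS_CONTINUOUSLY is another way to re-read a path at a fixed interval.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.nio.file.Files;
import java.nio.file.Paths;

// Re-reads the adId;bookingId CSV roughly once per hour and re-emits all entries,
// so the lookup map in the connected operator picks up changes to the file.
public class CsvMappingSource implements SourceFunction<Tuple2<Integer, Integer>> {

    private final String path;
    private volatile boolean running = true;

    public CsvMappingSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (running) {
            for (String line : Files.readAllLines(Paths.get(path))) {
                String[] parts = line.split(";");
                ctx.collect(Tuple2.of(Integer.parseInt(parts[0]), Integer.parseInt(parts[1])));
            }
            Thread.sleep(60 * 60 * 1000); // wait an hour before the next refresh
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}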

