How to join a stream and dataset?

Question

How do I join a stream and a dataset? I have a stream, and I have static data in a file. I want to enrich the stream's data using the data in the file.

Example: in the stream I get airport codes, and the file contains the airport names together with their codes. I want to join the stream data with the file to form a new stream that includes the airport names. Please provide steps on how to achieve this.

Recommended answer

There are lots of ways to approach stream enrichment with Flink, depending on the exact requirements. https://www.youtube.com/watch?v=cJS18iKLUIY is a good talk by Konstantin Knauf that covers many different approaches, and the tradeoffs between them.

In the simple case where the enrichment data is immutable and reasonably small, I would just use a RichFlatMapFunction and load the whole file in the open() method. That would look something like this:

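import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Event, EnrichedEvent and SensorReferenceData are application-specific types.
public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> {

    // Reference data keyed by sensor id; transient because it is rebuilt in open().
    private transient Map<Long, SensorReferenceData> referenceData;

    @Override
    public void open(final Configuration parameters) throws Exception {
      super.open(parameters);
      // Runs once per parallel instance, before any events are processed.
      // loadReferenceData() is a placeholder: read and parse the file here.
      referenceData = loadReferenceData();
    }

    @Override
    public void flatMap(
        final Event event,
        final Collector<EnrichedEvent> collector) throws Exception {

      // Assumes Event exposes getSensorId(); the lookup returns null for unknown ids.
      SensorReferenceData sensorReferenceData =
        referenceData.get(event.getSensorId());
      collector.collect(new EnrichedEvent(event, sensorReferenceData));
    }

}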
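
For the airport example from the question, the part that loads the file could be sketched roughly as follows. This is only an illustration in plain Java with no Flink dependency: the class name AirportLookup, the "code,name" line format, and the "UNKNOWN" fallback are all assumptions, not something taken from the question or the linked repo. In a job, parse() would be called from open() and nameFor() from flatMap().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: builds an airport code -> airport name lookup. */
final class AirportLookup {

    private AirportLookup() {}

    /** Parses lines in the assumed format "CODE,Airport name" into a map. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> namesByCode = new HashMap<>();
        for (String line : lines) {
            // Split on the first comma only, so names containing commas stay intact.
            int comma = line.indexOf(',');
            if (comma <= 0) {
                continue; // skip blank lines and lines without a code
            }
            String code = line.substring(0, comma).trim();
            String name = line.substring(comma + 1).trim();
            namesByCode.put(code, name);
        }
        return namesByCode;
    }

    /** Returns the airport name, or "UNKNOWN" when the code is not in the file. */
    static String nameFor(Map<String, String> namesByCode, String code) {
        return namesByCode.getOrDefault(code, "UNKNOWN");
    }
}
```

Reading the lines themselves could be done with java.nio.file.Files.readAllLines, as long as the file is reachable from every task manager.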

You will find more code examples for other approaches in https://github.com/knaufk/enrichments-with-flink.

Update:

If what you'd rather do is preload some larger, partitioned reference data to join with a stream, there are a few ways to approach this, some of which are covered in the video and repo I shared above. For those specific requirements, I suggest using a custom partitioner; there's an example in that same GitHub repo. The idea is that the enrichment data is sharded, and each streaming event is steered toward the instance with the relevant reference data.
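
The sharding idea can be sketched in plain Java as below. This is not the code from the repo; the class ShardedReference and its methods are hypothetical names, and hashing the key is just one possible way to assign shards. The point it illustrates is that the same function must decide both which reference rows an instance loads and which instance an event is sent to.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: one shard function shared by data loading and event routing. */
final class ShardedReference {

    private ShardedReference() {}

    /** Maps a key to a shard index in [0, numShards); floorMod avoids negative results. */
    static int shardFor(String key, int numShards) {
        return Math.floorMod(key.hashCode(), numShards);
    }

    /** Keeps only the reference entries that belong to the given shard. */
    static Map<String, String> loadShard(
            Map<String, String> allReferenceData, int shardIndex, int numShards) {
        Map<String, String> shard = new HashMap<>();
        for (Map.Entry<String, String> entry : allReferenceData.entrySet()) {
            if (shardFor(entry.getKey(), numShards) == shardIndex) {
                shard.put(entry.getKey(), entry.getValue());
            }
        }
        return shard;
    }
}
```

In a Flink job, shardFor would typically sit inside a Partitioner<String> passed to DataStream.partitionCustom(partitioner, keySelector), and each parallel instance would call loadShard in open() with its own subtask index and the operator's parallelism, so that an event always arrives at the instance holding its reference row.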

In my opinion, this is simpler than trying to get the Table API to do this particular enrichment as a join.
