通过Flink、Scala、addSource和ReadCsvFile读取CSV文件 [英] Reading csv file by Flink, scala, addSource and readCsvFile

查看:5
本文介绍了通过Flink、Scala、addSource和ReadCsvFile读取CSV文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望使用Flink、Scala-Language和addSource-以及ReadCsvFile-函数来读取CSV文件。我还没有找到任何关于这方面的简单例子。我只发现:https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/scala/com/dataartisans/flinktraining/exercises/datastream_scala/cep/LongRides.scala,这对于我的目的来说太复杂了。

定义中:StreamExecutionEnvironment.addSource(sourceFunction)是否应仅将ReadCsvFile用作源函数?

阅读后,我想使用CEP(复杂事件处理)。

推荐答案

ReadCsvFile()仅作为Flink的DataSet(Batch)接口的一部分提供,不能与数据流(流)接口一起使用。这里有一个非常好的example of readCsvFile(),尽管它可能与您正在尝试做的事情无关。

ReadTextFile()和ReadFile()是StreamExecutionEnvironment上的方法,并且不实现SourceFunction接口--它们不是用来与addSource()一起使用的,而是用来代替它的。下面是使用数据流API加载CSV的example of using readTextFile()

另一种选择是使用表API,并使用CsvTableSource。下面是an example and some discussion of what it does and doesn't do。如果采用此路线,则需要在使用CEP之前使用StreamTableEnvironment()将表流转换为数据流。

请记住,所有这些方法都只需读取文件一次,然后根据其内容创建一个有界流。如果您想要一个读入无限CSV流并等待追加新行的源,则需要一种不同的方法。您可以使用自定义源、socketTextStream或类似Kafka的内容。

这篇关于通过Flink、Scala、addSource和ReadCsvFile读取CSV文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆