Flink CsvTableSource Streaming


Problem Description

I want to stream a CSV file and perform SQL operations on it using Flink. But the code I have written only reads the file once and then stops; it does not stream. Thanks in advance,

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);

// Define a CSV source over the local file, skipping the header line.
CsvTableSource csvtable = CsvTableSource.builder()
    .path("D:/employee.csv")
    .ignoreFirstLine()
    .fieldDelimiter(",")
    .field("id", Types.INT())
    .field("name", Types.STRING())
    .field("designation", Types.STRING())
    .field("age", Types.INT())
    .field("location", Types.STRING())
    .build();

tableEnv.registerTableSource("employee", csvtable);

// Filter and project with the Table API.
Table table = tableEnv.scan("employee").where("name='jay'").select("id,name,location");
//Table table1 = tableEnv.scan("employee").where("age > 23").select("id,name,age,location");

// Convert the result table back into a DataStream of Rows and print it.
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
//DataStream<Row> stream1 = tableEnv.toAppendStream(table1, Row.class);

stream.print();
//stream1.print();

env.execute();

Answer

The CsvTableSource is based on a FileInputFormat, which reads and parses the referenced file line by line. The resulting rows are forwarded into the streaming query. So the CsvTableSource is streaming in the sense that rows are continuously read and forwarded. However, the CsvTableSource terminates at the end of the file. Hence, it emits a bounded stream.

I assume the behavior you expect is that the CsvTableSource reads the file until its end and then waits for appending writes to the file. However, this is not how the CsvTableSource works. You would need to implement a custom TableSource for that.
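For reference, below is a minimal sketch of what such a custom TableSource could look like, assuming the pre-1.5 Table API used in the question's code (method names may differ slightly across Flink versions). The class name MonitoringCsvTableSource, the hard-coded employee schema, the crude header filter, and the one-second monitoring interval are made up for illustration. The source delegates to readFile with FileProcessingMode.PROCESSED_CONTINUOUSLY, which keeps watching the file instead of stopping at the end; note that this mode re-processes the whole file whenever it is modified, so appended rows cause earlier rows to be emitted again.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

// Hypothetical sketch: a TableSource that keeps watching the CSV file
// instead of terminating when the end of the file is reached.
public class MonitoringCsvTableSource implements StreamTableSource<Row> {

    private final String path;
    private final String[] fieldNames =
        {"id", "name", "designation", "age", "location"};
    private final TypeInformation<?>[] fieldTypes =
        {Types.INT(), Types.STRING(), Types.STRING(), Types.INT(), Types.STRING()};

    public MonitoringCsvTableSource(String path) {
        this.path = path;
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        TextInputFormat format = new TextInputFormat(new Path(path));
        // PROCESSED_CONTINUOUSLY re-scans the file every second instead of
        // stopping at EOF. Caveat: a modification triggers a full re-read,
        // so previously emitted rows appear again.
        return execEnv
            .readFile(format, path, FileProcessingMode.PROCESSED_CONTINUOUSLY, 1000L)
            // Crude header skip for this sketch; a real source would do proper
            // CSV parsing and error handling.
            .filter(line -> !line.startsWith("id,"))
            .map(line -> {
                String[] parts = line.split(",");
                Row row = new Row(5);
                row.setField(0, Integer.valueOf(parts[0].trim()));
                row.setField(1, parts[1].trim());
                row.setField(2, parts[2].trim());
                row.setField(3, Integer.valueOf(parts[3].trim()));
                row.setField(4, parts[4].trim());
                return row;
            })
            .returns(new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }

    @Override
    public TableSchema getTableSchema() {
        return new TableSchema(fieldNames, fieldTypes);
    }

    @Override
    public String explainSource() {
        return "MonitoringCsvTableSource(" + path + ")";
    }
}

It would be registered the same way as the built-in source, e.g. tableEnv.registerTableSource("employee", new MonitoringCsvTableSource("D:/employee.csv")); the rest of the query in the question stays unchanged.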
