单个作业中的Flink EXECUTE语句集和数据流 [英] Flink execute statement set and datastream in a single job

查看:21
本文介绍了单个作业中的Flink EXECUTE语句集和数据流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

不知何故,我无法在单个环境中执行语句集和可查询流,如果我的最后一条语句是flinkEnv.ecute,它将执行可查询流,而不执行语句集中的其他语句,反之亦然

val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
val tableEnv = StreamTableEnvironment.create(flinkEnv);

val statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO OUTPUT (SELECT * FROM INPUT_TRANSFORFM)")

tableEnv.toChangelogStream(tableEnv.sqlQuery("SELECT * FROM OUTPUT")).keyBy(row -> row.getField(0)).asQueryableState("OUTPUT_CHANGELOG_STATE");

flinkEnv.execute("job"); // only execute queryable operator
//statementSet.execute(); // only execute insert statement, not queryable state

OUTPUT表定义为upsert-kafka->;OUPUT(pkey, name)

接口

推荐答案当前限制此功能。即使使用较低的层也是可能的。

ticket跟踪语句集+输出到数据流API的用例。

作为一种解决办法,您可以使用toChangelogStreamforsqlQuery("SELECT * FROM INPUT_TRANSFORM"),然后使用fromChangelogStream再次将其注册为表,以便在多个sqlQuery()s中引用同一管道。

这篇关于单个作业中的Flink EXECUTE语句集和数据流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆