单个作业中的Flink EXECUTE语句集和数据流 [英] Flink execute statement set and datastream in a single job
本文介绍了单个作业中的Flink EXECUTE语句集和数据流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
不知何故,我无法在单个环境中执行语句集和可查询流,如果我的最后一条语句是flinkEnv.ecute,它将执行可查询流,而不执行语句集中的其他语句,反之亦然
val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
val tableEnv = StreamTableEnvironment.create(flinkEnv);
val statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("INSERT INTO OUTPUT (SELECT * FROM INPUT_TRANSFORFM)")
tableEnv.toChangelogStream(tableEnv.sqlQuery("SELECT * FROM OUTPUT")).keyBy(row -> row.getField(0)).asQueryableState("OUTPUT_CHANGELOG_STATE");
flinkEnv.execute("job"); // only execute queryable operator
//statementSet.execute(); // only execute insert statement, not queryable state
OUTPUT
表定义为upsert-kafka
->;OUPUT(pkey, name)
接口
推荐答案当前限制此功能。即使使用较低的层也是可能的。
本ticket跟踪语句集+输出到数据流API的用例。
作为一种解决办法,您可以使用toChangelogStream
forsqlQuery("SELECT * FROM INPUT_TRANSFORM")
,然后使用fromChangelogStream
再次将其注册为表,以便在多个sqlQuery()s
中引用同一管道。
这篇关于单个作业中的Flink EXECUTE语句集和数据流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文