apache-flink相关内容

闪烁中的类型信息

我在以JSON格式将数据从Flink发送到Kafka主题的位置有一个管道。我也能够从Kafka主题中获得它,也能够获得JSON属性。现在,像scala reflect类(我还可以在运行时比较数据类型)一样,我试图使用TypeInformation在Fink中做同样的事情,在那里我可以设置一些预定义的格式,从主题读取的任何数据都应该在这个Validation下,并且应该相应地传递或失败。 我有如下数 ..
发布时间:2022-03-15 11:43:33 其他开发

Flink1.12找不到任何在类路径中实现';org.apache.flink.table.factories.DynamicTableFactory';的标识符kafka的工厂

我有一个本地运行良好的Flink作业,但在尝试群集上的flink run该作业时失败。它基本上是从卡夫卡中读取,进行一些转换,然后写入水槽。尝试通过'connector' = 'kafka'从Kafka加载数据时出错。 这是我的pom.xml,注:flink-connector-kafka包含在内。 ..
发布时间:2022-03-15 11:40:53 其他开发

Flink Streaming:事件是否根据其键分别分发到每个任务槽?

例如,如果我的事件顺序为键A,事件顺序为键B,并行度为2。是否所有键A的事件都转到一个任务槽,而键B的事件都转到另一个任务槽? 如果我只按键A的顺序获取事件,会发生什么情况?它们是否也会分发到两个任务槽。这是否意味着我失去了它们出现的顺序? 推荐答案 不,它不完全是这样工作的。 发生的情况是每个键都映射到一个键组,其中键组总数由集群的最大并行度(配置设置)决定。然后将键组映射到 ..

单个作业中的Flink EXECUTE语句集和数据流

不知何故,我无法在单个环境中执行语句集和可查询流,如果我的最后一条语句是flinkEnv.ecute,它将执行可查询流,而不执行语句集中的其他语句,反之亦然 val flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment(); val tableEnv = StreamTableEnvironment.create(flin ..
发布时间:2022-03-15 11:33:40 其他开发

使用Kinesis Analytics的Apache Flink:java.lang.IllegalArgumentException:要分配的内存部分不应为0

背景: 我一直在尝试在相同的Flink应用程序中设置Batch+Streaming,该应用程序部署在Kinesis Analytics运行时。流部件工作正常,但添加批处理支持时遇到问题。 Flink : Handling Keyed Streams with data older than application watermark Apache Flink : Batch Mode ..

为什么Flink在数据流连接+全局窗口上发出重复记录?

我正在学习/试验Flink,我观察到数据流联接的一些意外行为,希望了解发生的情况. 假设我有两个流,每个流有10条记录,我希望在id字段上加入这两个流。让我们假设一个流中的每个记录在另一个流中都有一个匹配的记录,并且ID在每个流中都是唯一的。我们还假设我必须使用全局窗口(要求)。 使用数据流API连接(我在Scala中的简化代码): val stream1 = ... // from ..
发布时间:2022-03-15 11:09:36 其他开发

Flink SQL客户端连接到安全的Kafka群集

我想对安全Kafka集群的Kafka主题支持的Flink SQL表执行查询。我能够以编程方式执行查询,但无法通过Flink SQL Client执行相同的操作。我不确定如何通过Flink SQL客户端传递JAAS配置(java.security.auth.login.config)和其他系统属性。 以编程方式刷新SQL查询 private static void simpleExec ..
发布时间:2022-03-15 11:07:17 其他开发

使用后端serviceName regex/通配符或选择器的Kubernetes入口

大家好,我们有一个使用Flink运算符获得的蓝绿色部署的Flink应用程序。 Flinkk8soperator用于Apache Flink。运营商部署后启动以下三个K8服务: my-flinkapp-14hdhsr (Top level service) my-flinkapp-green my-flinkapp-blue 设想是蓝绿色中的两个中的一个将处于活动状态,并且将具有Po ..

pyflink tableAPI,多个源到单个处理表序列

我正在尝试实现一个pyflink作业(通过Table API),在来自多个源的数据转换为标准格式后,该作业从多个源执行一些基本处理。我可以将每个源中的数据转换为所需的格式(具有指定列的“表架构”),但不能将该数据设置为“处理表”以供进一步实现。这就是我要做的: 若要将不同的源架构转换为标准的处理表方案,然后可以对其进行窗口化,请实现表函数(针对多个输入行)等。 编辑: 感谢您建议 ..
发布时间:2022-03-15 11:01:12 其他开发

不推荐使用DataStream.keyBy()方法的替代方法

在Flink 1.11DataStream API page上,有一个使用keyBy()的WindowWordCount程序,但此方法已弃用,我找不到任何不使用keyBy()如何重写它的示例。如有任何建议,我们将不胜感激。 我使用IntelliJ;它警告keyBy()已弃用。这是我能找到的唯一link。 推荐答案 接受整数或字符串参数的keyBy形式已弃用。您应该使用接受KeyS ..
发布时间:2022-03-15 10:59:10 其他开发

在Apple M1硅片上运行Apache Flink 1.12作业

我刚刚尝试使用Rosetta 2兼容层在采用新M1处理器的Apple Mac Pro上运行basic example for Apache Flink。 遗憾的是,它失败,出现以下堆栈跟踪: flink-1.12.2 ./bin/flink run ./examples/streaming/WordCount.jar Executing WordCount example with d ..
发布时间:2022-03-15 10:56:01 Java开发

Flink SQL窗口不报告最终结果

我使用Flink SQL来计算基于事件时间的窗口化分析。一切都很正常,直到我的数据源每天晚上变得空闲,之后直到第二天数据再次开始流动时才会产生最后一分钟的结果。 CREATE TABLE input id STRING, data BIGINT, rowtime TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK ..
发布时间:2022-03-15 10:53:21 其他开发

Flink Python数据流API Kafka生产者Sink序列化

您好,我正在尝试从一个卡夫卡主题中读取数据,并在进行一些处理后写入到另一个主题中。 当我试图将数据写入另一个主题时,我能够读取数据并对其进行处理。它会显示错误 如果我尝试按原样写入数据,而不对其进行任何处理。Kafka生产者SimpleStringSchema接受它。 但我想将字符串转换为Json。玩Json,然后以字符串格式将其写入另一个主题。 我的代码: import jso ..
发布时间:2022-03-15 10:50:39 Java开发

如何组织一个复杂的Apache Flink应用程序?

我们使用Flink从一些物联网传感器生成事件。每个传感器都可用于生成不同类型的事件(如温度、湿度等)。一对多比率(传感器启用的事件)。 传感器与存储在关系数据库中的启用事件之间的映射 为了丰富传感器数据,我们将连接传感器数据流和表API。正在添加具有已启用事件列表的元数据。 那么,如果某些特定的sensor-123只启用了TEMP和PRESSURE两个事件,如何才能只向这两个定义的流 ..
发布时间:2022-03-15 10:46:47 其他开发