flink-streaming相关内容

关于库伯内斯的Flink

我们正在使用Flink v1.12.2构建一个流处理作业,并计划在Kubernetes集群上运行它。在参考官方的Flink文档时,我们主要发现了向Kubernetes集群提交Flink作业的两种方式,一种是Standalone模式,另一种是Native模式。我们注意到,使用后一种选项时,没有YAML配置文件,看起来很简单。我只是想知道推荐的模式/方法是什么,以及它们的优缺点。谢谢。 推荐答案 ..
发布时间:2022-08-21 09:05:28 其他开发

带有死信队列的Flink Scala贴图

我正在尝试创建一些Scala函数,以帮助Flinkmap和filter操作将其错误重定向到死信队列。 然而,我正在努力解决Scala的类型擦除问题,这使我无法使它们成为泛型。下面mapWithDeadLetterQueue的实现未编译。 sealed trait ProcessingResult[T] case class ProcessingSuccess[T,U](result: ..
发布时间:2022-08-06 21:07:40 其他开发

Flink KeyedCoProcessFunction中的NPE

我在连接的流上使用KeyedCoProcessFunction,两个流都由id设置键,而且我使用MapState并在键不存在的情况下放置一个List类型的值,并且我也在processElement2中检查键的存在,所以理想情况下没有NPE的机会,但仍然可以得到它。 val joinStream = lookDataStream.keyBy(row -> row.getFieldA ..
发布时间:2022-08-06 20:41:37 其他开发

将集合流平面映射到其元素流

我有一个事件序列流,希望将其平面映射到事件流。 我对flatMap函数的语法有问题 val stream = DataStream[Seq[Event]] stream.flatMap(???) 如有任何帮助,我们将不胜感激 推荐答案 我建议您查看Flink附带的示例,例如wordcount application: val counts: DataStream ..
发布时间:2022-03-15 12:13:42 其他开发

未触发闪烁CEP事件

我已经在连接到本地Kafka Broker的Flink中实现了CEP模式,该模式可以像预期的那样工作。但是当我连接到基于群集的云Kafka设置时,Flink CEP没有触发。 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //saves chec ..
发布时间:2022-03-15 12:04:50 其他开发

在Flink 1.13中配置RocksDB

我读过有关Flink 1.13版本中EmbeddedRocksDBStateBackend的内容,但有大小限制,因此我希望保留以前Flink版本1.11的当前配置,但重点是这种配置RocksDB的方式已被弃用(new RocksDBStateBackend("path", true);)。 我已使用EmbeddedRocksDBStateBackend (new EmbeddedRocksD ..
发布时间:2022-03-15 12:02:11 Java开发

将最大并行度更改为现有作业

我们现在有一个现有的正在运行的Flink作业,该作业包含最大并行度设置为128的键控状态。随着我们数据的增长,我们担心128个在未来是不够的。我想知道我们是否有办法通过修改保存点来更改最大并行度?或者有没有办法做到这一点? 推荐答案 您可以使用状态处理器API来完成此操作。您将从从当前作业获取的保存点读取状态,并将该状态写入增加了最大并行度的新保存点。https://nightlies ..
发布时间:2022-03-15 11:53:12 其他开发

闪烁中的类型信息

我在以JSON格式将数据从Flink发送到Kafka主题的位置有一个管道。我也能够从Kafka主题中获得它,也能够获得JSON属性。现在,像scala reflect类(我还可以在运行时比较数据类型)一样,我试图使用TypeInformation在Fink中做同样的事情,在那里我可以设置一些预定义的格式,从主题读取的任何数据都应该在这个Validation下,并且应该相应地传递或失败。 我有如下数 ..
发布时间:2022-03-15 11:43:33 其他开发

使用Kinesis Analytics的Apache Flink:java.lang.IllegalArgumentException:要分配的内存部分不应为0

背景: 我一直在尝试在相同的Flink应用程序中设置Batch+Streaming,该应用程序部署在Kinesis Analytics运行时。流部件工作正常,但添加批处理支持时遇到问题。 Flink : Handling Keyed Streams with data older than application watermark Apache Flink : Batch Mode ..

pyflink tableAPI,多个源到单个处理表序列

我正在尝试实现一个pyflink作业(通过Table API),在来自多个源的数据转换为标准格式后,该作业从多个源执行一些基本处理。我可以将每个源中的数据转换为所需的格式(具有指定列的“表架构”),但不能将该数据设置为“处理表”以供进一步实现。这就是我要做的: 若要将不同的源架构转换为标准的处理表方案,然后可以对其进行窗口化,请实现表函数(针对多个输入行)等。 编辑: 感谢您建议 ..
发布时间:2022-03-15 11:01:12 其他开发

Flink SQL窗口不报告最终结果

我使用Flink SQL来计算基于事件时间的窗口化分析。一切都很正常,直到我的数据源每天晚上变得空闲,之后直到第二天数据再次开始流动时才会产生最后一分钟的结果。 CREATE TABLE input id STRING, data BIGINT, rowtime TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK ..
发布时间:2022-03-15 10:53:21 其他开发

Flink Python数据流API Kafka生产者Sink序列化

您好,我正在尝试从一个卡夫卡主题中读取数据,并在进行一些处理后写入到另一个主题中。 当我试图将数据写入另一个主题时,我能够读取数据并对其进行处理。它会显示错误 如果我尝试按原样写入数据,而不对其进行任何处理。Kafka生产者SimpleStringSchema接受它。 但我想将字符串转换为Json。玩Json,然后以字符串格式将其写入另一个主题。 我的代码: import jso ..
发布时间:2022-03-15 10:50:39 Java开发

如何组织一个复杂的Apache Flink应用程序?

我们使用Flink从一些物联网传感器生成事件。每个传感器都可用于生成不同类型的事件(如温度、湿度等)。一对多比率(传感器启用的事件)。 传感器与存储在关系数据库中的启用事件之间的映射 为了丰富传感器数据,我们将连接传感器数据流和表API。正在添加具有已启用事件列表的元数据。 那么,如果某些特定的sensor-123只启用了TEMP和PRESSURE两个事件,如何才能只向这两个定义的流 ..
发布时间:2022-03-15 10:46:47 其他开发