flink-streaming相关内容
我们正在使用Flink v1.12.2构建一个流处理作业,并计划在Kubernetes集群上运行它。在参考官方的Flink文档时,我们主要发现了向Kubernetes集群提交Flink作业的两种方式,一种是Standalone模式,另一种是Native模式。我们注意到,使用后一种选项时,没有YAML配置文件,看起来很简单。我只是想知道推荐的模式/方法是什么,以及它们的优缺点。谢谢。 推荐答案
..
我的Customer类已使用maven-avro插件创建。当我尝试运行此程序时,收到的错误为Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo [main] INFO org.apache.flink.api.java.typeutils.TypeExt
..
我正在尝试创建一些Scala函数,以帮助Flinkmap和filter操作将其错误重定向到死信队列。 然而,我正在努力解决Scala的类型擦除问题,这使我无法使它们成为泛型。下面mapWithDeadLetterQueue的实现未编译。 sealed trait ProcessingResult[T] case class ProcessingSuccess[T,U](result:
..
我在连接的流上使用KeyedCoProcessFunction,两个流都由id设置键,而且我使用MapState并在键不存在的情况下放置一个List类型的值,并且我也在processElement2中检查键的存在,所以理想情况下没有NPE的机会,但仍然可以得到它。 val joinStream = lookDataStream.keyBy(row -> row.getFieldA
..
首先,我已经阅读了这篇关于同一问题的post,并尝试遵循对他有效的相同解决方案(使用MVN创建新的快速入门并将代码迁移到那里),并且开箱即用IntelliJ时无法正常工作。 下面是我的pom.xml和来自另一个pom.xml的依赖项。我做错了什么?
..
是否可以清除数据流中的当前水印? 不允许延迟的一个月水印示例输入: [ { timestamp: '10/2018' }, { timestamp: '11/2018' }, { timestamp: '11/2018', clearState: true }, { timestamp: '9/2018' } ] 正常情况下,‘9/2018’的记录会因为时间太晚而被
..
我有一个事件序列流,希望将其平面映射到事件流。 我对flatMap函数的语法有问题 val stream = DataStream[Seq[Event]] stream.flatMap(???) 如有任何帮助,我们将不胜感激 推荐答案 我建议您查看Flink附带的示例,例如wordcount application: val counts: DataStream
..
我需要能够向MyFunction传递扩展ProcessFunction的配置参数。在我的参数中,这是我唯一的方法吗?我不需要将它与每个元素一起传递。我可以以某种方式使用open方法吗? public class MyProcessFunction extends ProcessFunction, MyOutp
..
我已经在连接到本地Kafka Broker的Flink中实现了CEP模式,该模式可以像预期的那样工作。但是当我连接到基于群集的云Kafka设置时,Flink CEP没有触发。 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //saves chec
..
我读过有关Flink 1.13版本中EmbeddedRocksDBStateBackend的内容,但有大小限制,因此我希望保留以前Flink版本1.11的当前配置,但重点是这种配置RocksDB的方式已被弃用(new RocksDBStateBackend("path", true);)。 我已使用EmbeddedRocksDBStateBackend (new EmbeddedRocksD
..
我们现在有一个现有的正在运行的Flink作业,该作业包含最大并行度设置为128的键控状态。随着我们数据的增长,我们担心128个在未来是不够的。我想知道我们是否有办法通过修改保存点来更改最大并行度?或者有没有办法做到这一点? 推荐答案 您可以使用状态处理器API来完成此操作。您将从从当前作业获取的保存点读取状态,并将该状态写入增加了最大并行度的新保存点。https://nightlies
..
在广播模式文档中提到没有RocksDB状态后端: No RocksDB state backend: Broadcast state is kept in-memory at runtime and memory provisioning should be done accordingly. This holds for all operator states. 如果应用程序使用roc
..
我在以JSON格式将数据从Flink发送到Kafka主题的位置有一个管道。我也能够从Kafka主题中获得它,也能够获得JSON属性。现在,像scala reflect类(我还可以在运行时比较数据类型)一样,我试图使用TypeInformation在Fink中做同样的事情,在那里我可以设置一些预定义的格式,从主题读取的任何数据都应该在这个Validation下,并且应该相应地传递或失败。 我有如下数
..
我有一个ProcessAllWindowFunction实现(请参阅下面代码中的AttributeBackLogEvents()),它有相当多的I/O,可能需要30秒以上。windowAll()正在使用30秒的TumblingProcessingTimeWindows对数据进行窗口化。 attributedStream .windowAll(TumblingProce
..
背景: 我一直在尝试在相同的Flink应用程序中设置Batch+Streaming,该应用程序部署在Kinesis Analytics运行时。流部件工作正常,但添加批处理支持时遇到问题。 Flink : Handling Keyed Streams with data older than application watermark Apache Flink : Batch Mode
..
为了丰富数据流,我们计划将MySQL(MemSQL)服务器连接到现有的Flink流应用程序 我们可以看到,Flink提供了一个带有JDBC连接器的表APIhttps://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/ 此外,我还发现了另一个名为Flink-CDChttps://verver
..
我正在尝试实现一个pyflink作业(通过Table API),在来自多个源的数据转换为标准格式后,该作业从多个源执行一些基本处理。我可以将每个源中的数据转换为所需的格式(具有指定列的“表架构”),但不能将该数据设置为“处理表”以供进一步实现。这就是我要做的: 若要将不同的源架构转换为标准的处理表方案,然后可以对其进行窗口化,请实现表函数(针对多个输入行)等。 编辑: 感谢您建议
..
我使用Flink SQL来计算基于事件时间的窗口化分析。一切都很正常,直到我的数据源每天晚上变得空闲,之后直到第二天数据再次开始流动时才会产生最后一分钟的结果。 CREATE TABLE input id STRING, data BIGINT, rowtime TIMESTAMP(3) METADATA FROM 'timestamp', WATERMARK
..
您好,我正在尝试从一个卡夫卡主题中读取数据,并在进行一些处理后写入到另一个主题中。 当我试图将数据写入另一个主题时,我能够读取数据并对其进行处理。它会显示错误 如果我尝试按原样写入数据,而不对其进行任何处理。Kafka生产者SimpleStringSchema接受它。 但我想将字符串转换为Json。玩Json,然后以字符串格式将其写入另一个主题。 我的代码: import jso
..
我们使用Flink从一些物联网传感器生成事件。每个传感器都可用于生成不同类型的事件(如温度、湿度等)。一对多比率(传感器启用的事件)。 传感器与存储在关系数据库中的启用事件之间的映射 为了丰富传感器数据,我们将连接传感器数据流和表API。正在添加具有已启用事件列表的元数据。 那么,如果某些特定的sensor-123只启用了TEMP和PRESSURE两个事件,如何才能只向这两个定义的流
..