apache-flink相关内容
我们正在使用Flink v1.12.2构建一个流处理作业,并计划在Kubernetes集群上运行它。在参考官方的Flink文档时,我们主要发现了向Kubernetes集群提交Flink作业的两种方式,一种是Standalone模式,另一种是Native模式。我们注意到,使用后一种选项时,没有YAML配置文件,看起来很简单。我只是想知道推荐的模式/方法是什么,以及它们的优缺点。谢谢。 推荐答案
..
我的Customer类已使用maven-avro插件创建。当我尝试运行此程序时,收到的错误为Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo [main] INFO org.apache.flink.api.java.typeutils.TypeExt
..
我正在尝试每隔500 ms创建一个JSON数据集,并希望将其推送到Kafka主题,以便我可以在下游设置一些窗口并执行计算。以下是我的代码: package KafkaAsSource import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.ap
..
我正在尝试创建一些Scala函数,以帮助Flinkmap和filter操作将其错误重定向到死信队列。 然而,我正在努力解决Scala的类型擦除问题,这使我无法使它们成为泛型。下面mapWithDeadLetterQueue的实现未编译。 sealed trait ProcessingResult[T] case class ProcessingSuccess[T,U](result:
..
我需要使AsyncIO丰富的函数根据最新的规则集执行调用。对于像map这样的操作,我能够通过关注这篇博客文章来处理具有丰富功能的BroadCastConnectedStream:https://flink.apache.org/2019/06/26/broadcast-state.html 但是,创建AsyncIO函数需要数据流作为输入,而BroadCastConnectedStream不需
..
我一直在浏览以前关于在运行时添加Log4j2附加器的帖子,但似乎没有一个真正符合我的方案。 我们将一个长时间运行的Flink作业打包到一个FAT JAR中,我们实质上将其提交给正在运行的Flink集群。我们希望将错误日志转发到哨兵。Sentry提供了我希望能够使用的Log4j2 appender,但所有让Log4j2工作的尝试都失败了--对此有点疯狂(花了几天时间)。 由于Flink(
..
我在连接的流上使用KeyedCoProcessFunction,两个流都由id设置键,而且我使用MapState并在键不存在的情况下放置一个List类型的值,并且我也在processElement2中检查键的存在,所以理想情况下没有NPE的机会,但仍然可以得到它。 val joinStream = lookDataStream.keyBy(row -> row.getFieldA
..
首先,我已经阅读了这篇关于同一问题的post,并尝试遵循对他有效的相同解决方案(使用MVN创建新的快速入门并将代码迁移到那里),并且开箱即用IntelliJ时无法正常工作。 下面是我的pom.xml和来自另一个pom.xml的依赖项。我做错了什么?
..
我希望使用Flink、Scala-Language和addSource-以及ReadCsvFile-函数来读取CSV文件。我还没有找到任何关于这方面的简单例子。我只发现:https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/scala/com/dataartisans/flinktraining/
..
是否可以清除数据流中的当前水印? 不允许延迟的一个月水印示例输入: [ { timestamp: '10/2018' }, { timestamp: '11/2018' }, { timestamp: '11/2018', clearState: true }, { timestamp: '9/2018' } ] 正常情况下,‘9/2018’的记录会因为时间太晚而被
..
我有一个事件序列流,希望将其平面映射到事件流。 我对flatMap函数的语法有问题 val stream = DataStream[Seq[Event]] stream.flatMap(???) 如有任何帮助,我们将不胜感激 推荐答案 我建议您查看Flink附带的示例,例如wordcount application: val counts: DataStream
..
我需要能够向MyFunction传递扩展ProcessFunction的配置参数。在我的参数中,这是我唯一的方法吗?我不需要将它与每个元素一起传递。我可以以某种方式使用open方法吗? public class MyProcessFunction extends ProcessFunction, MyOutp
..
安装(下载&;tar zxf)Apache Flink 1.11.1并运行: ./bin/flink run examples/streaming/WordCount.jar大于等于5分钟后显示在友好消息上。提交尝试: Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExcep
..
我已经在连接到本地Kafka Broker的Flink中实现了CEP模式,该模式可以像预期的那样工作。但是当我连接到基于群集的云Kafka设置时,Flink CEP没有触发。 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //saves chec
..
我读过有关Flink 1.13版本中EmbeddedRocksDBStateBackend的内容,但有大小限制,因此我希望保留以前Flink版本1.11的当前配置,但重点是这种配置RocksDB的方式已被弃用(new RocksDBStateBackend("path", true);)。 我已使用EmbeddedRocksDBStateBackend (new EmbeddedRocksD
..
我在Mac上将Kafka和Flink作为坞站容器运行。 我已经实现了Flink Job,它应该使用来自Kafka主题的消息。 我运行一个向主题发送消息的python生成器。 作业开始时没有问题,但零消息到达。 我相信消息发送到了正确的主题,因为我有能够使用消息的python使用者。 闪烁作业(Java): package com.p81.datapipeline.swg;
..
我一直在研究Flink Stateful函数。它看起来非常有希望--除了一件事--我希望我只是错过了它。 我怎么也看不到从Python中的Kafka入口处访问Kafka密钥的方法。在Java中,我看到可以使用反序列化程序并有效地将其打包到已解码的message对象中。但我找不到替代方案。 在我们的示例中,键包含值中不存在的有价值的信息。 有没有人碰到这个--或者我刚刚错过了?
..
我们现在有一个现有的正在运行的Flink作业,该作业包含最大并行度设置为128的键控状态。随着我们数据的增长,我们担心128个在未来是不够的。我想知道我们是否有办法通过修改保存点来更改最大并行度?或者有没有办法做到这一点? 推荐答案 您可以使用状态处理器API来完成此操作。您将从从当前作业获取的保存点读取状态,并将该状态写入增加了最大并行度的新保存点。https://nightlies
..
在广播模式文档中提到没有RocksDB状态后端: No RocksDB state backend: Broadcast state is kept in-memory at runtime and memory provisioning should be done accordingly. This holds for all operator states. 如果应用程序使用roc
..
我在编写pyflink作业时尝试读取已建立的CSV文件。我使用文件系统连接器来获取数据,但是在DDL上执行EXECUTE_SQL()并稍后在表上执行查询之后,我收到一个错误,说明它无法获取下一个结果。我无法解决此错误。我已经检查了CSV文件,它是完全正确的,并且可以与 pandas 一起工作,但是在这里我不明白为什么它不能获取下一行。请查找附加的代码以供参考。 from pyflink.co
..