apache-flink相关内容

关于库伯内斯的Flink

我们正在使用Flink v1.12.2构建一个流处理作业,并计划在Kubernetes集群上运行它。在参考官方的Flink文档时,我们主要发现了向Kubernetes集群提交Flink作业的两种方式,一种是Standalone模式,另一种是Native模式。我们注意到,使用后一种选项时,没有YAML配置文件,看起来很简单。我只是想知道推荐的模式/方法是什么,以及它们的优缺点。谢谢。 推荐答案 ..
发布时间:2022-08-21 09:05:28 其他开发

带有死信队列的Flink Scala贴图

我正在尝试创建一些Scala函数,以帮助Flinkmap和filter操作将其错误重定向到死信队列。 然而,我正在努力解决Scala的类型擦除问题,这使我无法使它们成为泛型。下面mapWithDeadLetterQueue的实现未编译。 sealed trait ProcessingResult[T] case class ProcessingSuccess[T,U](result: ..
发布时间:2022-08-06 21:07:40 其他开发

将BroadCastConnectedStream连接到AsyncIO

我需要使AsyncIO丰富的函数根据最新的规则集执行调用。对于像map这样的操作,我能够通过关注这篇博客文章来处理具有丰富功能的BroadCastConnectedStream:https://flink.apache.org/2019/06/26/broadcast-state.html 但是,创建AsyncIO函数需要数据流作为输入,而BroadCastConnectedStream不需 ..
发布时间:2022-08-06 20:58:09 其他开发

在运行时添加Sentry Log4j2附加器

我一直在浏览以前关于在运行时添加Log4j2附加器的帖子,但似乎没有一个真正符合我的方案。 我们将一个长时间运行的Flink作业打包到一个FAT JAR中,我们实质上将其提交给正在运行的Flink集群。我们希望将错误日志转发到哨兵。Sentry提供了我希望能够使用的Log4j2 appender,但所有让Log4j2工作的尝试都失败了--对此有点疯狂(花了几天时间)。 由于Flink( ..
发布时间:2022-08-06 20:48:53 其他开发

Flink KeyedCoProcessFunction中的NPE

我在连接的流上使用KeyedCoProcessFunction,两个流都由id设置键,而且我使用MapState并在键不存在的情况下放置一个List类型的值,并且我也在processElement2中检查键的存在,所以理想情况下没有NPE的机会,但仍然可以得到它。 val joinStream = lookDataStream.keyBy(row -> row.getFieldA ..
发布时间:2022-08-06 20:41:37 其他开发

将集合流平面映射到其元素流

我有一个事件序列流,希望将其平面映射到事件流。 我对flatMap函数的语法有问题 val stream = DataStream[Seq[Event]] stream.flatMap(???) 如有任何帮助,我们将不胜感激 推荐答案 我建议您查看Flink附带的示例,例如wordcount application: val counts: DataStream ..
发布时间:2022-03-15 12:13:42 其他开发

未触发闪烁CEP事件

我已经在连接到本地Kafka Broker的Flink中实现了CEP模式,该模式可以像预期的那样工作。但是当我连接到基于群集的云Kafka设置时,Flink CEP没有触发。 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //saves chec ..
发布时间:2022-03-15 12:04:50 其他开发

在Flink 1.13中配置RocksDB

我读过有关Flink 1.13版本中EmbeddedRocksDBStateBackend的内容,但有大小限制,因此我希望保留以前Flink版本1.11的当前配置,但重点是这种配置RocksDB的方式已被弃用(new RocksDBStateBackend("path", true);)。 我已使用EmbeddedRocksDBStateBackend (new EmbeddedRocksD ..
发布时间:2022-03-15 12:02:11 Java开发

闪光。卡夫卡消费者没有收到卡夫卡的信息

我在Mac上将Kafka和Flink作为坞站容器运行。 我已经实现了Flink Job,它应该使用来自Kafka主题的消息。 我运行一个向主题发送消息的python生成器。 作业开始时没有问题,但零消息到达。 我相信消息发送到了正确的主题,因为我有能够使用消息的python使用者。 闪烁作业(Java): package com.p81.datapipeline.swg; ..
发布时间:2022-03-15 11:58:40 其他开发

Python Flink Stateful函数入口上的Kafka键访问

我一直在研究Flink Stateful函数。它看起来非常有希望--除了一件事--我希望我只是错过了它。 我怎么也看不到从Python中的Kafka入口处访问Kafka密钥的方法。在Java中,我看到可以使用反序列化程序并有效地将其打包到已解码的message对象中。但我找不到替代方案。 在我们的示例中,键包含值中不存在的有价值的信息。 有没有人碰到这个--或者我刚刚错过了? ..
发布时间:2022-03-15 11:55:56 Python

将最大并行度更改为现有作业

我们现在有一个现有的正在运行的Flink作业,该作业包含最大并行度设置为128的键控状态。随着我们数据的增长,我们担心128个在未来是不够的。我想知道我们是否有办法通过修改保存点来更改最大并行度?或者有没有办法做到这一点? 推荐答案 您可以使用状态处理器API来完成此操作。您将从从当前作业获取的保存点读取状态,并将该状态写入增加了最大并行度的新保存点。https://nightlies ..
发布时间:2022-03-15 11:53:12 其他开发

使用pyflink从本地系统以批处理模式读取CSV文件

我在编写pyflink作业时尝试读取已建立的CSV文件。我使用文件系统连接器来获取数据,但是在DDL上执行EXECUTE_SQL()并稍后在表上执行查询之后,我收到一个错误,说明它无法获取下一个结果。我无法解决此错误。我已经检查了CSV文件,它是完全正确的,并且可以与 pandas 一起工作,但是在这里我不明白为什么它不能获取下一行。请查找附加的代码以供参考。 from pyflink.co ..
发布时间:2022-03-15 11:47:12 Python