apache-flink相关内容

如何使用处理时间模型检查流中项目的内部 flink 时间戳?

我希望用它在 flink 中到达的时间标记我的流中的数据,以便我可以执行一些计算.我意识到在使用事件时间模型时我可以直接控制它,但我希望有一些简单的方法来发现在流上做出窗口决策时使用的时间戳. 解决方案 Flink 支持三种工作模式: 处理时间:根据每个操作员的当前时间处理事件 事件时间:事件是与手动分配的时间戳相关的过程. 摄取时间:根据 Flink 摄取事件时自动分配的时间戳 ..
发布时间:2021-11-12 01:17:07 其他开发

Apache Flink:在 MapReduce() 中正确进行异步网络服务调用

我有一个具有以下 mapPartition 函数的程序: public void mapPartition(Iterable values, Collector> out) 我从输入的 values 中收集 100 个批次 &将它们发送到网络服务进行转换.我将结果添加回 out 集合. 为了加快进程,我通过使用 Executors 进行了网络服务调用 async.这造成了问题,要么我得到 ..
发布时间:2021-11-12 01:17:04 Java开发

kafka 源流上的事件时间窗口

Kafka 服务器中有一个主题.在程序中,我们以流的形式读取这个topic,并分配事件时间戳.然后对该流进行窗口操作.但该程序不起作用.调试后,似乎没有执行WindowOperator的processWatermark方法.这是我的代码. DataStream>广告 = 环境.addSource(new FlinkKafkaConsumer082("advertisement", new Si ..
发布时间:2021-11-12 01:16:57 其他开发

Flink 自定义触发器给出意外输出

我想创建一个 Trigger,它第一次在 20 秒内触发,之后每五秒触发一次.我使用了 GlobalWindows 和一个自定义的 Trigger val windowedStream = valueStream.keyBy(0).window(GlobalWindows.create()).trigger(TradeTrigger.of()) 这是TradeTrigger中的代码: @Pu ..
发布时间:2021-11-12 01:16:55 其他开发

使用 Flink Rich InputFormat 创建 Elasticsearch 的输入格式

我们使用的是 Elasticsearch 6.8.4 和 Flink 1.0.18. 我们在 elasticsearch 中有一个包含 1 个分片和 1 个副本的索引,我想创建自定义输入格式以使用具有 1 个以上输入拆分的 apache Flink 数据集 API 在 elasticsearch 中读取和写入数据,以实现更好的性能.那么有什么办法可以达到这个要求吗? 注意:每个文档的大 ..
发布时间:2021-11-12 01:16:49 其他开发

使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器

在使用MiniClusterWithClientResource测试flink作业时,有没有办法控制触发定时器的处理时间? 我能够在单元测试中使用testharness 并控制处理时间,即: //直接提前算子的处理时间触发处理时间定时器testHarness.setProcessingTime(300000) 这样.我可以在指定时间触发定时器 但是,我现在需要的是使用 mi ..
发布时间:2021-11-12 01:16:46 其他开发

Apache Flink - 将流与输入 Kafka 主题一样进行分区

我想在 Apache Flink 中实现以下场景: 给定一个有 4 个分区的 Kafka 主题,我想在 Flink 中使用不同的逻辑独立处理分区内数据,具体取决于事件的类型. 特别地,假设输入 Kafka 主题包含之前图像中描述的事件.每个事件都有不同的结构:分区 1 具有字段“a";作为键,分区 2 具有字段“b";作为关键等.在 Flink 中,我想根据事件应用不同的业务逻辑,所以 ..

Apache Flink 中的空窗口

我有一个带事件时间的数据流,是否可以在 Apache Flink 中发出没有数据的窗口?例如,滚动窗口大小为 15 分钟,如果在 [(0, 15), (15, 20), ...] 例如. 解决方案 不,这是不可能的.Flink 的 window 操作符只会在添加第一个事件时创建一个窗口. 不为空窗口发出结果的原因是,窗口通常定义在一个键控流上(如果没有键控流,就不可能并行处理窗口). ..
发布时间:2021-11-12 01:16:40 Java开发

flink Operator State 是线程安全的吗?

在 Operator State(或非键控状态)下,每个 operator state 都绑定到一个并行的 operator 实例 以上引文来自Flink官网.每个并行算子实例可能有线程池.当这些线程访问 Operator State 时(如上所述,每个并行 operator 实例可以有一个 operator state),是否会遇到线程安全问题?我应该在 Java 中使用具有以下术语的 O ..
发布时间:2021-11-12 01:16:31 Java开发

Apache Flink:即使没有输入记录到达给定聚合窗口,也基于键控状态在 Flink 中发出输出记录

我正在尝试将 Apache Flink 用于 IoT 应用程序.我有一堆设备可以处于几种状态中的一种.当设备更改状态时,它会发出一条消息,其中包含事件时间戳及其更改的状态.对于一台设备,这可能如下所示: {Device_id: 1, Event_Timestamp: 9:01, State: STATE_1} {Device_id: 1, Event_Timestamp: 9:03, ..
发布时间:2021-11-12 01:16:29 其他开发

flink 中自定义类的 hashCode() 和 equals() 方法

我怀疑 Flink 中的自定义类是否需要覆盖 hashCode() 和 equals() 方法,因为我已经阅读了 这个页面 hashCode() 绝不能在分布式系统中实现,Apache Flink 就是其中之一. 示例:我有这个类: public class EventCounter {公共字符串 ID;公长计数;公共时间戳 firstEvent;公共时间戳 lastEvent;公开日期; ..

Flink 不发出存储在 Cassandra 中的值

我有以下 POJO 课程, import com.datastax.driver.mapping.annotations.Column;导入 com.datastax.driver.mapping.annotations.Table;@Table(keyspace = "testKey", name = "contact")公共类 Person 实现可序列化 {private static fi ..
发布时间:2021-11-12 01:16:21 其他开发

从 FasterXML 读取值时 Flink 收集器问题

我有 Kafka 值作为字符串,POJO 如下, {"name":"John","timeStamp":"2020-08-11T13:31:31"} class Person{私人字符串名称;私人 LocalDateTime 时间戳;} 这个时间戳来自Kafka的字符串,并将它们转换为LocalDateTime. 当我使用来自 FasterXML 的必需库作为 Standalone 和 ..
发布时间:2021-11-12 01:16:18 其他开发

关于 StateTtlConfig

我正在为 MapState 配置我的 StateTtlConfig,我感兴趣的是进入状态的对象有 3 个小时的生命,然后它们应该从状态中消失并传递给 GC 进行清理并释放一些内存和检查点我认为也应该释放一些重量.我之前有这个配置,但它似乎不起作用,因为检查点总是在增长: private final StateTtlConfig ttlConfig = StateTtlConfig.newBuil ..
发布时间:2021-11-12 01:16:15 其他开发