apache-flink相关内容

表选项不包含选项键“连接器"

我想使用 flink sql 客户端创建一个 hive 表. 我可以成功创建表 t2,但是当我查询 t2 时,它抱怨 表选项不包含用于发现连接器的选项键“连接器". 我在 conf/sql-client-defaults.yaml 文件中将执行类型设置为批处理, 我会问这里有什么问题.谢谢! Flink SQL>使用 testdb1;Flink SQL>创建表 t2(id int ..
发布时间:2021-11-12 01:16:09 其他开发

无法将 Flink 指标公开给 Prometheus

我试图将 Flink 的内置指标公开给 Prometheus,但不知何故 Prometheus 无法识别目标 - JMX 以及 PrometheusReporter. prometheus.yml 中定义的抓取如下所示: scrape_configs:- 作业名称:节点静态配置:- 目标:['本地主机:9100']- job_name: 'kafka-server'静态配置:- 目标:[' ..
发布时间:2021-11-12 01:16:06 Java开发

flink 不在 std 输出中打印对象

我正在 flink 中做一个简单的程序,但它不会在 std 输出中打印任何字符串.我已经尝试在流上使用 print() 方法,并在其结果上使用 reduce 函数和 after print() 方法.这是一个代码: 公共类 StreamingJob { public static void main(String[] args) 抛出异常 {最终 StreamExecutionEnviro ..
发布时间:2021-11-12 01:16:03 其他开发

flink 计数不同的问题

现在我们使用滚动窗口来计算不同的.我们遇到的问题是,如果我们将翻滚窗口从一天延长到一个月,我们将无法获得目前不同的数量.这意味着如果我们将翻滚窗口设置为 1 个月,我们得到的数字来自每个月的 1 号.我现在如何获得当前的非重复计数(现在是 3 月 9 日.)? 包flink.trigger;导入 org.apache.flink.api.common.state.ReducingState;导入 ..
发布时间:2021-11-12 01:16:00 其他开发

带有 onEventTime 触发器的 Flink 会话窗口?

我想在 Flink 中创建一个基于 EventTime 的会话窗口,以便在新消息的事件时间比创建该窗口的消息的事件时间大 180 秒时触发. 例如: t1(0 seconds) : msg1 我想创建一个触发器,以便通过适当的水印或 onEventTime 触发器实现上述行为.任何人都可以提供一些示例来实现这一目标吗? 解决方案 解决此问题的最佳方法可能是使用 ProcessF ..
发布时间:2021-11-12 01:15:58 其他开发

Apache Flink - 是否可以平均分配插槽共享组?

我们有一个包含操作的管道,分为 2 个工作负载 - Source ->Transform 位于第一组并且是 CPU 密集型工作负载,它们被放入同一个插槽共享组中,比如说 source.而 Sink,RAM 密集型工作负载,因为它使用批量上传并在内存中保存大量数据.发送到sink槽共享组. 另外,我们有一个不同的并行级别Source ->Transform 工作负载和 Sink 工作负载作为第 ..
发布时间:2021-11-12 01:15:55 其他开发

如何在 Flink 中加载外部 jars

我在单机集群模式下向Flink提交作业时,发现每次taskManager都会从jobManager中取jar(即使是同一个jar),耗时很长.我想知道是否可以将这些 jar 保存在每个工作节点中,以便它们每次运行时自动在本地加载 jar. 解决方案 你可以用你的内部代码/逻辑创建一个最小的 jar,并确保你依赖的所有 jar 在 /lib 中可用 文件夹(根据 Arthur 的建议).这将 ..
发布时间:2021-11-12 01:15:52 Java开发

NFS(Netapp 服务器)->Flink ->s3

我是 flink (java) 的新手,并试图将作为文件路径安装的 netapp 文件服务器上的 xml 文件移动到安装了 flink 的服务器上. 如何实时进行批处理或流处理以获取进入文件夹的文件并使用 s3 将其接收. 我在 flink-starter 中找不到任何从本地文件系统读取文件的示例,flink 至少是这个用例的正确选择吗?如果是这样,我在哪里可以找到资源来收听文件夹和管 ..
发布时间:2021-11-12 01:15:46 Java开发

使用 cassandra 数据库查询作为 Flink 程序的源代码

我有一个 Cassandra 数据库,它必须在我的 Flink 程序中从诸如 Steam 之类的套接字接收数据以进行流处理.所以,我写了一个简单的客户端程序,从Cassandra中读取数据并将数据发送到socket;另外,我在server base中编写了Flink程序.其实我的客户端程序很简单,没有使用任何Flink指令;它只是将字符串格式的 Cassandra 行发送到套接字,服务器必须接收该 ..
发布时间:2021-11-12 01:15:43 Java开发

运行时的 Apache Flink 映射

我已经构建了一个 flink 流作业来从 kafka 读取一个 xml 文件,转换该文件并将其写入数据库中.由于 xml 文件中的属性与数据库列名称不匹配,因此我为映射构建了一个 switch case. 由于这不是很灵活,我想从代码中去掉这个硬连线的映射信息.首先,我想出了一个映射文件的想法,它可能如下所示: path.in.xml.to.attribut=database.column ..
发布时间:2021-11-12 01:15:41 Java开发

处理键控窗口后如何立即清除状态?

我的应用程序使用由时间戳函数键控的键控窗口.这意味着一旦该特定窗口被触发和处理,保持该键处于活动状态就没有用了,因为该特定键不可能再次出现.由于此用例涉及不断扩展密钥,因此我希望在完成处理后立即清除密钥的状态,而无需配置计时器. 这是可以在每个键控窗口完成处理后在 evictor 方法或 apply 方法中实现的吗? 解决方案 Windows 自动清理它们的状态.我能想到的唯一可能需 ..
发布时间:2021-11-12 01:15:38 其他开发

Flink SQL 中跳跃窗口上的指数衰减移动平均值:铸造时间

现在我们在 Flink 中拥有带有花式窗口的 SQL,我试图将衰减的移动平均线引用为“在未来的 Flink 版本中对于 Table API 和 SQL 的可能性".来自他们的 SQL 路线图/预览 2017-03 帖子: 表.window(每 1.second 滑动 1.hour 为 'w).groupBy('productId, 'w).选择('结束,'产品编号,('unitPrice * ( ..

Apache flink:从 RocksDB 后端的保存点延迟加载

我们希望使用带有 RocksDB 后端 (HDFS) 的 Apache Flink 进行有状态的流处理.但是,我们的应用程序状态(键控状态)将以 TB 级为单位. 据我所知,当我们从保存点恢复作业时,所有操作员状态数据将从 HDFS 上的保存点位置传送到每个任务管理器.如果状态是TB级的,那么每次部署都会导致很长的停机时间,如果所有这些状态都需要转移. 我想了解,如果在 RocksDB ..
发布时间:2021-11-12 01:15:32 其他开发

如何在输出方法期间在数据集中生成动态路径

有没有办法在 Flink 中创建动态 DataSink 输出路径? DataSet 的数据类型为 Tuple2 当我们尝试使用流时,我有一种使用自定义 Bucketer 生成动态浴的方法,如下所示 @Overridepublic Path getBucketPath(Clock clock, Path basePath, Tuple2 元素) {返回新路径(basePath + "/ ..
发布时间:2021-11-12 01:15:29 其他开发

Flink Job 子任务需要几分钟才能启动,而源会立即启动

所以我最近将我的 flink 作业的并行度从 10 增加到 50.目前设置是 25 个任务管理器,每个任务管理器有 2 个任务槽,每个有 2 个 cpu.具有 5 个 CPU 的作业管理器.但是出于某种原因,当作业开始时,它能够立即启动两个源的所有子任务,但作业的其余部分需要一分钟或更长时间才能启动所有其他操作员的子任务. 我知道这不应该发生,但我不确定需要更改什么才能解决此问题.这是 ui ..
发布时间:2021-11-12 01:15:25 其他开发

flink 中自定义类的 hashCode() 和 equals() 方法

我怀疑 Flink 中的自定义类是否需要覆盖 hashCode() 和 equals() 方法,因为我已经阅读了 这个页面 hashCode() 绝不能在分布式系统中实现,Apache Flink 就是其中之一. 示例:我有这个类: public class EventCounter {公共字符串 ID;公长计数;公共时间戳 firstEvent;公共时间戳 lastEvent;公开日期; ..

如何在Kafka和Flink环境下测试性能?

如何以 kafka 作为输入源对 Flink 进行性能测试.另外,请推荐是否有适用于这种情况的性能测试工具. 解决方案 Flink 包括吞吐量(numRecordsInPerSecond 和 numRecordsOutPerSecond)和 延迟. 如果您想更仔细地测量端到端的延迟,您可以在接收器(或其他终端节点)中添加自定义指标,将事件中的时间戳与当前时间进行比较.看起来像这样: ..
发布时间:2021-11-12 01:15:16 其他开发

使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器

在使用MiniClusterWithClientResource测试flink作业时,有没有办法控制触发定时器的处理时间? 我能够在单元测试中使用testharness 并控制处理时间,即: //直接提前算子的处理时间触发处理时间定时器testHarness.setProcessingTime(300000) 这样.我可以在指定时间触发定时器 但是,我现在需要的是使用 mi ..
发布时间:2021-11-12 01:15:13 其他开发

如何在输出方法期间在数据集中生成动态路径

有没有办法在 Flink 中创建动态 DataSink 输出路径? DataSet 的数据类型为 Tuple2 当我们尝试使用流时,我有一种使用自定义 Bucketer 生成动态浴的方法,如下所示 @Overridepublic Path getBucketPath(Clock clock, Path basePath, Tuple2 元素) {返回新路径(basePath + "/ ..
发布时间:2021-11-12 01:15:10 其他开发