带有状态处理器 api 的 Flink 状态后端配置 [英] Flink state backend config with the state processor api
问题描述
我使用 state-processor-api,因为它被发布来引导我的 flink 状态.我使用 RocksDBStateBackend 并且它有效.最近我们去了 flink 1.13,RocksDBStateBackend 被弃用,取而代之的是 EmbeddedRocksDBStateBackend.
I use the state-processor-api since it was released to bootstrap my flink states. I use a RocksDBStateBackend and it works. We went to flink 1.13 recently, and the RocksDBStateBackend was deprecated in favor of EmbeddedRocksDBStateBackend.
我的问题:
由于API的变化和我开发的新引导作业,我得到了以下异常:
Since the change of API and the new bootstrap job I developed, I got the following exception:
Caused by: java.io.IOException: 状态的大小大于允许的最大内存支持状态.大小=85356498,最大大小=5242880.考虑使用不同的状态后端,如文件系统状态后端.
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=85356498 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
在这里我声明我的状态后端:
Here I declare my statebackend:
val backend = new EmbeddedRocksDBStateBackend(true)
这里我创建了我的保存点:
And here I create my savepoint:
Savepoint
.create(backend, MAX_PARALLELISM)
.withOperator("my_operator", transformMyOperator)
.write(savepointPath)
此外,我的 flink 集群配置为使用 RocksDB 状态后端,和所有其他 flink 拓扑都使用 RocksDB 后端.
Also, my flink cluster is configured to use a RocksDB state backend, and all others flink topologies use the RocksDB backend.
所以我想知道为什么我会收到一个异常,说我不应该使用内存状态后端,因为我使用的是 RocksDB.欢迎任何帮助.
So I wonder why I got an exception saying I should not use a memory state backend since I use RocksDB. Any help would be welcome.
推荐答案
是 1.13 的一个 bug 引起的,请参考 FLINK-23728,运行 1.14.0-RC0 确实为我解决了问题.
It is caused by a bug in 1.13, please see FLINK-23728, running 1.14.0-RC0 did solve the issue for me.
这篇关于带有状态处理器 api 的 Flink 状态后端配置的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!