带有状态处理器 api 的 Flink 状态后端配置 [英] Flink state backend config with the state processor api

查看:37
本文介绍了带有状态处理器 api 的 Flink 状态后端配置的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 state-processor-api,因为它被发布来引导我的 flink 状态.我使用 RocksDBStateBackend 并且它有效.最近我们去了 flink 1.13,RocksDBStateBackend 被弃用,取而代之的是 EmbeddedRocksDBStateBackend.

I use the state-processor-api since it was released to bootstrap my flink states. I use a RocksDBStateBackend and it works. We went to flink 1.13 recently, and the RocksDBStateBackend was deprecated in favor of EmbeddedRocksDBStateBackend.

我的问题:

由于API的变化和我开发的新引导作业,我得到了以下异常:

Since the change of API and the new bootstrap job I developed, I got the following exception:

Caused by: java.io.IOException: 状态的大小大于允许的最大内存支持状态.大小=85356498,最大大小=5242880.考虑使用不同的状态后端,如文件系统状态后端.

Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=85356498 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.

在这里我声明我的状态后端:

Here I declare my statebackend:

val backend = new EmbeddedRocksDBStateBackend(true)

这里我创建了我的保存点:

And here I create my savepoint:

  Savepoint
    .create(backend, MAX_PARALLELISM)
    .withOperator("my_operator", transformMyOperator)
    .write(savepointPath)

此外,我的 flink 集群配置为使用 RocksDB 状态后端,和所有其他 flink 拓扑都使用 RocksDB 后端.

Also, my flink cluster is configured to use a RocksDB state backend, and all others flink topologies use the RocksDB backend.

所以我想知道为什么我会收到一个异常,说我不应该使用内存状态后端,因为我使用的是 RocksDB.欢迎任何帮助.

So I wonder why I got an exception saying I should not use a memory state backend since I use RocksDB. Any help would be welcome.

推荐答案

是 1.13 的一个 bug 引起的,请参考 FLINK-23728,运行 1.14.0-RC0 确实为我解决了问题.

It is caused by a bug in 1.13, please see FLINK-23728, running 1.14.0-RC0 did solve the issue for me.

这篇关于带有状态处理器 api 的 Flink 状态后端配置的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆