使用Kinesis Analytics的Apache Flink:java.lang.IllegalArgumentException:要分配的内存部分不应为0 [英] Apache Flink with Kinesis Analytics : java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0

查看:15
本文介绍了使用Kinesis Analytics的Apache Flink:java.lang.IllegalArgumentException:要分配的内存部分不应为0的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

背景: 我一直在尝试在相同的Flink应用程序中设置Batch+Streaming,该应用程序部署在Kinesis Analytics运行时。流部件工作正常,但添加批处理支持时遇到问题。

Flink : Handling Keyed Streams with data older than application watermark

Apache Flink : Batch Mode failing for Datastream API's with exception `IllegalStateException: Checkpointing is not allowed with sorted inputs.`

逻辑如下:

The logic is something like this :

streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
streamExecutionEnvironment.fromSource(FileSource.forRecordStreamFormat(new TextLineFormat(), path).build(),
WatermarkStrategy.noWatermarks(),
"Text File")
.process(process function which transforms input)
.assignTimestampsAndWatermarks(WatermarkStrategy
                .<DetectionEvent>forBoundedOutOfOrderness(orderness)
                .withTimestampAssigner(
                        (SerializableTimestampAssigner<Event>) (event, l) -> event.getEventTime()))
.keyBy(keyFunction)
.window(TumblingEventWindows(Time.of(x days))
.process(processWindowFunction);

执行此操作时,我收到以下异常:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:764)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_90bea66de1c231edf33913ecd54406c1_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 10 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:306)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:426)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:90)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:672)
    at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:521)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:302)
    ... 17 more

似乎kinesis-analytics不允许客户端定义Flink-conf.yaml文件来定义taskmanager.memory.managed.consumer-weights。有什么办法可以解决这个问题吗?

推荐答案

我不清楚此异常的根本原因,也不清楚如何在kda上进行批处理。

您可以尝试此操作(但我不确定KDA是否允许):

Configuration conf = new Configuration();
conf.setString("taskmanager.memory.managed.consumer-weights", "put-the-value-here");

StreamExecutionEnvironment env =
  StreamExecutionEnvironment.getExecutionEnvironment(conf);

这篇关于使用Kinesis Analytics的Apache Flink:java.lang.IllegalArgumentException:要分配的内存部分不应为0的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆