将Flink SQL用于Kafka支持的表时出现NoOffsetForPartitionException [英] NoOffsetForPartitionException when using Flink SQL with Kafka-backed table
本文介绍了将Flink SQL用于Kafka支持的表时出现NoOffsetForPartitionException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
这是我的表:
CREATE TABLE orders (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`order_time` TIMESTAMP(3),
WATERMARK FOR `order_time` AS order_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'groupgroup',
'value.format' = 'json'
);
我可以插入到表中:
INSERT into orders
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));
我可以验证数据是否在那里:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --from-beginning
{"id":"001","currency_code":"EURO","total":9.1,"order_time":"2022-01-12 12:50:00"}
但当我尝试查询表时,收到错误:
Flink SQL> select * from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [orders-0]
推荐答案
scan.startup.mode
的默认值为group-offsets
,没有提交的组偏移量。您可以通过将scan.startup.mode
设置为其他值(例如
'scan.startup.mode' = 'earliest-offset',
在表定义中,或在进行查询之前安排提交偏移量。
Flink将提交偏移量作为检查点的一部分,但是,如果您使用sql-client.sh或Zeppelin之类的工具以交互方式尝试Flink SQL,则将scan.startup.mode
设置为earliest-offset
是一个很好的解决方案。
这篇关于将Flink SQL用于Kafka支持的表时出现NoOffsetForPartitionException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文