将Flink SQL用于Kafka支持的表时出现NoOffsetForPartitionException [英] NoOffsetForPartitionException when using Flink SQL with Kafka-backed table

查看:15
本文介绍了将Flink SQL用于Kafka支持的表时出现NoOffsetForPartitionException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是我的表:

CREATE TABLE orders (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `order_time` TIMESTAMP(3),
  WATERMARK FOR `order_time` AS order_time - INTERVAL '30' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'groupgroup',
  'value.format' = 'json'
);

我可以插入到表中:

INSERT into orders 
VALUES ('001', 'EURO', 9.10, TO_TIMESTAMP('2022-01-12 12:50:00', 'yyyy-MM-dd HH:mm:ss'));

我可以验证数据是否在那里:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --from-beginning
{"id":"001","currency_code":"EURO","total":9.1,"order_time":"2022-01-12 12:50:00"}

但当我尝试查询表时,收到错误:

Flink SQL> select * from orders;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [orders-0]

推荐答案

scan.startup.mode的默认值为group-offsets,没有提交的组偏移量。您可以通过将scan.startup.mode设置为其他值(例如

)来修复此问题
'scan.startup.mode' = 'earliest-offset',

在表定义中,或在进行查询之前安排提交偏移量。

Flink将提交偏移量作为检查点的一部分,但是,如果您使用sql-client.sh或Zeppelin之类的工具以交互方式尝试Flink SQL,则将scan.startup.mode设置为earliest-offset是一个很好的解决方案。

这篇关于将Flink SQL用于Kafka支持的表时出现NoOffsetForPartitionException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆