Is KSQL making remote requests under the hood, or is a Table actually a global KTable?

Question

I have a Kafka topic containing customer records, called "customer-created". Each customer is a new record in the topic. There are 4 partitions.

I have two ksql-server instances running, based on the docker image confluentinc/cp-ksql-server:5.3.0. Both use the same KSQL Service Id.

I've created a table:

CREATE TABLE t_customer (id VARCHAR, 
                         firstname VARCHAR, 
                         lastname VARCHAR)
WITH (KAFKA_TOPIC = 'customer-created', 
      VALUE_FORMAT='JSON', 
      KEY = 'id');

I'm new to KSQL, but my understanding was that KSQL builds on top of Kafka Streams and that each ksql-server instance is roughly equivalent to a Kafka Streams application instance. The first thing I notice is that as soon as I start a new instance of the ksql-server, it already knows about the tables/streams created on the first instance, even though it is an interactive instance in developer mode. Secondly, I can select the same customer based on its ID from both instances, but I expected to be able to do that from only one of them, because I assumed a KSQL Table is equivalent to a KTable, i.e. it should contain only local data, from the partitions being processed by that ksql-server instance.

SET 'auto.offset.reset'='earliest';
select * from t_customer where id = '7e1a141b-b8a6-4f4a-b368-45da2a9e92a1';

Regardless of which ksql-server instance I attach the ksql-cli to, I get a result. The only way I can get this to work with plain Kafka Streams is to use a global KTable. The fact that I get the result from both instances surprised me a little because, according to the docs, "Only the Kafka Streams DSL has the notion of a GlobalKTable", so I expected only one of the two instances to find the customer. I haven't found any docs that explain how to specify whether a KSQL Table should be a local or a global table.

So here is my question: is a KSQL Table the equivalent of a global KTable, and the docs are misleading, or is the ksql-server instance that I am connected to making a remote request under the hood to the instance responsible for the ID (presumably based on the partition), as described here for Kafka Streams?

Solution

KSQL does not support GlobalKTables at the moment.

Your analogy between a KSQL server and a Kafka Streams program is not 100% accurate, though. Each query is a Kafka Streams program (note that a "program" can have multiple instances). Also, there is a difference between persistent queries and transient queries. When you create a TABLE from a topic, the command itself is a metadata operation only (the same holds for CREATE STREAM from a topic). In both cases, no query is executed and no Kafka Streams program is started.

The information about all created STREAMS and TABLES is stored in a shared "command topic" in the Kafka cluster. All servers with the same service ID receive the same information about created streams and tables.
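
As a rough illustration of why a freshly started server already knows about t_customer, the following Python sketch models the command topic as a shared log that every server with the same service ID replays. It is a toy model and not KSQL's implementation: the class ToyKsqlServer, the service IDs, and the dictionary layout are all made up for this example.

```python
from dataclasses import dataclass, field


@dataclass
class ToyKsqlServer:
    """Hypothetical stand-in for a ksql-server; not a real KSQL API."""
    service_id: str
    metastore: dict = field(default_factory=dict)

    def replay(self, command_topics: dict) -> None:
        # Each service ID has its own command log. Replaying it from the
        # beginning rebuilds the same stream/table metadata on every server.
        for name, statement in command_topics.get(self.service_id, []):
            self.metastore[name] = statement


# One shared "command topic" per service ID (kept in Kafka in the real system).
command_topics = {
    "my_service_": [("T_CUSTOMER", "CREATE TABLE t_customer ...")],
}

first = ToyKsqlServer("my_service_")
second = ToyKsqlServer("my_service_")      # started later, same service ID
other = ToyKsqlServer("another_service_")  # different service ID

for server in (first, second, other):
    server.replay(command_topics)

print(sorted(second.metastore))  # table metadata known without re-creating it
print(sorted(other.metastore))   # nothing shared across service IDs
```

No query runs in this model, which mirrors the point above: creating the table only records metadata.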

Queries run in the CLI are transient queries, and they will be executed by a single server. The information about such transient queries is not distributed to other servers. Basically, a unique query ID (i.e., application.id) is generated and the server runs a single-instance KafkaStreams program. Hence, that server/program will subscribe to all partitions.

A persistent query (i.e., CREATE STREAM AS or CREATE TABLE AS) is a query that reads a STREAM or TABLE and produces a STREAM or TABLE as output. The information about persistent queries is distributed via the "command topic" to all servers (however, not all servers will execute all persistent queries; how many execute a given query depends on the configured parallelism). For persistent queries, each server that participates in executing the query creates a KafkaStreams instance running the same program. All of these instances use the same query ID (i.e., application.id), so different servers will subscribe to different partitions.
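
The difference between the two query types can be sketched with a small Python model of how instances that share an application.id divide a topic's partitions. This is only an illustration under simplifying assumptions: the round-robin split, the function name assign, and the server and query names are hypothetical, and Kafka's real partition assignors are more involved.

```python
from collections import defaultdict

PARTITIONS = [0, 1, 2, 3]  # the topic in the question has four partitions


def assign(instances):
    """Toy assignment: instances sharing an application.id split the partitions.

    `instances` is a list of (server, application_id) pairs.
    """
    groups = defaultdict(list)
    for server, app_id in instances:
        groups[app_id].append(server)

    result = {}
    for app_id, servers in groups.items():
        per_server = {server: [] for server in servers}
        # Simplified round-robin over the members of one application.id.
        for i, partition in enumerate(PARTITIONS):
            per_server[servers[i % len(servers)]].append(partition)
        result[app_id] = per_server
    return result


# Transient query: unique application.id, a single instance on one server.
transient = assign([("server-1", "transient-query-1")])

# Persistent query: both servers run an instance with the same application.id.
persistent = assign([("server-1", "persistent-query-1"),
                     ("server-2", "persistent-query-1")])

print(transient)
print(persistent)
```

In this model the transient query's only instance owns all four partitions, which is consistent with the SELECT in the question returning the customer no matter which server the CLI is attached to. No remote request and no global table are involved.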
