KSQL: SELECT on a table does not show anything

Question

I created a source topic, subscriber, whose input messages look like this:

{
  "ip_router": "",
  "ip_lan": "",
  "isdn": "2046573688",
  "end_datetime": "",
  "shop_code": "1000405100",
  "reg_type_id": "5131615",
  "contract_id": "",
  "update_datetime": "20170801171355",
  "project": "",
  "telecom_service_id": "2",
  "local_speed": "",
  "password": "",
  "price_plan": "",
  "vip": "",
  "local_price_plan": "",
  "sub_id": "1083168000",
  "sta_datetime": "20090511152847",
  "update_number_1": "1",
  "act_status": "000",
  "network_class": "",
  "limit_usage": "",
  "num_reset_zone": "",
  "deposit": "",
  "create_user": "TUDV_POPBGG",
  "num_of_computer": "",
  "cust_id": "10922428129",
  "status": "2",
  "active_datetime": "20090511152102",
  "ip_view": "",
  "channel_type_id": "",
  "ip_wan": "",
  "imsi": "452049760887694",
  "infrastructure_type": "",
  "product_code": "HPN03",
  "expire_datetime": "",
  "speed": "",
  "private_ip": "",
  "update_user": "MIGRATE",
  "ip_static": "",
  "vlan": "",
  "sub_type": "",
  "create_datetime": "20090511152102",
  "is_info_completed": "1",
  "pay_type": "2",
  "up_link": "",
  "promotion_code": "",
  "technology": "",
  "offer_id": "400001035",
  "dev_staff_id": "",
  "account_id": "",
  "deploy_accept_date": "",
  "serial": "8984049767000887694",
  "group_id": "",
  "ip_gateway": "",
  "first_connect": "",
  "org_product_code": "MIGRATE",
  "start_money": "100000",
  "keep_alive": "",
  "account": ""
}

I then created a stream and a table on it:

CREATE STREAM str_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON');

CREATE TABLE tbl_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON', KEY = 'sub_id' );

I then tested both with ksql:

SELECT * FROM str_subscriber_json; 

(prints results when I put a new JSON message into the subscriber topic)

SELECT * FROM tbl_subscriber_json; 

(shows nothing when I put a new JSON message into the subscriber topic)

Can you clarify what is wrong in this case?

Thanks a lot.

Recommended answer

Summary

Your messages need to be keyed. If you don't have a message key then the semantics of a TABLE don't make any sense (since you can't show the value for a key, if there is no key).
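
To make the point about keys concrete, here is a toy shell model of table semantics. It is only a sketch, not how KSQL is implemented: `latest_by_key` is a hypothetical helper, and the tab-separated `key<TAB>value` input format is an assumption made for illustration. Lines with an empty key are dropped, and for every other key only the most recent value is kept, which is why unkeyed messages never show up in a table.

```shell
# Toy model of TABLE semantics (illustration only, not KSQL internals).
# Reads "key<TAB>value" lines from stdin (assumed format).
latest_by_key() {
  awk '{
    i = index($0, "\t")
    if (i <= 1) next                        # no separator or empty key: nothing to upsert
    key = substr($0, 1, i - 1)
    if (!(key in latest)) order[++n] = key  # remember first-seen order of keys
    latest[key] = substr($0, i + 1)         # a newer value overwrites the older one
  }
  END {
    for (j = 1; j <= n; j++) print order[j] "\t" latest[order[j]]
  }'
}
```

For example, `printf '\tA\n1\tB\n1\tC\n' | latest_by_key` prints a single line for key 1 with value C; the unkeyed first line is dropped.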

I've reproduced your example, using kafkacat to produce messages with and without a key.

$ echo '{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber

Stream output

Note the null in the second column - this is the key (the first column is the timestamp of the message; the remainder of the columns are the declared fields in the message)

ksql> select * from str_subscriber_json;
1528368689380 | null | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null

Table output

ksql> SELECT * FROM tbl_subscriber_json;

(no output)

The key is set arbitrarily here to 1, using kafkacat's -K flag to specify : as the key/value separator.

$ echo '1:{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber -K:

Stream output

Note the 1 in the second column - this is the key (the first column is the timestamp of the message; the remainder of the columns are the declared fields in the message)

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null

Table output

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
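
The key of 1 above is arbitrary. If you control the producer, a more useful key is the record's own sub_id. The following is a minimal sketch, assuming one JSON message per line and a string-valued "sub_id" field; `key_by_sub_id` is a hypothetical helper, and the sed pattern is a simplification rather than a real JSON parser.

```shell
# Hypothetical helper: turn each one-line JSON message on stdin into
# "<sub_id>:<message>", the key/value form that kafkacat -K: expects.
key_by_sub_id() {
  while IFS= read -r line; do
    # Extract the string value of "sub_id" (simplified pattern, not a JSON parser).
    key=$(printf '%s\n' "$line" |
      sed -n 's/.*"sub_id"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p')
    # Skip messages that have no sub_id rather than emit an empty key.
    [ -n "$key" ] || { echo "skipping line without sub_id" >&2; continue; }
    printf '%s:%s\n' "$key" "$line"
  done
}
```

Its output can be piped into the same kafkacat command used above, for example `echo '{ ... }' | key_by_sub_id | kafkacat -b localhost:9092 -P -t subscriber -K:`.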

Rekeying topics automagically with KSQL

You can use KSQL to repartition topics. For example, taking your source subscriber topic, here is how to repartition it using KSQL in order to set the key:

ksql> CREATE STREAM SUBSCRIBER_KEYED AS SELECT * FROM str_subscriber_json PARTITION BY sub_id;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

This populates a Kafka topic (SUBSCRIBER_KEYED) on which you can then define a table:

CREATE TABLE subscriber_table (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) \
WITH (KAFKA_TOPIC='SUBSCRIBER_KEYED', VALUE_FORMAT='JSON', KEY = 'sub_id' );

Now when you send a message to subscriber, even if it's not keyed, the table will work:

ksql> select * from subscriber_table;
1528369407576 | 1083168000 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
