Attaching JSON schema to KSQL stream records
Problem description
I've been using KSQL and so far it's been working great. But now I'd like to sink the output to BigQuery via Kafka Connect, and need to attach a JSON schema. I'm having trouble figuring out how to do this. Here's my query:
CREATE STREAM tweets_original (
    CreatedAt BIGINT,
    Id BIGINT,
    Text VARCHAR,
    Source VARCHAR,
    GeoLocation VARCHAR,
    User STRUCT<Id BIGINT, Name VARCHAR, Description VARCHAR, ScreenName VARCHAR, URL VARCHAR, FollowersCount BIGINT, FriendsCount BIGINT>
)
WITH (kafka_topic='tweets', value_format='JSON');

CREATE STREAM tweets_new
WITH (kafka_topic='tweets-new') AS
SELECT
    CreatedAt AS created_at,
    Id AS tweet_id,
    Text AS tweet_text,
    Source AS source,
    GeoLocation AS geo_location,
    User->Id AS user_id,
    User->Name AS user_name,
    User->Description AS user_description,
    User->ScreenName AS user_screenname
FROM tweets_original;
Here's an example of a record that was written to the output topic (tweets-new):
{
    "CREATED_AT": 1535036410000,
    "TWEET_ID": 1032643668614819800,
    "TWEET_TEXT": "Sample text",
    "SOURCE": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
    "GEO_LOCATION": null,
    "USER_ID": 123,
    "USER_NAME": "John Smith",
    "USER_DESCRIPTION": "Developer in Chief",
    "USER_SCREENNAME": "newphonewhodis"
}
However, in order for Kafka Connect to sink these records to BigQuery, I need to attach a schema, like so:
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int64",
                "optional": false,
                "field": "CREATED_AT"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "TWEET_ID"
            },
            {
                "type": "string",
                "optional": false,
                "field": "TWEET_TEXT"
            }
            ...
        ],
        "optional": false,
        "name": "foobar"
    },
    "payload": {...}
}
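To make the target shape concrete, here is a minimal Python sketch of wrapping a flat record in that schema/payload envelope. The helper name `wrap_with_schema` and the `FIELD_TYPES` mapping are made up for illustration, and only the three fields spelled out above are covered. This is hand-rolled post-processing, not something KSQL is known to do for JSON output.

```python
import json

# Hypothetical mapping from output column name to Kafka Connect schema type.
# Only the three fields spelled out in the example above are listed.
FIELD_TYPES = {
    "CREATED_AT": "int64",
    "TWEET_ID": "int64",
    "TWEET_TEXT": "string",
}


def wrap_with_schema(record, field_types, name="foobar"):
    """Wrap a flat record in a schema/payload envelope (illustrative helper)."""
    fields = [
        {"type": connect_type, "optional": False, "field": field}
        for field, connect_type in field_types.items()
    ]
    schema = {"type": "struct", "fields": fields, "optional": False, "name": name}
    # Keep only the fields the schema describes, so schema and payload agree.
    payload = {field: record[field] for field in field_types}
    return {"schema": schema, "payload": payload}


record = {
    "CREATED_AT": 1535036410000,
    "TWEET_ID": 1032643668614819800,
    "TWEET_TEXT": "Sample text",
}
envelope = wrap_with_schema(record, FIELD_TYPES)
print(json.dumps(envelope, indent=2))
```

Doing this for every record would mean an extra processing step between KSQL and Kafka Connect, which is what I'd like to avoid.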
Anyway, I'm not seeing anything in the docs that shows how I might approach this (maybe I'm looking in the wrong place). Any help would be greatly appreciated!
Recommended answer
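There is a simple solution in KSQL: change the value format of your second stream to AVRO. With Avro, the schema is registered in the Schema Registry, so you don't need to embed one in each JSON record.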
CREATE STREAM tweets_new
WITH (kafka_topic='tweets-new', value_format='AVRO') AS
SELECT
    CreatedAt AS created_at,
    Id AS tweet_id,
    Text AS tweet_text,
    Source AS source,
    GeoLocation AS geo_location,
    User->Id AS user_id,
    User->Name AS user_name,
    User->Description AS user_description,
    User->ScreenName AS user_screenname
FROM tweets_original;
Then, in your Kafka Connect configuration, you can use the AvroConverter and allow for schema evolution/management in Google BigQuery.
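As a rough sketch, the converter part of such a sink configuration could look like the following, built as a Python dict in the JSON shape the Kafka Connect REST API accepts. The connector name, the Schema Registry URL, and the connector class are assumptions for illustration. BigQuery-specific settings (project, dataset, credentials) are deliberately left out because they depend on the connector version you run.

```python
import json


def avro_sink_config(name, topic, connector_class, schema_registry_url):
    """Build a connector definition that reads Avro values via the Schema Registry."""
    return {
        "name": name,
        "config": {
            "connector.class": connector_class,
            "topics": topic,
            # The Avro converter looks up each record's schema in the Schema Registry.
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": schema_registry_url,
        },
    }


config = avro_sink_config(
    name="bigquery-sink",  # hypothetical connector name
    topic="tweets-new",  # the topic written by the tweets_new stream
    # Assumed class name of the BigQuery sink connector; check your installation.
    connector_class="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    schema_registry_url="http://localhost:8081",  # assumed local Schema Registry
)
print(json.dumps(config, indent=2))
```

The printed JSON can be used as the body of a POST to the Connect worker's /connectors endpoint once the connector-specific settings have been added.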