将 JSON 模式附加到 KSQL 流记录 [英] Attaching JSON schema to KSQL stream records

查看:26
本文介绍了将 JSON 模式附加到 KSQL 流记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在使用 KSQL,到目前为止效果很好.但是现在我想通过 Kafka Connect 将输出下沉到 BigQuery,并且需要附加一个 JSON 模式.我很难弄清楚如何做到这一点.这是我的查询:

I've been using KSQL and so far it's been working great. But now I'd like to sink the output to BigQuery via Kafka Connect, and need to attach a JSON schema. I'm having trouble figuring out how to do this. Here's my query:

CREATE STREAM tweets_original (
      CreatedAt BIGINT,
      Id BIGINT,
      Text VARCHAR,
      Source VARCHAR,
      GeoLocation VARCHAR,
      User STRUCT<Id BIGINT, Name VARCHAR, Description VARCHAR, ScreenName VARCHAR, URL VARCHAR, FollowersCount BIGINT, FriendsCount BIGINT>
    )
    WITH (kafka_topic='tweets', value_format='JSON');

    CREATE STREAM tweets_new
    WITH (kafka_topic='tweets-new') AS
    SELECT
      CreatedAt as created_at,
      Id as tweet_id,
      Text as tweet_text,
      Source as source,
      GeoLocation as geo_location,
      User->Id as user_id,
      User->Name as user_name,
      User->Description as user_description,
      User->ScreenName as user_screenname
    FROM tweets_original ;

这是写入输出主题 (tweets-new) 的记录示例.

Here's an example of a record that was written to the output topic (tweets-new).

{
  "CREATED_AT": 1535036410000,
  "TWEET_ID": 1032643668614819800,
  "TWEET_TEXT": "Sample text",
  "SOURCE": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
  "GEO_LOCATION": null,
  "USER_ID": 123,
  "USER_NAME": "John Smith",
  "USER_DESCRIPTION": "Developer in Chief",
  "USER_SCREENNAME": "newphonewhodis"
}

但是,为了让 Kafka Connect 将这些记录接收到 BigQuery,我需要附加一个架构,如下所示:

However, in order for Kafka Connect to sink these records to BigQuery, I need to attach a schema, like so:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "CREATED_AT"
      },
      {
        "type": "int64",
        "optional": false,
        "field": "TWEET_ID"
      },
      {
        "type": "string",
        "optional": false,
        "field": "TWEET_TEXT"
      }
      ...
    ],
    "optional": false,
    "name": "foobar"
  },
  "payload": {...}
}

无论如何,我在文档中没有看到任何说明我可能会如何处理此问题的内容(也许我找错了地方).任何帮助将不胜感激!

Anyways, I'm not seeing anything thing in the docs that shows how I might approach this (maybe I'm looking in the wrong place). Any help would be greatly appreciated!

推荐答案

这是 KSQL 的简单解决方案,只需将您的第二个流更新为 AVRO.

This is a simple solution for KSQL, just update your 2nd stream to AVRO.

CREATE STREAM tweets_new
    WITH (kafka_topic='tweets-new', value_format='AVRO') AS
    SELECT
      CreatedAt as created_at,
      Id as tweet_id,
      Text as tweet_text,
      Source as source,
      GeoLocation as geo_location,
      User->Id as user_id,
      User->Name as user_name,
      User->Description as user_description,
      User->ScreenName as user_screenname
    FROM tweets_original ;

然后在您的 Kafka Connect 配置中,您可以使用 AvroConvertor 并允许在 Google Big Query 中进行架构演化/管理.

Then in your Kafka Connect configuration, you can use the AvroConvertor and allow for schema evolution/management in Google Big Query.

这篇关于将 JSON 模式附加到 KSQL 流记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆