Read pipe separated values in ksql
Question
I am working on a POC where I have to read a pipe-separated-value file and insert those records into MS SQL Server. I am using Confluent 5.4.1 and the value_delimiter CREATE STREAM property, but it throws an exception: Delimeter only supported with DELIMITED format
1. Start Confluent (version: 5.4.1):
[Dev root @ myip ~]
# confluent local start
The local commands are intended for a single-node development environment
only, NOT for production usage. https://docs.confluent.io/current/cli/index.html
Using CONFLUENT_CURRENT: /tmp/confluent.vHhSRAnj
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
[Dev root @ myip ~]
# jps
49923 KafkaRestMain
50099 ConnectDistributed
49301 QuorumPeerMain
50805 KsqlServerMain
49414 SupportedKafka
52103 Jps
51020 ControlCenter
1741
49646 SchemaRegistryMain
[Dev root @ myip ~]
#
2. Create the topic:
[Dev root @ myip ~]
# kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic SampleData
Created topic SampleData.
3. Feed pipe-delimited data to the SampleData topic:
[Dev root @ myip ~]
# kafka-console-producer --broker-list localhost:9092 --topic SampleData <<EOF
> this is col1|and now col2|and col 3 :)
> EOF
[Dev root @ myip ~]
#
4. Start KSQL:
[Dev root @ myip ~]
# ksql
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017-2019 Confluent Inc.
CLI v5.4.1, Server v5.4.1 located at http://localhost:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
5. Declare a schema over the existing topic, SampleData:
ksql> CREATE STREAM sample_delimited (
> column1 varchar(1000),
> column2 varchar(1000),
> column3 varchar(1000))
> WITH (KAFKA_TOPIC='SampleData', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER='|');
Message
----------------
Stream created
----------------
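As a quick optional check (not one of the original steps), DESCRIBE should list the columns just registered for the stream:

-- Show the stream's registered columns (metadata only; reads no data)
DESCRIBE sample_delimited;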
6. Verify the data arrives in the KSQL stream:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM sample_delimited emit changes limit 1;
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|ROWTIME |ROWKEY |COLUMN1 |COLUMN2 |COLUMN3 |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|1584339233947 |null |this is col1 |and now col2 |and col 3 :) |
Limit Reached
Query terminated
7. Write to a new Kafka topic, SampleDataAvro, serialising all data from the sample_delimited stream into Avro:
ksql> CREATE STREAM sample_avro WITH (KAFKA_TOPIC='SampleDataAvro', VALUE_FORMAT='AVRO') AS SELECT * FROM sample_delimited;
Delimeter only supported with DELIMITED format
ksql>
8. The line above throws the exception: Delimeter only supported with DELIMITED format
9. Load the MS SQL Kafka Connect sink configuration:
confluent local load test-sink -- -d ./etc/kafka-connect-jdbc/sink-quickstart-mssql.properties
Accepted answer
The only time you need to specify the delimiter is when you define the stream that reads from the source topic.
Here's my working example.

Populate a topic with pipe-delimited data:
$ kafkacat -b localhost:9092 -t SampleData -P<<EOF
this is col1|and now col2|and col 3 :)
EOF
Declare a stream over it:
CREATE STREAM sample_delimited (
column1 varchar(1000),
column2 varchar(1000),
column3 varchar(1000))
WITH (KAFKA_TOPIC='SampleData', VALUE_FORMAT='DELIMITED', VALUE_DELIMITER='|');
Query the stream to make sure it works:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM sample_delimited emit changes limit 1;
+----------------+--------+---------------+--------------+--------------+
|ROWTIME |ROWKEY |COLUMN1 |COLUMN2 |COLUMN3 |
+----------------+--------+---------------+--------------+--------------+
|1583933584658 |null |this is col1 |and now col2 |and col 3 :) |
Limit Reached
Query terminated
Reserialise the data to Avro:
CREATE STREAM sample_avro WITH (KAFKA_TOPIC='SampleDataAvro', VALUE_FORMAT='AVRO') AS SELECT * FROM sample_delimited;
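Optionally, DESCRIBE EXTENDED should confirm on the ksqlDB side that the derived stream was created with value format AVRO and carries no delimiter of its own:

-- Inspect the derived stream's metadata, including its value format
DESCRIBE EXTENDED sample_avro;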
Dump the contents of the topic - note that it is now Avro:
ksql> print SampleDataAvro;
Key format: UNDEFINED
Value format: AVRO
rowtime: 3/11/20 1:33:04 PM UTC, key: <null>, value: {"COLUMN1": "this is col1", "COLUMN2": "and now col2", "COLUMN3": "and col 3 :)"}
The error that you're hitting is the result of bug #4200. You can wait for the next release of Confluent Platform, or use standalone ksqlDB, in which the issue is already fixed.
Here's ksqlDB 0.7.1 streaming the data to MS SQL:
CREATE SINK CONNECTOR SINK_MSSQL WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:sqlserver://mssql:1433',
'connection.user' = 'sa',
'connection.password' = 'Admin123',
'topics' = 'SampleDataAvro',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'auto.create' = 'true',
'insert.mode' = 'insert'
);
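To verify the connector from within ksqlDB (assuming the ksqlDB server is attached to a Connect cluster, as in the quickstart setups), the standard connector statements should work:

-- List connectors known to the attached Connect cluster
SHOW CONNECTORS;

-- Show the connector's state and its running tasks
DESCRIBE CONNECTOR SINK_MSSQL;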
Now query the data in MS SQL:
1> Select @@version
2> go
---------------------------------------------------------------------
Microsoft SQL Server 2017 (RTM-CU17) (KB4515579) - 14.0.3238.1 (X64)
Sep 13 2019 15:49:57
Copyright (C) 2017 Microsoft Corporation
Developer Edition (64-bit) on Linux (Ubuntu 16.04.6 LTS)
(1 rows affected)
1> SELECT * FROM SampleDataAvro;
2> GO
COLUMN3 COLUMN2 COLUMN1
-------------- --------------- ------------------
and col 3 :) and now col2 this is col1
(1 rows affected)