Why does Kafka JDBC Connect insert data as BLOB instead of varchar?


Question

I am using a Java producer to insert data into my Kafka topic. Then I use the Kafka JDBC sink connector to insert the data into my Oracle table. Below is my producer code.

package producer.serialized.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class Sender4 {

    public static void main(String[] args) {

        String flightSchema = "{\"type\":\"record\"," + "\"name\":\"Flight\","

                + "\"fields\":[{\"name\":\"flight_id\",\"type\":\"string\"},{\"name\":\"flight_to\",\"type\":\"string\"},{\"name\":\"flight_from\",\"type\":\"string\"}]}";                

        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);    
        props.put("schema.registry.url", "http://192.168.0.1:8081");            

        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

        Schema.Parser parser = new Schema.Parser();

        Schema schema = parser.parse(flightSchema);            

        GenericRecord avroRecord = new GenericData.Record(schema);

        avroRecord.put("flight_id", "myflight");
        avroRecord.put("flight_to", "QWE");
        avroRecord.put("flight_from", "RTY");    

        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic9",avroRecord);

        producer.send(record);

        // send() is asynchronous; flush and close the producer so the record
        // is actually delivered before the JVM exits.
        producer.flush();
        producer.close();
    }
}
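
As a sanity check before wiring up the sink, a minimal Avro consumer can confirm that records actually reach topic9. This is only a sketch: the class and group names are illustrative, while the broker and registry addresses match the producer above.

package producer.serialized.avro;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TopicCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic9-check");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // The producer above never sets a key, so a String key deserializer is enough.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://192.168.0.1:8081");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic9"));
            // On pre-2.0 clients use consumer.poll(10000L) instead of Duration.
            for (ConsumerRecord<String, GenericRecord> record : consumer.poll(Duration.ofSeconds(10))) {
                System.out.println(record.value());
            }
        }
    }
}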

Below are my Kafka Connect properties:

name=test-sink-6
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=topic9
connection.url=jdbc:oracle:thin:@192.168.0.1:1521:usera
connection.user=usera
connection.password=usera
auto.create=true
table.name.format=FLIGHTS4
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://192.168.0.1:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://192.168.0.1:8081

From my schema, I expect the values inserted into my Oracle table to be varchar2. I created a table with three varchar2 columns. When I started my connector, nothing got inserted. I then deleted the table and ran the connector with table auto-create mode on. This time the table got created and the values were inserted, but the column data type is CLOB. I want it to be varchar2, since it uses less storage.
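
For reference, one way to see exactly what auto.create produced is to query Oracle's data dictionary. This is a minimal JDBC sketch: the class name is illustrative, the connection details come from the connector properties above, and the Oracle JDBC driver must be on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ShowColumnTypes {

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@192.168.0.1:1521:usera", "usera", "usera");
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT column_name, data_type FROM user_tab_columns WHERE table_name = ?")) {
            stmt.setString(1, "FLIGHTS4");
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // For the auto-created table this prints CLOB for every field.
                    System.out.println(rs.getString("column_name") + " -> " + rs.getString("data_type"));
                }
            }
        }
    }
}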

Why is this happening, and how can I fix it? Thank you.

Answer

It looks like Kafka Connect's STRING schema type is mapped to Oracle's NCLOB:

| Schema Type | MySQL            | Oracle        | PostgreSQL       | SQLite  |
|-------------|------------------|---------------|------------------|---------|
| INT8        | TINYINT          | NUMBER(3,0)   | SMALLINT         | NUMERIC |
| INT16       | SMALLINT         | NUMBER(5,0)   | SMALLINT         | NUMERIC |
| INT32       | INT              | NUMBER(10,0)  | INT              | NUMERIC |
| INT64       | BIGINT           | NUMBER(19,0)  | BIGINT           | NUMERIC |
| FLOAT32     | FLOAT            | BINARY_FLOAT  | REAL             | REAL    |
| FLOAT64     | DOUBLE           | BINARY_DOUBLE | DOUBLE PRECISION | REAL    |
| BOOLEAN     | TINYINT          | NUMBER(1,0)   | BOOLEAN          | NUMERIC |
| STRING      | VARCHAR(256)     | NCLOB         | TEXT             | TEXT    |
| BYTES       | VARBINARY(1024)  | BLOB          | BYTEA            | BLOB    |
| 'Decimal'   | DECIMAL(65,s)    | NUMBER(*,s)   | DECIMAL          | NUMERIC |
| 'Date'      | DATE             | DATE          | DATE             | NUMERIC |
| 'Time'      | TIME(3)          | DATE          | TIME             | NUMERIC |
| 'Timestamp' | TIMESTAMP(3)     | TIMESTAMP     | TIMESTAMP        | NUMERIC |

Source: https://www.ibm.com/support/knowledgecenter/zh-CN/SSPT3X_4.2.5/com.ibm.swg.im.infosphere.biginsights.admin.doc/doc/admin_kafka_jdbc_sink.html

https://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html
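
Applied to the flight schema in the question, every field is an Avro string; Avro strings become Connect STRING values, which the table above maps to NCLOB on Oracle. A quick sketch, reusing the flightSchema JSON string from the producer, prints the field types:

// flightSchema is the same JSON schema string defined in Sender4 above.
Schema schema = new Schema.Parser().parse(flightSchema);
for (Schema.Field field : schema.getFields()) {
    System.out.println(field.name() + " : " + field.schema().getType()); // STRING for all three
}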

Update

The OracleDialect class (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/dialect/OracleDialect.java) has the CLOB value hardcoded. Simply extending it with your own class and changing that mapping will not help, because the dialect type is determined in a static method in JdbcSinkTask (https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java):

final DbDialect dbDialect = DbDialect.fromConnectionString(config.connectionUrl);
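
For illustration, the mapping in OracleDialect.getSqlType(...) looks roughly like this (paraphrased from the linked source, not a verbatim copy):

// Paraphrased sketch of OracleDialect.getSqlType(schemaName, parameters, type):
// every Connect STRING is mapped to CLOB, with no VARCHAR2 alternative.
switch (type) {
    case INT8:    return "NUMBER(3,0)";
    case INT16:   return "NUMBER(5,0)";
    case INT32:   return "NUMBER(10,0)";
    case INT64:   return "NUMBER(19,0)";
    case FLOAT32: return "BINARY_FLOAT";
    case FLOAT64: return "BINARY_DOUBLE";
    case BOOLEAN: return "NUMBER(1,0)";
    case STRING:  return "CLOB";   // hardcoded
    case BYTES:   return "BLOB";
}

Since the dialect is chosen statically from the JDBC connection URL, a custom subclass with a different STRING mapping would never be picked up.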

