Push Data from Kafka Topic to PostgreSQL in JSON


Problem description

Error after updates

[2019-07-29 12:52:23,301] INFO Initializing writer using SQL dialect: PostgreSqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2019-07-29 12:52:23,303] INFO WorkerSinkTask{id=sink-postgres-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-07-29 12:52:23,367] WARN [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Error while fetching metadata with correlation id 2 : {kafkadad=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1023)
[2019-07-29 12:52:23,368] INFO Cluster ID: _gRuX5-0SUu72wzy6PV0Ag (org.apache.kafka.clients.Metadata:365)
[2019-07-29 12:52:23,369] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Discovered group coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-07-29 12:52:23,372] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-07-29 12:52:23,373] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,383] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,482] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-07-29 12:52:23,486] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Setting newly assigned partitions: kafkadad-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-07-29 12:52:23,501] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Resetting offset for partition kafkadad-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-07-29 12:52:35,338] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:701)
        at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
        at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:745)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
[2019-07-29 12:52:35,347] ERROR WorkerSinkTask{id=sink-postgres-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-07-29 12:52:35,347] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2019-07-29 12:52:35,349] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Member consumer-1-bdbc7035-7625-4701-9ca7-c1ffa6863456 sending LeaveGroup request to coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)

Producer console:
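For reference, a schema-wrapped message like the sample further below can be sent from the shell with the stock console producer; this is only a sketch, assuming the kafkada topic and the localhost:9092 broker from the configs that follow:

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkada
    >{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"customer_id"}],"optional":false,"name":"msgschema"},"payload":{"customer_id":13}}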

connect-standalone.properties file

bootstrap.servers=localhost:9092 
key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
key.converter.schemas.enable=false 
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets 
offset.flush.interval.ms=10000
plugin.path=/home/kafka/confluent-5.2.1/share/java
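Note that value.converter.schemas.enable=true makes the JsonConverter expect every record value wrapped in a schema/payload envelope, and the field names declared in the schema must match those in the payload. As a sketch, neither of the two shapes below converts; the name mismatch in the second matches the NullPointerException at JsonConverter.convertToConnect in the log above:

    {"cust_id": 1313131, "month": 12, "expenses": 1313.13}
        (rejected: no "schema"/"payload" envelope)
    {"schema": {... "field": "customer_id" ...}, "payload": {"cust_id": 13, ...}}
        (fails: schema declares customer_id, payload sends cust_id)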

connect-post.properties file

name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkada
connection.url=jdbc:postgresql://localhost:5432/kafkadb?
user=postgres&password=postgres
insert.mode=upsert
table.name.format=kafkatable
pk.mode=none
pk.fields=none
auto.create=true 
auto.evolve=false
offset.storage.file.filename=/tmp/post-sink.offsets

The above error occurs when I run ./bin/connect-standalone.sh config/connect-standalone.properties config.postgresql.properties through Apache Kafka.

Then I tried and achieved the flow described in this link:

https://hellokoding.com/kafka-connect-sinks-data-to-postgres-example-with-avro-schema-registry-and-python

But there the data is generated by Python code using Avro. In my case, I already have data coming from sensors (in JSON format) in a Kafka topic, which I want to send to PostgreSQL instead of generating data through code.

So, how can I achieve this flow of sending data from a Kafka topic to PostgreSQL?

I have shared my properties files; please let me know if any correction is required. I am sending simple JSON data like {"cust_id": 1313131, "month": 12, "expenses": 1313.13}, and I also tried sending the schema-wrapped data below, but the error still exists.

sample json data

 {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "type": "int32",
                    "optional": false,
                    "field": "customer_id"
                },
                {
                    "type": "int32",
                    "optional": true,
                    "field": "month"
                },

                {
                    "type": "string",
                    "optional": true,
                    "field": "amount_paid"
                }
            ],
            "optional": false,
            "name": "msgschema"
        },
        "payload": {
           "cust_id": 13, 
           "month": 12, 
           "expenses": 1313.13
        }
    }

and I have a table called kafkatable, with columns (customer_id, month, amount_paid), created using

"CREATE TABLE kafkatable( customer_id int8, month int4, amount_paid decimal(9,2) );"

Solution

I solved this error by making the following changes:

  1. insert.mode=insert
  2. Comment out table.name.format=kafkatable, because the table will be created through auto.create.
  3. Remove the question mark from the end of the connection.url line.
  4. pk.fields should not be left as none here; make sure to give a column name instead, to avoid complications.
  5. int32 is not supported by PostgreSQL, so when I changed it to int8 it worked fine.
  6. The fields in the schema and the payload have different names; make sure to give them the same name (a combined sketch follows below).
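Putting these changes together, the sink properties would look roughly like this. This is a sketch, not the poster's exact file: the pk.fields column customer_id is an assumption (use whichever column fits your data), and the credentials are moved from the URL to the connector's connection.user/connection.password keys:

    name=sink-postgres
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=2
    topics=kafkada
    connection.url=jdbc:postgresql://localhost:5432/kafkadb
    connection.user=postgres
    connection.password=postgres
    insert.mode=insert
    # table.name.format=kafkatable   (commented out; auto.create creates the table)
    pk.mode=none
    pk.fields=customer_id
    auto.create=true
    auto.evolve=false

And a message following points 5 and 6 would use int8 and the same field names in schema and payload; the string-typed amount_paid is kept from the original schema, so its value is quoted:

    {
        "schema": {
            "type": "struct",
            "fields": [
                {"type": "int8", "optional": false, "field": "customer_id"},
                {"type": "int8", "optional": true, "field": "month"},
                {"type": "string", "optional": true, "field": "amount_paid"}
            ],
            "optional": false,
            "name": "msgschema"
        },
        "payload": {
            "customer_id": 13,
            "month": 12,
            "amount_paid": "1313.13"
        }
    }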
