Push Data from Kafka Topic to PostgreSQL in JSON
Problem description
Error after updates
[2019-07-29 12:52:23,301] INFO Initializing writer using SQL dialect: PostgreSqlDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:57)
[2019-07-29 12:52:23,303] INFO WorkerSinkTask{id=sink-postgres-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)
[2019-07-29 12:52:23,367] WARN [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Error while fetching metadata with correlation id 2 : {kafkadad=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:1023)
[2019-07-29 12:52:23,368] INFO Cluster ID: _gRuX5-0SUu72wzy6PV0Ag (org.apache.kafka.clients.Metadata:365)
[2019-07-29 12:52:23,369] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Discovered group coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-07-29 12:52:23,372] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)
[2019-07-29 12:52:23,373] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,383] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-07-29 12:52:23,482] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)
[2019-07-29 12:52:23,486] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Setting newly assigned partitions: kafkadad-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
[2019-07-29 12:52:23,501] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Resetting offset for partition kafkadad-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
[2019-07-29 12:52:35,338] ERROR WorkerSinkTask{id=sink-postgres-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:701)
at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:745)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
[2019-07-29 12:52:35,347] ERROR WorkerSinkTask{id=sink-postgres-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2019-07-29 12:52:35,347] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:105)
[2019-07-29 12:52:35,349] INFO [Consumer clientId=consumer-1, groupId=connect-sink-postgres] Member consumer-1-bdbc7035-7625-4701-9ca7-c1ffa6863456 sending LeaveGroup request to coordinator INTRIVMPIOT01.xpetize.local:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)
Producer console:
connect-standalone.properties file
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/home/kafka/confluent-5.2.1/share/java
connect-post.properties file
name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkada
connection.url=jdbc:postgresql://localhost:5432/kafkadb?
user=postgres&password=postgres
insert.mode=upsert
table.name.format=kafkatable
pk.mode=none
pk.fields=none
auto.create=true
auto.evolve=false
offset.storage.file.filename=/tmp/post-sink.offsets
The above error occurs when I run ./bin/connect-standalone.sh config/connect-standalone.properties config.postgresql.properties with Apache Kafka.
Then I tried and achieved the flow mentioned in this link:
But there the data is generated from Python code using Avro, whereas in my case I already have data coming from sensors (in JSON format) in a Kafka topic, which I want to send to PostgreSQL instead of generating the data through code.
So, how can I achieve this flow of sending data from a Kafka topic to PostgreSQL?
I have shared my properties files; please let me know if a correction is required. I am sending simple JSON data like "{"cust_id": 1313131, "month": 12, "expenses": 1313.13}", and I also tried sending this type of data, but the error still exists:
sample json data
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "int32",
"optional": true,
"field": "month"
},
{
"type": "string",
"optional": true,
"field": "amount_paid"
}
],
"optional": false,
"name": "msgschema"
},
"payload": {
"cust_id": 13,
"month": 12,
"expenses": 1313.13
}
}
I also have a table called kafkatable with columns (customer_id, month, amount_paid), created using:
CREATE TABLE kafkatable( customer_id int8, month int4, amount_paid decimal(9,2) );
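One quick sanity check before producing such messages is to verify that every payload key is declared in the schema block: in the sample message above, the payload keys (cust_id, expenses) do not match the schema fields (customer_id, amount_paid), which is the kind of mismatch that makes JsonConverter fail with the NullPointerException shown in the log. A minimal sketch of such a check (the helper name is made up for illustration):

```python
def undeclared_payload_keys(envelope: dict) -> list:
    """Return payload keys not declared in the schema (hypothetical helper)."""
    declared = {f["field"] for f in envelope["schema"]["fields"]}
    return sorted(k for k in envelope["payload"] if k not in declared)

# The sample message from the question, reproduced as a Python dict.
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int32", "optional": False, "field": "customer_id"},
            {"type": "int32", "optional": True, "field": "month"},
            {"type": "string", "optional": True, "field": "amount_paid"},
        ],
        "optional": False,
        "name": "msgschema",
    },
    "payload": {"cust_id": 13, "month": 12, "expenses": 1313.13},
}

print(undeclared_payload_keys(envelope))  # ['cust_id', 'expenses']
```

An empty list means the payload is consistent with the declared schema fields.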
I solved this error by making the following changes:
- insert.mode=insert
- Comment out table.name.format=kafkatable, because the table will be created through auto.create.
- Remove the question mark from the end of the connection.url line.
- pk.fields should not be left as none here; give a column name instead to avoid complications.
- int32 is not supported by PostgreSQL, so when I changed it to int8 it worked fine.
- The fields in your schema and payload have different names; make sure to give them the same names.
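For reference, the sink properties after applying these changes might look roughly like this (a sketch, not the exact file from the answer; connection.user and connection.password are used here as separate settings instead of URL query parameters):

```properties
name=sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=2
topics=kafkada
# No trailing "?"; credentials given as separate properties.
connection.url=jdbc:postgresql://localhost:5432/kafkadb
connection.user=postgres
connection.password=postgres
insert.mode=insert
# table.name.format removed: the table is created via auto.create.
pk.mode=none
auto.create=true
auto.evolve=false
```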