Kafka Connect is sending malformed JSON
Question
I'm trying to build a proof of concept using Kafka Connect with a RabbitMQ connector. Basically, I have two simple Spring Boot applications: a RabbitMQ producer and a Kafka consumer. The consumer cannot handle the messages from the connector because something is transforming my JSON message. RabbitMQ sends {"transaction": "PAYMENT", "amount": "$125.0"} and Kafka Connect prints X{"transaction": "PAYMENT", "amount": "$125.0"}.
Note the X at the beginning. If I add a field, say "foo": "bar", the letter becomes a t or something else.
Dockerfile (connector):
FROM confluentinc/cp-kafka-connect-base:5.3.2
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:latest
Please build the image with docker build . -t rabbit-connector so that you can reference it as rabbit-connector in the docker-compose file.
docker-compose.yml:
version: '2'
networks:
  kafka-connect-network:
    driver: bridge
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.2
    networks:
      - kafka-connect-network
    ports:
      - '31000:31000'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31000
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.2
    networks:
      - kafka-connect-network
    ports:
      - '9092:9092'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31001
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.2
    depends_on:
      - zookeeper
      - kafka
    networks:
      - kafka-connect-network
    ports:
      - '8081:8081'
      - '31002:31002'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_JMX_HOSTNAME: "localhost"
      SCHEMA_REGISTRY_JMX_PORT: 31002
  rabbitmq:
    image: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: "/"
    networks:
      - kafka-connect-network
    ports:
      - '15672:15672'
      - '5672:5672'
  kafka-connect:
    image: rabbit-connector
    networks:
      - kafka-connect-network
    ports:
      - '8083:8083'
      - '31004:31004'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "ERROR"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31004
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
      - rabbitmq
  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.3.2
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    networks:
      - kafka-connect-network
    ports:
      - '8082:8082'
      - '31005:31005'
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKAREST_JMX_HOSTNAME: "localhost"
      KAFKAREST_JMX_PORT: 31005
schema.avsc:
{
  "type": "record",
  "name": "CustomMessage",
  "namespace": "com.poc.model",
  "fields": [
    {
      "name": "transaction",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "string"
    }
  ]
}
So here I am using a StringConverter for the key (which, to be honest, I don't care about) and an AvroConverter for the value. Maybe I am missing something, or I have misconfigured my Kafka Connect worker.
My connector configuration is (connector-config.json):
{
  "name": "rabbit_to_kafka_poc",
  "config": {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "kafka.topic": "spectrum-message",
    "rabbitmq.queue": "spectrum-queue",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/"
  }
}
To register my connector I run: curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connector-config.json
Once I configure everything, I run the following command to print out my messages:
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--topic spectrum-message \
--from-beginning
The JSON starts with a stray letter, so my question is: why is this happening? I think something is encoding my message, but my RabbitMQ producer sends plain JSON. I confirmed this by testing with a RabbitMQ consumer and by debugging my application up to the point where the message is sent.
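One way to test the suspicion that something is encoding the message is to look at the raw bytes of a record value instead of its printed form. The following is a minimal Python diagnostic, assuming you have already fetched one raw value from the topic with any Kafka client; the function names are hypothetical. Values written through Confluent's Schema Registry serialisers start with a zero magic byte and a 4-byte big-endian schema ID, and when the Avro schema is plain bytes, a zigzag-encoded varint length follows before the original data.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent Schema Registry-framed value


def read_zigzag_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode an Avro 'long' (base-128 varint, zigzag-encoded) starting at pos.

    Returns (value, position just after the varint)."""
    shift = 0
    raw = 0
    while True:
        b = buf[pos]
        pos += 1
        raw |= (b & 0x7F) << shift
        if not b & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    # Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (raw >> 1) ^ -(raw & 1), pos


def inspect_value(value: bytes) -> dict:
    """Report whether a record value looks like Confluent-framed Avro 'bytes'."""
    if len(value) < 5 or value[0] != MAGIC_BYTE:
        # Plain JSON starts with '{' (0x7B), so it ends up here.
        return {"framed": False, "payload": value}
    (schema_id,) = struct.unpack(">I", value[1:5])  # 4-byte big-endian schema ID
    length, pos = read_zigzag_varint(value, 5)  # Avro 'bytes' = length prefix + data
    return {"framed": True, "schema_id": schema_id, "payload": value[pos:pos + length]}
```

If inspect_value reports framed: True, the bytes on the topic are no longer the JSON the producer sent; some serialiser between RabbitMQ and the consumer has wrapped them.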
Recommended answer
You need to use the ByteArrayConverter. The connector pulls plain bytes from RabbitMQ and does not try to coerce them to a schema. Even if you serialise them to Avro, the schema is just the primitive type bytes:
$ curl -s -XGET localhost:8081/subjects/rabbit-test-avro-00-value/versions/1 | jq '.'
{
"subject": "rabbit-test-avro-00-value",
"version": 1,
"id": 1,
"schema": "\"bytes\""
}
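That single bytes schema explains the stray letter. The following is a minimal Python sketch, assuming schema ID 1 and a hypothetical helper visible_text() that roughly approximates what a terminal shows. It frames a payload the way an Avro serialiser would for a "bytes" schema: a zero magic byte and a 4-byte schema ID (all non-printable), then the payload length as an Avro zigzag varint, then the payload itself. For short payloads the length prefix is a single byte, and that byte is often a printable character.

```python
import struct


def zigzag_varint(n: int) -> bytes:
    """Encode an int as an Avro 'long': zigzag, then base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag mapping for 64-bit values
    out = bytearray()
    while True:
        if z & ~0x7F:  # more than 7 bits left: emit a continuation byte
            out.append((z & 0x7F) | 0x80)
            z >>= 7
        else:
            out.append(z)
            return bytes(out)


def confluent_avro_bytes(payload: bytes, schema_id: int = 1) -> bytes:
    """Frame payload as a Schema Registry serialiser would for the schema "bytes"."""
    header = b"\x00" + struct.pack(">I", schema_id)  # magic byte + big-endian schema ID
    return header + zigzag_varint(len(payload)) + payload


def visible_text(raw: bytes) -> str:
    """Drop non-printable bytes and decode the rest, like a terminal mostly would."""
    return bytes(b for b in raw if 32 <= b < 127).decode("ascii")
```

For a 44-byte payload the prefix is 2 × 44 = 88 = 0x58, which prints as X; adding the 14 bytes of , "foo": "bar" gives 2 × 58 = 116 = 0x74, which prints as t. That matches the letters reported in the question if the real payloads were 44 and 58 bytes long. The JSON exactly as typed in the question is 46 bytes and would get a backslash instead, so the exact letter depends on the producer's exact whitespace.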
If you want to write the data to a topic in Avro with a proper schema (which is a good idea), use something like Kafka Streams or ksqlDB: apply a stream processor to the source topic that Kafka Connect writes with the ByteArrayConverter.
For example, in ksqlDB you would do:
-- Inspect the topic - ksqlDB recognises the format as JSON
ksql> PRINT 'rabbit-test-00' FROM BEGINNING;
Format:JSON
{"ROWTIME":1578477403591,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
{"ROWTIME":1578477598555,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
-- Declare the schema
CREATE STREAM rabbit (transaction VARCHAR,
amount VARCHAR)
WITH (KAFKA_TOPIC='rabbit-test-00',
VALUE_FORMAT='JSON');
-- Reserialise to Avro
CREATE STREAM TRANSACTIONS WITH (VALUE_FORMAT='AVRO',
KAFKA_TOPIC='reserialised_data') AS
SELECT *
FROM rabbit
EMIT CHANGES;
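The ksqlDB statements above read a topic that the connector writes with the ByteArrayConverter. As a minimal sketch of switching the question's connector over, the Python below rebuilds the connector-config.json settings with org.apache.kafka.connect.converters.ByteArrayConverter as the value converter (the Schema Registry URL is then no longer needed) and prepares a PUT to the Kafka Connect REST endpoint /connectors/<name>/config, which creates the connector or updates its configuration. The host and port assume the docker-compose setup from the question; the helper names are hypothetical.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST port mapped in docker-compose
CONNECTOR = "rabbit_to_kafka_poc"


def byte_array_config() -> dict:
    """The question's connector config, with the value converter swapped."""
    return {
        "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        # Pass RabbitMQ message bodies through untouched: no Avro framing, no schema.
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "kafka.topic": "spectrum-message",
        "rabbitmq.queue": "spectrum-queue",
        "rabbitmq.username": "guest",
        "rabbitmq.password": "guest",
        "rabbitmq.host": "rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.virtual.host": "/",
    }


def put_config_request(config: dict) -> urllib.request.Request:
    """Build PUT /connectors/<name>/config (create or update); nothing is sent yet."""
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors/{CONNECTOR}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="PUT",
    )
```

To apply it, pass the request to urllib.request.urlopen(put_config_request(byte_array_config())). Records already in spectrum-message were written with the AvroConverter and keep their framing, so pointing kafka.topic at a fresh topic avoids mixing the two formats.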
For more details, see this blog post that I've written.