Kafka Connect is sending malformed JSON

Problem description

I'm trying to perform a proof of concept using kafka-connect with a RabbitMQ connector. Basically, I have two simple Spring Boot applications: a RabbitMQ producer and a Kafka consumer. The consumer cannot handle the messages from the connector because something is transforming my JSON message along the way; RabbitMQ sends {"transaction": "PAYMENT", "amount": "$125.0"} but kafka-connect prints X{"transaction": "PAYMENT", "amount": "$125.0"}. Please note the X at the beginning. If I add a field, say "foo": "bar", then that leading letter becomes a t or something else entirely.

Dockerfile (connector):

FROM confluentinc/cp-kafka-connect-base:5.3.2
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:latest

Build the image with docker build . -t rabbit-connector, so you can reference it in the docker-compose file as rabbit-connector.
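
For reference, the build-and-run sequence is just the following; the docker-compose up -d step assumes the docker-compose.yml shown below sits in the same directory:

docker build . -t rabbit-connector
docker-compose up -d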

docker-compose.yml:

version: '2'

networks:
  kafka-connect-network:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.2
    networks: 
      - kafka-connect-network
    ports:
      - '31000:31000'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31000

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.2
    networks: 
      - kafka-connect-network
    ports:
      - '9092:9092'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31001

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.2
    depends_on:
      - zookeeper
      - kafka
    networks: 
      - kafka-connect-network
    ports:
      - '8081:8081'
      - '31002:31002'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_JMX_HOSTNAME: "localhost"
      SCHEMA_REGISTRY_JMX_PORT: 31002

  rabbitmq:
    image: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: "/"
    networks: 
      - kafka-connect-network
    ports:
      - '15672:15672'
      - '5672:5672'

  kafka-connect:
    image: rabbit-connector
    networks: 
      - kafka-connect-network
    ports:
      - '8083:8083'
      - '31004:31004'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "ERROR"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31004
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
      - rabbitmq

  rest-proxy:
    image: confluentinc/cp-kafka-rest:5.3.2
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    networks: 
      - kafka-connect-network
    ports:
      - '8082:8082'
      - '31005:31005'
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KAFKAREST_JMX_HOSTNAME: "localhost"
      KAFKAREST_JMX_PORT: 31005

schema.avsc:

{
  "type": "record",
  "name": "CustomMessage",
  "namespace": "com.poc.model",
  "fields": [
    {
      "name": "transaction",
      "type": "string"
    },
    {
      "name": "amount",
      "type": "string"
    }
  ]
}

So here I am using a StringConverter for my key (which I don't care about, to be honest) and the AvroConverter for the value. Maybe I am missing something, or maybe I'm misconfiguring my kafka-connect worker.

My connector configuration is (connector-config.json):

{
  "name": "rabbit_to_kafka_poc",
  "config": {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "kafka.topic": "spectrum-message",
    "rabbitmq.queue": "spectrum-queue",
    "rabbitmq.username": "guest",
    "rabbitmq.password": "guest",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/"
  }
}

To register my connector, I run:

curl -i -X POST -H "Accept:application/json" \
     -H "Content-Type:application/json" \
     http://localhost:8083/connectors/ -d @connector-config.json

Once I configure everything, I run the following command to print out my messages:

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                            --topic spectrum-message \
                            --from-beginning

And the JSON starts with an extra letter, so my question is: why is this happening? I think something is encoding my message, but my RabbitMQ producer is sending a plain JSON message. I can confirm this by testing with a RabbitMQ consumer and by debugging my application up to the point where the message is sent out.

Answer

You need to use the ByteArrayConverter. It's just bytes that the connector pulls from RabbitMQ; it won't try to coerce them to a schema. Even if you serialise the payload to Avro, the schema is just a single field of bytes, and the stray leading character you see is most likely the Avro varint length prefix of that bytes field, which is why it changes whenever the payload length changes:

$ curl -s -XGET localhost:8081/subjects/rabbit-test-avro-00-value/versions/1 | jq '.'
{
  "subject": "rabbit-test-avro-00-value",
  "version": 1,
  "id": 1,
  "schema": "\"bytes\""
}
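
Concretely, a minimal fix, assuming the connector configuration above, is to switch the value converter to the ByteArrayConverter that ships with Kafka Connect and drop the value.converter.schema.registry.url setting, which is no longer needed. Only the changed line of connector-config.json is shown:

    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"

After re-registering the connector, the topic holds the raw JSON bytes, so a plain kafka-console-consumer (rather than kafka-avro-console-consumer) should print the message exactly as RabbitMQ sent it:

kafka-console-consumer --bootstrap-server localhost:9092 \
                       --topic spectrum-message \
                       --from-beginning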

If you want to write it to a topic in Avro (which is a good idea) with a schema, then use something like Kafka Streams or ksqlDB to do this, applying a stream processor to the source topic that Kafka Connect writes to with the ByteArrayConverter.

For example, in ksqlDB you would do:

-- Inspect the topic - ksqlDB recognises the format as JSON

ksql> PRINT 'rabbit-test-00' FROM BEGINNING;
Format:JSON
{"ROWTIME":1578477403591,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
{"ROWTIME":1578477598555,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}

-- Declare the schema
CREATE STREAM rabbit (transaction VARCHAR,
                      amount VARCHAR)
  WITH (KAFKA_TOPIC='rabbit-test-00',
        VALUE_FORMAT='JSON');

-- Reserialise to Avro
CREATE STREAM TRANSACTIONS WITH (VALUE_FORMAT='AVRO', 
                                 KAFKA_TOPIC='reserialised_data') AS
  SELECT *
    FROM rabbit
    EMIT CHANGES;
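
As a sanity check, assuming the stream above was created successfully, the reserialised Avro topic can be read back with the Avro console consumer (schema.registry.url defaults to http://localhost:8081; it is passed explicitly here for clarity):

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                            --topic reserialised_data \
                            --from-beginning \
                            --property schema.registry.url=http://localhost:8081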

For more details, see this blog post that I've written up.
