春季靴子Kafka,生产者抛出了key ='null'异常 [英] Spring Boot & Kafka, Producer thrown exception with key='null'

查看:146
本文介绍了春季靴子Kafka,生产者抛出了key ='null'异常的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将 Spring Boot Kafka ZooKeeper Docker

docker-compose.yml:

version: '2'

services:

zookeeper:
 image: wurstmeister/zookeeper
 restart: always
 ports:
   - "2181:2181"

kafka:
 image: wurstmeister/kafka
 restart: always
 ports:
   - "9092:9092"
 environment:
   KAFKA_ADVERTISED_HOST_NAME: localhost
   KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

docker ps 输出:

CONTAINER ID        IMAGE                    COMMAND                            CREATED             STATUS              PORTS                                                        NAMES
980e6b09f4e3        wurstmeister/kafka       "start-kafka.sh"         29 minutes ago      Up 29 minutes       0.0.0.0:9092->9092/tcp                               samplespringkafkaproducerconsumermaster_kafka_1
64519d4808aa        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   2 hours ago         Up 29 minutes       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   samplespringkafkaproducerconsumermaster_zookeeper_1

docker-compose up 输出日志:

kafka_1      | [2018-01-12 13:14:49,545] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,546] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,546] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,547] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,547] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,548] INFO Client environment:os.version=4.9.60-linuxkit-aufs (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,548] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,549] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,549] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,552] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1534f01b (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,574] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
kafka_1      | [2018-01-12 13:14:49,578] INFO Opening socket connection to server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
zookeeper_1  | 2018-01-12 13:14:49,591 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /192.168.32.3:51466
kafka_1      | [2018-01-12 13:14:49,593] INFO Socket connection established to samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, initiating session (org.apache.zookeeper.ClientCnxn)
zookeeper_1  | 2018-01-12 13:14:49,600 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@928] - Client attempting to establish new session at /192.168.32.3:51466
zookeeper_1  | 2018-01-12 13:14:49,603 [myid:] - INFO  [SyncThread:0:FileTxnLog@203] - Creating new log file: log.fd
zookeeper_1  | 2018-01-12 13:14:49,613 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@673] - Established session 0x160ea8232b00000 with negotiated timeout 6000 for client /192.168.32.3:51466
kafka_1      | [2018-01-12 13:14:49,616] INFO Session establishment complete on server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, sessionid = 0x160ea8232b00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
kafka_1      | [2018-01-12 13:14:49,619] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
kafka_1      | [2018-01-12 13:14:49,992] INFO Cluster ID = Fgy9ybPPQQ-QdLINzHpmVA (kafka.server.KafkaServer)
kafka_1      | [2018-01-12 13:14:50,003] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1      | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,067] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,167] INFO Log directory '/kafka/kafka-logs-980e6b09f4e3' not found, creating it. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,183] INFO Loading logs. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,199] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,283] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,291] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,633] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
kafka_1      | [2018-01-12 13:14:50,639] INFO [SocketServer brokerId=1005] Started 1 acceptor threads (kafka.network.SocketServer)
kafka_1      | [2018-01-12 13:14:50,673] INFO [ExpirationReaper-1005-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,674] INFO [ExpirationReaper-1005-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,675] INFO [ExpirationReaper-1005-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,691] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
kafka_1      | [2018-01-12 13:14:50,753] INFO [ExpirationReaper-1005-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,757] INFO [ExpirationReaper-1005-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,762] INFO [ExpirationReaper-1005-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,777] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,791] INFO [GroupCoordinator 1005]: Starting up. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2018-01-12 13:14:50,791] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,793] INFO [GroupCoordinator 1005]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2018-01-12 13:14:50,798] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1      | [2018-01-12 13:14:50,811] INFO [ProducerId Manager 1005]: Acquired new producerId block (brokerId:1005,blockStartProducerId:5000,blockEndProducerId:5999) by writing to Zk with path version 6 (kafka.coordinator.transaction.ProducerIdManager)
kafka_1      | [2018-01-12 13:14:50,848] INFO [TransactionCoordinator id=1005] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2018-01-12 13:14:50,850] INFO [Transaction Marker Channel Manager 1005]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
kafka_1      | [2018-01-12 13:14:50,850] INFO [TransactionCoordinator id=1005] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2018-01-12 13:14:50,949] INFO Creating /brokers/ids/1005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
zookeeper_1  | 2018-01-12 13:14:50,952 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x70 zxid:0x102 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
zookeeper_1  | 2018-01-12 13:14:50,952 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x71 zxid:0x103 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
kafka_1      | [2018-01-12 13:14:50,957] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,959] INFO Registered broker 1005 at path /brokers/ids/1005 with addresses: EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
kafka_1      | [2018-01-12 13:14:50,961] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1      | [2018-01-12 13:14:50,992] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2018-01-12 13:14:50,993] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2018-01-12 13:14:51,004] INFO [KafkaServer id=1005] started (kafka.server.KafkaServer)
zookeeper_1  | 2018-01-12 13:14:51,263 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:delete cxid:0xe3 zxid:0x105 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka_1      | [2018-01-12 13:24:50,793] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1      | [2018-01-12 13:34:50,795] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Kafka Producer Consumer 中的maven依赖项:

Kafka maven dependency in Producer and Consumer:

 <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>1.5.9.RELEASE</version>
     <relativePath/>
 </parent>

 <dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>

application.properties c>生产者:

application.properties in Producer:

 spring.kafka.producer.bootstrap-servers=0.0.0.0:9092

 spring.kafka.consumer.topic=kafka_topic
 server.port=8080

application.properties Consumer 中:

 spring.kafka.consumer.bootstrap-servers=0.0.0.0:9092
 spring.kafka.consumer.group-id=WorkUnitApp

 spring.kafka.consumer.topic=kafka_topic
 server.port=8081

消费者

@Component 
public class Consumer {

private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

@KafkaListener(topics = "${spring.kafka.consumer.topic}")
 public void receive(ConsumerRecord<?, ?> consumerRecord) {
 LOGGER.info("received payload='{}'", consumerRecord.toString());
 }
}

制作人

@Component
public class Producer {

private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void send(String topic, String payload) {
 LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
 kafkaTemplate.send(topic, payload);
 }
}

ConsumerConfig 日志:

2018-01-12 15:25:48.220  INFO 20919 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [0.0.0.0:9092]
check.crcs = true
client.id = consumer-1
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = WorkUnitApp
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

ProducerConfig 日志:

2018-01-12 15:26:27.956  INFO 20924 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
acks = 1
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [0.0.0.0:9092]
buffer.memory = 33554432
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.StringSerializer

当我尝试发送消息时,我得到一个例外:

When i try to send a message I get an exception:

producer.send( kafka_topic, test )

例外日志:

2018-01-12 15:26:27.975  INFO 20924 --- [nio-8080-exec-1]    o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2018-01-12 15:26:27.975  INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2018-01-12 15:26:58.152 ERROR 20924 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic kafka_topic:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka_topic-0 due to 30033 ms has passed since batch creation plus linger time

如何修复它?

推荐答案

问题不在于发送密钥 null ,可能无法建立与代理的连接

Problem is not with sending key as null, Connection to a broker may not be established


  • 尝试使用本地Kafka安装。

  • try using local Kafka installation.

如果您使用Mac Docker for mac具有某些联网功能
限制
https://docs.docker.com/docker-for -mac / networking /#known-limitations-use-cases-workarounds

If you are using mac Docker for mac having some networking limitations https://docs.docker.com/docker-for-mac/networking/#known-limitations-use-cases-and-workarounds

这篇关于春季靴子Kafka,生产者抛出了key ='null'异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆