Kafka Broker throws error for clients that produce snappy compressed messages

Problem Description

I just finished my Kafka cluster setup using Confluent Platform 6.0.0 (Apache Kafka version 2.6.0). The Kafka brokers are deployed in Kubernetes. Producing messages to a new topic works fine as long as the messages are not compressed.

However, when I tried to produce a snappy-compressed message, an error was returned. I then looked at the broker logs and saw the following exception:

[2020-11-24 13:14:37,834] ERROR (data-plane-kafka-request-handler-0:Logging) [ReplicaManager broker=1] Error processing append operation on partition customers-2
org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
        at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:92)
        at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:261)
        at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:340)
        at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1(LogValidator.scala:401)
        at kafka.log.LogValidator$.$anonfun$validateMessagesAndAssignOffsetsCompressed$1$adapted(LogValidator.scala:394)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
        at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:394)
        at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
        at kafka.log.Log.$anonfun$append$2(Log.scala:1095)
        at kafka.log.Log.append(Log.scala:2340)
        at kafka.log.Log.appendAsLeader(Log.scala:1019)
        at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:984)
        at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:972)
        at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:883)
        at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
        at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
        at scala.collection.mutable.HashMap.map(HashMap.scala:34)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:871)
        at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:571)
        at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:605)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
        at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:145)
        at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
        at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
        at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:90)
        ... 24 more
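
For reference, a snappy-compressed produce like the following is enough to trigger the exception above. This is only a sketch: the use of the console producer and the broker address are assumptions; the topic name is taken from the log line.

    # produce one snappy-compressed record (reproduction sketch)
    echo 'test message' | bin/kafka-console-producer.sh \
        --bootstrap-server localhost:9092 \
        --topic customers \
        --compression-codec snappy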

Additional info: In Kubernetes, I configured the container so that it has no permission to write files to the local filesystem. I'm not sure whether this is relevant, but perhaps write access is required for the snappy class to initialize successfully?
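
A quick way to check whether that restriction applies to the broker's temp directory is a write probe inside the container. This is only a hedged sketch; the pod name is a placeholder and /tmp is assumed to be the JVM's default java.io.tmpdir:

    # probe whether the broker container can write to /tmp (pod name is a placeholder)
    kubectl exec <kafka-broker-pod> -- sh -c \
        'touch /tmp/snappy-probe && echo "/tmp is writable" || echo "/tmp is not writable"'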

Why does Kafka fail to handle snappy compressed messages?

Recommended Answer

You can solve this in two different ways:

  1. Set the environment variable KAFKA_OPTS='-Dorg.xerial.snappy.tempdir=/some/other/path/with/exec/permissions'. This is picked up by kafka-run-class.sh (and kafka-server-start), so that your broker unpacks the library into a filesystem location that has exec permissions; see the sketch after this list.
  2. Rebuild your Docker image so that /tmp is mounted with exec permissions (mount -o remount,exec /tmp).
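
Both options address the same root cause: snappy-java extracts its bundled native library into java.io.tmpdir (/tmp by default) and loads it from there, which fails when that directory is read-only or mounted noexec. A minimal sketch of option 1 follows; the directory path and the start command are assumptions, not part of the original answer. In Kubernetes you would typically set KAFKA_OPTS in the container's env section and back the chosen path with a writable volume.

    # point snappy-java at a writable, exec-capable temp directory (path is an assumption)
    mkdir -p /var/lib/kafka/snappy-tmp
    export KAFKA_OPTS='-Dorg.xerial.snappy.tempdir=/var/lib/kafka/snappy-tmp'
    bin/kafka-server-start.sh config/server.properties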

The issue is explained in:

  1. https://issues.apache.org/jira/browse/KAFKA-8622
  2. https://docs.datastax.com/en/dse-trblshoot/doc/troubleshooting/snappytrbl.html
