Setting up Kafka on Openshift with strimzi

Problem description

I am trying to set up a Kafka cluster on the OpenShift platform using this guide: https://developers.redhat.com/blog/2018/10/29/how-to-run-kafka-on-openshift-the-enterprise-kubernetes-with-amq-streams/

I have my ZooKeeper and Kafka clusters running, and as the bootstrap-servers I use the route to the my-cluster-kafka-external bootstrap service. But when I try to send a message to Kafka, I get this message:

21:32:40.548 [http-nio-8080-exec-1] ERROR o.s.k.s.LoggingProducerListener () - Exception thrown when sending a message with key='key' and payload='Event(id=null, number=30446C77213B40000004tgst15, itemId=, serialNumber=0,  locat...' to topic tag-topic:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

The topic was created successfully, and the application runs fine against a local Kafka on my computer. So what am I doing wrong? Why can't I reach Kafka and send messages?

Here is my kafka producer config in spring-kafka:

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;    

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "........kafka.EventSerializer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        return props;
    }


    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

EDIT: I set the logging level to debug and found this:

23:59:27.412 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Initialize connection to node my-cluster-kafka-bootstrap-kafka-test............... (id: -1 rack: null) for sending metadata request
23:59:27.412 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Initiating connection to node my-cluster-kafka-bootstrap-kafka-test............ (id: -1 rack: null)
23:59:28.010 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.n.Selector () - [Consumer clientId=consumer-1, groupId=id] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
23:59:28.010 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Completed connection to node -1. Fetching API versions.
23:59:28.010 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Initiating API versions fetch from node -1.
23:59:28.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.n.Selector () - [Consumer clientId=consumer-1, groupId=id] Connection with my-cluster-kafka-bootstrap-kafka-test........../52.215.40.40 disconnected
java.io.EOFException: null
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:547) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:221) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:153) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:?]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) [?:?]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
    at java.lang.Thread.run(Thread.java:844) [?:?]
23:59:28.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Node -1 disconnected.
23:59:28.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Give up sending metadata request since no node is available

Can this have something to do with the connections.max.idle.ms property of the broker? Here someone had a similar problem.

I tried using kafka-console-producer by running this command:

bin\windows\kafka-console-producer --broker-list https://my-cluster-kafka-bootstrap-kafka-test.domain.com:443 --topic tag-topic --producer.config config/producer.properties

and with this configuration in the producer.properties:

compression.type=none
security.protocol=SSL
ssl.truststore.location=C:\\Tools\\kafka_2.12-2.2.0\\config\\store.jks
ssl.truststore.password=password
ssl.keystore.location=C:\\Tools\\kafka_2.12-2.2.0\\config\\store.jks
ssl.keystore.password=password
ssl.key.password=password

but I get a response saying that the connection was terminated while authenticating:

[2019-05-21 16:15:58,444] WARN [Producer clientId=console-producer] Connection to node 1 (my-cluster-kafka-1-kafka-test.domain.com/52.xxx.xx.40:443) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)

Could it be that the certificate from OpenShift is wrong?

Solution

Access through the routes is possible only via TLS, using the CA certificate generated by Strimzi, which you have to extract as described in the article. Then you have to create a truststore, import that certificate into it, and provide it to the client application. I don't see such configuration in your producer.
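
A minimal sketch of those two steps (assuming the cluster is named my-cluster, as in your logs, so the CA secret follows Strimzi's <cluster-name>-cluster-ca-cert naming convention; the alias, file names, and password here are placeholders):

# Extract the cluster CA certificate that Strimzi generated:
oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
# Import it into a JKS truststore for the Java client:
keytool -importcert -alias strimzi-ca -file ca.crt -keystore truststore.jks -storepass password -noprompt

Your Spring producer config would then need the matching SSL settings, roughly along these lines (a sketch, not your exact setup: the truststore path is hypothetical, and the constants come from org.apache.kafka.clients.CommonClientConfigs and org.apache.kafka.common.config.SslConfigs in kafka-clients):

    // Extra entries for producerConfigs(): switch the client to TLS and
    // point it at the truststore containing the Strimzi cluster CA.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks"); // hypothetical path
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");

Without the trusted CA, the TLS handshake through the route is dropped, which matches both the EOFException in your debug log and the "terminated during authentication" warning from the console producer.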
