wso2 入站端点 - Kafka 消费者 [英] wso2 Inbound endpoint - Kafka consumer

查看:33
本文介绍了wso2 入站端点 - Kafka 消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在创建一个 Kafka 消费者.

使用这篇文章我安装了 Zooker 和 Kafka.https://dzone.com/articles/running-apache-kafka-在 Windows 操作系统上

使用此文档,我正在配置入站 Kafka 端点https://docs.wso2.com/display/EI611/Kafka+Inbound+协议

当我尝试部署入站端点时出现此错误

[2017-09-22 12:19:06,161] [] 错误 - 创建 Kafka 消费者连接器时出现 KAFKAPollingConsumer 错误[2017-09-22 12:19:08,150] [] INFO - KAFKAMessageListener 创建 Kafka 消费者连接器...

<块引用>

[2017-09-22 12:19:08,152] [] 错误 - KAFKAMessageListener 错误加载 Zookeeper 时创建 Kafka Consumer Connector.ExceptionJAAS 登录上下文客户端"org.apache.kafka.common.KafkaException:加载 Zookeeper JAAS 登录上下文客户端"时出现异常

 在 org.apache.kafka.common.security.JaasUtils.isZkSecurityEnabled(JaasUtils.java:43)在 kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:197)在 kafka.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:142)在 kafka.javaapi.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:67)在 kafka.javaapi.consumer.ZookeeperConsumerConnector.(ZookeeperConsumerConnector.scala:70)在 kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:123)在 kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)在 org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAMessageListener.createKafkaConsumerConnector(KAFKAMessageListener.java:56)在 org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAPollingConsumer.poll(KAFKAPollingConsumer.java:145)在 org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAPollingConsumer.execute(KAFKAPollingConsumer.java:116)在 org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKATask.taskExecute(KAFKATask.java:48)在 org.wso2.carbon.inbound.endpoint.common.InboundTask.execute(InboundTask.java:45)在 org.wso2.carbon.mediation.ntask.NTaskAdapter.execute(NTaskAdapter.java:98)在 org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)在 org.quartz.core.JobRunShell.run(JobRunShell.java:213)在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)在 java.util.concurrent.FutureTask.run(FutureTask.java:266)在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)在 java.lang.Thread.run(Thread.java:748)引起:java.lang.SecurityException: java.io.IOException: C:\WS02\WSO2EI~1.1\bin\..\repository\conf\identity\jaas.conf(没有那个文件或目录)在 sun.security.provider.ConfigFile$Spi.(ConfigFile.java:137)在 sun.security.provider.ConfigFile.(ConfigFile.java:102)在 sun.reflect.GeneratedConstructorAccessor74.newInstance(来源不明)在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)在 java.lang.reflect.Constructor.newInstance(Constructor.java:423)在 java.lang.Class.newInstance(Class.java:442)在 javax.security.auth.login.Configuration$2.run(Configuration.java:255)在 javax.security.auth.login.Configuration$2.run(Configuration.java:247)在 java.security.AccessController.doPrivileged(Native Method)在 javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)在 org.apache.kafka.common.security.JaasUtils.isZkSecurityEnabled(JaasUtils.java:40)... 19 更多

<块引用>

由:java.io.IOException: C:\WS02\WSO2EI~1.1\bin..\repository\conf\identity\jaas.conf(没有这样的文件或目录)

 at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)在 sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)在 sun.security.provider.ConfigFile$Spi.(ConfigFile.java:135)

解决方案

如果需要配置Kafka不安全,推荐0.8.1.1版本.

从 Kafka 0.9 版本开始,他们引入了带有 SSL 和 SASL 的 Kafka 安全机制,这些机制将通过 JAAS 进行配置.因此,在使用 WSO2 产品(EI、DAS、CEP)配置 Kafka 时,您需要将此配置文件(jaas.conf)添加到/repository/conf/identity.

在这种情况下,WSO2 产品充当 Kafka 客户端,将使用 jaas.conf 中的配置登录到 kafka 服务器.请按照 Kafka 安全文档 [1] 为服务器和客户端配置安全性.

为 SASL/纯文本配置的 jaas.conf 文件的示例内容.

KafkaServer {需要 org.apache.kafka.common.security.plain.PlainLoginModule用户名 =卡夫卡"密码=卡夫卡秘密"user_kafka="kafka-secret"user_ibm="ibm-secret";};卡夫卡客户端{需要 org.apache.kafka.common.security.plain.PlainLoginModule用户名 =卡夫卡"密码=卡夫卡秘密";};

请注意,kafka 0.9 仅支持 SASL/Kerberos 身份验证.请仔细按照您的Kafka版本相关的Kafka安全文档进行正确配置.

[1] http://kafka.apache.org/090/documentation.html#security_sasl

I am creating a Kafka consumer.

Using this article I Installed Zooker and Kafka. https://dzone.com/articles/running-apache-kafka-on-windows-os

Using this documentation I am configuring the Inbound Kafka Endpoint https://docs.wso2.com/display/EI611/Kafka+Inbound+Protocol

When I try to deploy the Inbound Endpoint I am getting this error

[2017-09-22 12:19:06,161] [] ERROR - KAFKAPollingConsumer  Error in Creating Kafka Consumer Connector
[2017-09-22 12:19:08,150] []  INFO - KAFKAMessageListener Creating Kafka Consumer Connector...

[2017-09-22 12:19:08,152] [] ERROR - KAFKAMessageListener Error in Creating Kafka Consumer Connector.Exception while loading Zookeeper JAAS login context 'Client' org.apache.kafka.common.KafkaException: Exception while loading Zookeeper JAAS login context 'Client'

        at org.apache.kafka.common.security.JaasUtils.isZkSecurityEnabled(JaasUtils.java:43)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:197)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:70)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:123)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAMessageListener.createKafkaConsumerConnector(KAFKAMessageListener.java:56)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAPollingConsumer.poll(KAFKAPollingConsumer.java:145)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKAPollingConsumer.execute(KAFKAPollingConsumer.java:116)
        at org.wso2.carbon.inbound.endpoint.protocol.kafka.KAFKATask.taskExecute(KAFKATask.java:48)
        at org.wso2.carbon.inbound.endpoint.common.InboundTask.execute(InboundTask.java:45)
        at org.wso2.carbon.mediation.ntask.NTaskAdapter.execute(NTaskAdapter.java:98)
        at org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.SecurityException: java.io.IOException: C:\WS02\WSO2EI~1.1\bin\..\repository\conf\identity\jaas.conf (No such file or directory)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
        at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
        at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
        at org.apache.kafka.common.security.JaasUtils.isZkSecurityEnabled(JaasUtils.java:40)
        ... 19 more

Caused by: java.io.IOException: C:\WS02\WSO2EI~1.1\bin..\repository\conf\identity\jaas.conf (No such file or directory)

        at sun.security.provider.ConfigFile$Spi.ioException(ConfigFile.java:666)
        at sun.security.provider.ConfigFile$Spi.init(ConfigFile.java:262)
        at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:135)

解决方案

If you need to configure Kafka without security, the recommonded version is 0.8.1.1.

From Kafka version 0.9 onwards, they have introduced Kafka security mechanism with SSL and SASL, which will be configured via JAAS. So, when configuring Kafka with WSO2 Products (EI, DAS, CEP) you need to add this config file (jaas.conf) to /repository/conf/identity.

In this case, WSO2 product acts as the Kafka client and the configuration in the jaas.conf will be used to login to the kafka server. Please follow the Kafka security documentation [1] for configuring security for server and client.

Example content for jaas.conf file which is configured for SASL/ Plaintext.

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka"
   password="kafka-secret"
   user_kafka="kafka-secret"
   user_ibm="ibm-secret";
};

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="kafka-secret";
};

Please note that kafka 0.9 supports only SASL/Kerberos authentication. Please carefully follow the Kafka security documentation related to your Kafka version and configure it properly.

[1] http://kafka.apache.org/090/documentation.html#security_sasl

这篇关于wso2 入站端点 - Kafka 消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆