How to connect your KSQLDB-Cluster on OpenShift to an on-premise kerberized Kafka-cluster

Problem description

What I want to achieve:
We have an on-premise Kafka cluster. I want to set up KSQLDB in OpenShift and connect it to the brokers of the on-premise Kafka cluster.

The problem:
When I try to start the KSQLDB server with the command "/usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties" I get the error message:

[2020-05-14 15:47:48,519] ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:60)
io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration!
        at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:90)
        at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.isKafkaAuthorizerEnabled(KsqlAuthorizationValidatorFactory.java:81)
        at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:51)
        at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:624)
        at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:544)
        at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:98)
        at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:56)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:60)
        ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

My configuration:
I set up my Dockerfile on the basis of this image: https://hub.docker.com/r/confluentinc/ksqldb-server; the ports 9092, 9093, 8080, 8082, and 443 are open.
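
To rule out OpenShift-specific networking, it can help to reproduce the startup outside the cluster first. Below is a minimal sketch, assuming Docker is available locally and ksql-server.properties sits in the current directory (the image tag and the mounted path are assumptions, not taken from the original setup):

# Start the same image locally with the same properties file (sketch; tag is an assumption)
docker run --rm -it \
  -p 8082:8082 \
  -v "$PWD/ksql-server.properties:/etc/ksqldb/ksql-server.properties" \
  --entrypoint /usr/bin/ksql-server-start \
  confluentinc/ksqldb-server:latest \
  /etc/ksqldb/ksql-server.properties

If the server fails in the same way here, the problem is more likely on the broker or security side than in the OpenShift service definition.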

My service YAML looks like this:

kind: Service
apiVersion: v1
metadata:
  name: social-media-dev
  namespace: abc
  selfLink: xyz
  uid: xyz
  resourceVersion: '1'
  creationTimestamp: '2020-05-14T09:47:15Z'
  labels:
    app: social-media-dev
  annotations:
    openshift.io/generated-by: OpenShiftNewApp
spec:
  ports:
    - name: social-media-dev
      protocol: TCP
      port: 9092
      targetPort: 9092
      nodePort: 31364
  selector:
    app: social-media-dev
    deploymentconfig: social-media-dev
  clusterIP: XX.XX.XXX.XXX
  type: LoadBalancer
  externalIPs:
    - XXX.XX.XXX.XXX
  sessionAffinity: None
  externalTrafficPolicy: Cluster
status:
  loadBalancer:
    ingress:
      - ip: XX.XX.XXX.XXX

My ksql-server.properties file includes the following information:
listeners: http://0.0.0.0:8082
bootstrap.servers: X.X.X.X:9092, X.X.X.Y:9092, X.X.X.Z:9092
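
Note that listeners is the REST endpoint of the ksqlDB server itself, while bootstrap.servers points at the Kafka brokers; the timeout error shown earlier comes from the broker connection, not from the REST listener. Once the server does start, the REST listener can be sanity-checked from inside the pod, for example:

# Quick check of the ksqlDB server's REST listener (only answers once the server has started)
curl -s http://localhost:8082/info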

What I have tried so far:

I tried to connect from within my pod to a broker and it worked:
(timeout 1 bash -c '</dev/tcp/X.X.X.X/9092 && echo PORT OPEN || echo PORT CLOSED') 2>/dev/null
result: PORT OPEN
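
A plain TCP check only proves that the port is reachable; the failing call (listNodes) goes through the Kafka AdminClient, which also needs the brokers' advertised listeners and security settings to match. If the Kafka command-line tools are available in the image, a protocol-level check is more telling. This is a sketch; client.properties is a hypothetical file carrying the same security settings as ksql-server.properties:

# Talks the actual Kafka protocol instead of just opening a TCP connection
kafka-broker-api-versions --bootstrap-server X.X.X.X:9092 \
  --command-config client.properties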

I also played around with the listeners setting, but then the error message got shorter, showing only "Could not get Kafka cluster configuration!" without the timeout error.

I also tried changing the service type from LoadBalancer to NodePort, but without success.

Do you have any ideas on what I could try next?

UPDATE: With an upgrade to Cloudera CDH6, the Cloudera Kafka cluster now also works with Kafka Streams. Hence I was then able to connect from my KSQLDB cluster in OpenShift to the on-premise Kafka cluster.

Recommended answer

UPDATE: With an upgrade to Cloudera CDH6, the Cloudera Kafka cluster now also works with Kafka Streams. Hence I was then able to connect from my KSQLDB cluster in OpenShift to the on-premise Kafka cluster.

I will also describe my final way of connecting to the kerberized Kafka cluster here, as I struggled a lot to get it running:

  1. Get a Kerberos ticket and establish the connection via SSL

ksql-server.properties (the sasl_ssl part of it):

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI

ssl.truststore.location=truststore.jks
ssl.truststore.password=password
ssl.truststore.type=JKS

ssl.ca.location=cert

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka" principal="myprincipal";

producer.ssl.endpoint.identification.algorithm=HTTPS
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=truststore.jks
producer.ssl.truststore.password=password
producer.sasl.mechanism=GSSAPI
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka"  principal="myprincipal";

consumer.ssl.endpoint.identification.algorithm=HTTPS
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=password
consumer.sasl.mechanism=GSSAPI
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka" principal="myprincipal";
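
Before starting the server, it is worth verifying that the keytab can actually obtain a ticket for the configured principal (my.keytab and myprincipal below are the placeholders from the config above):

# Obtain and list a Kerberos ticket using the same keytab and principal as the JAAS config
kinit -kt my.keytab myprincipal
klist

The JVM additionally needs a krb5.conf pointing at the right KDC, by default read from /etc/krb5.conf inside the container.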

  2. Set up the Sentry rules accordingly

HOST=[HOST] -> CLUSTER=kafka-cluster -> action=idempotentwrite

HOST=[HOST] -> TRANSACTIONALID=[ID] -> action=describe

HOST=[HOST] -> TRANSACTIONALID=[ID] -> action=write
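
With the GSSAPI connection and the Sentry rules in place, an end-to-end check can be done against the ksqlDB REST API, for example by listing the topics the server can see. This is a sketch; the host and port follow the listeners setting shown earlier:

# Lists the Kafka topics visible to the ksqlDB server; fails if the broker connection or the authorization rules are wrong
curl -s -X POST http://localhost:8082/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "LIST TOPICS;", "streamsProperties": {}}'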
