带有 kerberos 的 Kafka Java Producer [英] Kafka Java Producer with kerberos

查看:27
本文介绍了带有 kerberos 的 Kafka Java Producer的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在 kerberosed 环境中向 kafka 主题发送消息时出错.我们在 hdp 2.3 上有集群

Getting error while sending message to kafka topic in kerberosed enviornment. We have cluster on hdp 2.3

我关注了这个http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

但是对于发送消息,我必须先明确地执行 kinit,然后才能向 kafka 主题发送消息.我试图通过 java 类来编织,但这也不起作用.PFB 代码:

But for sending messages, I have to do kinit explicitly first, then only I am able to send message to kafka topic. I tried to do knit through java class but that also doesn't work. PFB code:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        if (!authStatus) {
            System.out.println("Authntication fails, try something else  "  + authStatus);
        } else {
            System.out.println("Authntication successfull " + authStatus);
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();

            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);

            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");

            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");


            System.out.println("After config prop -1");

            ProducerConfig config = new ProducerConfig(props);

            System.out.println("After config prop -2 config" + config);

            Producer<String, String> producer = new Producer<String, String>(config);

            System.out.println("After config prop -3");

            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                String ip = "192.168.2" + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);

                System.out.println("After config prop -1 data" + data);

                producer.send(data);
            }
            producer.close();

        } catch (Throwable th) {
            th.printStackTrace();

        }
    }
}

Pom.xml :从 hortonworks 存储库下载的所有依赖项.

Pom.xml : All dependency downloaded from hortonworks repo.

        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.jasypt</groupId>
                <artifactId>jasypt-spring31</artifactId>
                <version>1.9.2</version>
                <scope>compile</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.1.2.3.4.0-3485</version>
            </dependency>

        </dependencies>

错误:案例1:当我指定myuser kafka_jass.conf

Error : Case1 : when I specify myuser kafka_jass.conf

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
        at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
        at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)

MyUser_Kafka_jass.conf

MyUser_Kafka_jass.conf

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   doNotPrompt=true
   useTicketCache=true
   renewTicket=true
   principal="ctadmin/prod-dev1-dn1@PROD.COM";
   useKeyTab=true
   serviceName="kafka"
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   client=true;
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   storeKey=true
   useTicketCache=true
   serviceName="zookeeper"
   principal="ctadmin/prod-dev1-dn1@PROD.COM";
};

case2 : 当我指定 Kafkas 自己的 jaas 文件时

case2 : When I specify Kafkas own jaas file

Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)

这工作正常,如果我在运行此应用程序之前执行 kinit,否则它将通过上述错误.我无法在我的生产环境中执行此操作,如果我们的应用程序本身有任何方法可以执行此操作,请帮助我.如果您需要更多详细信息,请告诉我.

This works fine, if I do kinit before running this app, else it will through above error. I cant do this in my production environment, if there is any way to do this by our app itself then please help me out. Please let me know if you need any more details.

谢谢:)

推荐答案

我第一次不知道犯了什么错误,下面的事情我又做了一次,它工作正常.

I don't know what mistake did first time, below things I did again, and it works fine.

首先授予对主题的所有访问权限:

First give all access to topic:

bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181

创建jas文件:kafka-jaas.conf

create jass file: kafka-jaas.conf

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="ctadmin@HSCALE.COM"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/keytabs/ctadmin.keytab"
 client=true;
};

Java 程序:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) {
        String topic = args[0];

        Properties props = new Properties();
        props.put("metadata.broker.list", "{Hostname}:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (int i = 0; i < 10; i++){
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
    }
}

运行应用程序:

java -Djava.security.auth.login.config=/home/ctadmin/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true-cp kafka-testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer

java -Djava.security.auth.login.config=/home/ctadmin/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer

这篇关于带有 kerberos 的 Kafka Java Producer的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆