Flink SQL客户端连接到安全的Kafka群集 [英] Flink SQL Client connect to secured kafka cluster
本文介绍了Flink SQL客户端连接到安全的Kafka群集的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我想对安全Kafka集群的Kafka主题支持的Flink SQL表执行查询。我能够以编程方式执行查询,但无法通过Flink SQL Client执行相同的操作。我不确定如何通过Flink SQL客户端传递JAAS配置(java.security.auth.login.config
)和其他系统属性。
以编程方式刷新SQL查询
private static void simpleExec_auth() {
// Create the execution environment.
final EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.withBuiltInCatalogName(
"default_catalog")
.withBuiltInDatabaseName(
"default_database")
.build();
System.setProperty("java.security.auth.login.config","client_jaas.conf");
System.setProperty("sun.security.jgss.native", "true");
System.setProperty("sun.security.jgss.lib", "/usr/libexec/libgsswrap.so");
System.setProperty("javax.security.auth.useSubjectCredsOnly","false");
TableEnvironment tableEnvironment = TableEnvironment.create(settings);
String createQuery = "CREATE TABLE test_flink11 ( " + "`keyid` STRING, " + "`id` STRING, "
+ "`name` STRING, " + "`age` INT, " + "`color` STRING, " + "`rowtime` TIMESTAMP(3) METADATA FROM 'timestamp', " + "`proctime` AS PROCTIME(), " + "`address` STRING) " + "WITH ( "
+ "'connector' = 'kafka', "
+ "'topic' = 'test_flink10', "
+ "'scan.startup.mode' = 'latest-offset', "
+ "'properties.bootstrap.servers' = 'kafka01.nyc.com:9092', "
+ "'value.format' = 'avro-confluent', "
+ "'key.format' = 'avro-confluent', "
+ "'key.fields' = 'keyid', "
+ "'value.fields-include' = 'EXCEPT_KEY', "
+ "'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.sasl.kerberos.kinit.cmd' = '/usr/local/bin/skinit --quiet', 'properties.sasl.mechanism' = 'GSSAPI', "
+ "'key.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', "
+ "'key.avro-confluent.schema-registry.subject' = 'test_flink6', "
+ "'value.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', "
+ "'value.avro-confluent.schema-registry.subject' = 'test_flink4')";
System.out.println(createQuery);
tableEnvironment.executeSql(createQuery);
TableResult result = tableEnvironment
.executeSql("SELECT name,rowtime FROM test_flink11");
result.print();
}
这工作正常。
通过SQL客户端触发SQL查询
运行此程序时出现以下错误。
Flink SQL> CREATE TABLE test_flink11 (`keyid` STRING,`id` STRING,`name` STRING,`address` STRING,`age` INT,`color` STRING) WITH('connector' = 'kafka', 'topic' = 'test_flink10','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'kafka01.nyc.com:9092','value.format' = 'avro-confluent','key.format' = 'avro-confluent','key.fields' = 'keyid', 'value.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', 'value.avro-confluent.schema-registry.subject' = 'test_flink4', 'value.fields-include' = 'EXCEPT_KEY', 'key.avro-confluent.schema-registry.url' = 'http://kafka-schema-registry:5037', 'key.avro-confluent.schema-registry.subject' = 'test_flink6', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.sasl.kerberos.kinit.cmd' = '/usr/local/bin/skinit --quiet', 'properties.sasl.mechanism' = 'GSSAPI');
Flink SQL> select * from test_flink11;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /tmp/jaas-6309821891889949793.conf
/tmp/jaas-6309821891889949793.conf
中除以下注释外没有其他内容
# We are using this file as an workaround for the Kafka and ZK SASL implementation
# since they explicitly look for java.security.auth.login.config property
# Please do not edit/delete this file - See FLINK-3929
SQL客户端运行命令
bin/sql-client.sh embedded --jar flink-sql-connector-kafka_2.11-1.12.0.jar --jar flink-sql-avro-confluent-registry-1.12.0.jar
闪烁群集命令
bin/start-cluster.sh
如何为SQL客户端传递此java.security.auth.login.config
和其他系统属性(我在上面的Java代码片段中设置)?
推荐答案
flink-conf.yaml
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.principal: XXXXX@HADOOP.COM
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/kafka.keytab
security.kerberos.login.principal: XXXX@HADOOP.COM
security.kerberos.login.contexts: Client,KafkaClient
我还没有真正测试这个解决方案是否可行,您可以试一下,希望它能对您有所帮助。
这篇关于Flink SQL客户端连接到安全的Kafka群集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文