How to use Spark Streaming with Kafka with Kerberos?


Question

I have run into some issues while trying to consume messages from Kafka with a Spark Streaming application in a Kerberized Hadoop cluster. I tried both of the approaches listed here:

  • receiver-based approach: KafkaUtils.createStream
  • direct approach (no receivers): KafkaUtils.createDirectStream

The receiver-based approach (KafkaUtils.createStream) throws two types of exceptions, depending on whether I run in local mode (--master local[*]) or in YARN mode (--master yarn --deploy-mode client):

  • a strange kafka.common.BrokerEndPointNotAvailableException in the Spark local application
  • Zookeeper timeouts in the Spark on YARN application. I once managed to make this work (the connection to Zookeeper succeeded), but no messages were received

In both modes (local and YARN), the direct approach (KafkaUtils.createDirectStream) returns an unexplained EOFException (see details below).

My final goal is to launch a Spark Streaming job on YARN, so I will leave the Spark local job aside.

Here is my testing environment:

  • Cloudera CDH 5.7.0
  • Spark 1.6.0
  • Kafka 0.10.1.0

I'm working on a single-node cluster (hostname = quickstart.cloudera) for testing purposes. For anyone interested in reproducing the tests, I'm working on a custom Docker container based on cloudera/quickstart (Git repo).

Below is the sample code I used in a spark-shell. Of course, this code works when Kerberos is not enabled: messages produced by kafka-console-producer are received by the Spark application.

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

val ssc = new StreamingContext(sc, Seconds(5))

val topics = Map("test-kafka" -> 1) // topic name -> number of consumer threads (used by the receiver-based API)

// Receiver-based approach: the old high-level consumer, which connects through Zookeeper
def readFromKafkaReceiver(): Unit = {
    val kafkaParams = Map(
        "zookeeper.connect" -> "quickstart.cloudera:2181",
        "group.id" -> "gid1",
        "client.id" -> "cid1",
        "zookeeper.session.timeout.ms" -> "5000",
        "zookeeper.connection.timeout.ms" -> "5000"
    )

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
    stream.print
}

// Direct approach (no receiver): queries the brokers directly via the old SimpleConsumer API
def readFromKafkaDirectStream(): Unit = {
    val kafkaDirectParams = Map(
        "bootstrap.servers" -> "quickstart.cloudera:9092",
        "group.id" -> "gid1",
        "client.id" -> "cid1"
    )

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topics.map(_._1).toSet)
    directStream.print
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()

ssc.start

Thread.sleep(20000)

ssc.stop(stopSparkContext = false, stopGracefully = true)

With Kerberos enabled, this code does not work. I followed this guide: Configuring Kafka Security, and created two configuration files:

jaas.conf:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/home/simpleuser/simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
};

client.properties:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
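Before going further, it can be worth checking that the keytab actually contains the expected principal (a quick sanity check with MIT Kerberos tooling, not part of the original setup):

klist -kt /home/simpleuser/simpleuser.keytab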

I can produce messages with:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer \
    --broker-list quickstart.cloudera:9092 \
    --topic test-kafka \
    --producer.config client.properties
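A quick way to rule out a broken Kerberos setup outside of Spark is to run the console consumer with the same client configuration (a sanity-check sketch assuming Kafka 0.10's new consumer; not part of the original test):

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-consumer \
    --new-consumer \
    --bootstrap-server quickstart.cloudera:9092 \
    --topic test-kafka \
    --consumer.config client.properties \
    --from-beginning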

But I can't consume those messages from a Spark Streaming application. To launch spark-shell in yarn-client mode, I created a new JAAS configuration (jaas_with_zk_yarn.conf) with a Zookeeper section (Client), and with the keytab referenced by file name only (the keytab itself is passed through the --keytab option):

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
};

This new file is passed via the --files option:

spark-shell --master yarn --deploy-mode client \
    --num-executors 2 \
    --files /home/simpleuser/jaas_with_zk_yarn.conf \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --keytab /home/simpleuser/simpleuser.keytab \
    --principal simpleuser

I used the same code as previously, except that I added two more Kafka parameters, corresponding to the contents of the client.properties file:

"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"

readFromKafkaReceiver() throws the following error once the Spark Streaming context is started (it cannot connect to Zookeeper):

ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
        at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
        at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Sometimes the connection to ZK is established (no timeout reached), but then no messages are received.

readFromKafkaDirectStream() throws the following error as soon as the method is called:

org.apache.spark.SparkException: java.io.EOFException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)

There is no further explanation, just an EOFException. I presume there is a communication problem between Spark and the Kafka broker, but no further details are given. I also tried metadata.broker.list instead of bootstrap.servers, without success.

Maybe I'm missing something in the JAAS config files, or in the Kafka parameters? Maybe the Spark options (extraJavaOptions) are invalid? I have tried so many possibilities that I'm a little lost.

I'd be glad if someone could help me fix at least one of these problems (direct approach or receiver-based). Thanks :)

Answer

It is not supported with Spark 1.6, as stated in the Cloudera docs:

Spark Streaming cannot consume from secure Kafka till it starts using Kafka 0.9 Consumer API

https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark_streaming_consumer_api

Spark Streaming in 1.6 uses the old consumer API, which does not support secure consuming.

You can use Spark 2.1, which supports secure Kafka: https://blog.cloudera.com/blog/2017/05/reading-data-securely-from-apache-kafka-to-apache-spark/
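For reference, here is a minimal sketch of what the consumer side could look like with the spark-streaming-kafka-0-10 integration in Spark 2.1 (the new consumer API; broker, topic, and JAAS setup assumed identical to the question above):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// With the new consumer API (Kafka 0.9+), security is configured directly
// in the consumer properties: no Zookeeper connection is needed from Spark.
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "quickstart.cloudera:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "gid1",
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.kerberos.service.name" -> "kafka"
)

val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Set("test-kafka"), kafkaParams)
)
stream.map(_.value).print()

The JAAS file is still shipped the same way as in the question (--files plus the spark.driver/executor.extraJavaOptions pointing at it).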
