How to use Spark Streaming with Kafka with Kerberos?


Question

I have run into some issues while trying to consume messages from Kafka with a Spark Streaming application in a Kerberized Hadoop cluster. I tried both of the approaches listed here:

  • The receiver-based approach: KafkaUtils.createStream
  • The direct approach (no receivers): KafkaUtils.createDirectStream

The receiver-based approach (KafkaUtils.createStream) throws two types of exceptions, depending on whether I am in local mode (--master local[*]) or in YARN mode (--master yarn --deploy-mode client):

  • a weird kafka.common.BrokerEndPointNotAvailableException in a Spark local application
  • a Zookeeper timeout in a Spark on YARN application. I once managed to make this work (connecting to Zookeeper successfully), but no messages were received

In both modes (local or YARN), the direct approach (KafkaUtils.createDirectStream) returns an unexplained EOFException (see details below).

My final goal is to launch a Spark Streaming job on YARN, so I will leave the Spark local job aside.

Here is my testing environment:

  • Cloudera CDH 5.7.0
  • Spark 1.6.0
  • Kafka 0.10.1.0

I'm working on a single-node cluster (hostname = quickstart.cloudera) for testing purposes. For those interested in reproducing the tests, I'm working on a custom Docker container based on cloudera/quickstart (Git repo).

Below is the sample code I used in a spark-shell. Of course this code works when Kerberos is not enabled: messages produced by kafka-console-producer are received by the Spark application.

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import kafka.serializer.StringDecoder

val ssc = new StreamingContext(sc, Seconds(5))

// Topic name -> number of consumer threads (the count is only used by the receiver-based approach)
val topics = Map("test-kafka" -> 1)

// Receiver-based approach: the old high-level consumer, which connects through Zookeeper
def readFromKafkaReceiver(): Unit = {
    val kafkaParams = Map(
        "zookeeper.connect" -> "quickstart.cloudera:2181",
        "group.id" -> "gid1",
        "client.id" -> "cid1",
        "zookeeper.session.timeout.ms" -> "5000",
        "zookeeper.connection.timeout.ms" -> "5000"
    )

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY_2)
    stream.print
}

// Direct approach (no receiver): queries the brokers directly for offsets and messages
def readFromKafkaDirectStream(): Unit = {
    val kafkaDirectParams = Map(
        "bootstrap.servers" -> "quickstart.cloudera:9092",
        "group.id" -> "gid1",
        "client.id" -> "cid1"
    )

    val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaDirectParams, topics.map(_._1).toSet)
    directStream.print
}

readFromKafkaReceiver() // or readFromKafkaDirectStream()

ssc.start

Thread.sleep(20000)

ssc.stop(stopSparkContext = false, stopGracefully = true)

With Kerberos enabled, this code does not work. I followed this guide: Configuring Kafka Security, and created two configuration files:

jaas.conf:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/home/simpleuser/simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
};

client.properties:

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

I can produce messages with:

export KAFKA_OPTS="-Djava.security.auth.login.config=/home/simpleuser/jaas.conf"
kafka-console-producer \
    --broker-list quickstart.cloudera:9092 \
    --topic test-kafka \
    --producer.config client.properties

But I can't consume those messages from a Spark Streaming application. To launch spark-shell in yarn-client mode, I just created a new JAAS configuration (jaas_with_zk_yarn.conf) with a Zookeeper section (Client), where the reference to the keytab is only the file name (the keytab itself is then passed through the --keytab option):

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="simpleuser.keytab"
    principal="simpleuser@CLOUDERA";
};

This new file is passed through the --files option:

spark-shell --master yarn --deploy-mode client \
    --num-executors 2 \
    --files /home/simpleuser/jaas_with_zk_yarn.conf \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas_with_zk_yarn.conf" \
    --keytab /home/simpleuser/simpleuser.keytab \
    --principal simpleuser

I used the same code as previously, except that I added two other Kafka parameters, corresponding to the contents of the consumer.properties file:

"security.protocol" -> "SASL_PLAINTEXT",
"sasl.kerberos.service.name" -> "kafka"

readFromKafkaReceiver() throws the following error once the Spark Streaming Context is started (it cannot connect to Zookeeper):

ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 5000
        at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
        at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
        at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
        at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
        at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:191)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:139)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:156)
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:109)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.SparkContext$$anonfun$38.apply(SparkContext.scala:2003)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Sometimes the connection to ZK is established (no timeout reached), but then no messages are received.

readFromKafkaDirectStream() throws the following error as soon as the method is called:

org.apache.spark.SparkException: java.io.EOFException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.readFromKafkaDirectStream(<console>:47)

There is no more explanation, just an EOFException. I presume there are communication problems between Spark and the Kafka broker, but no further explanation is given. I also tried metadata.broker.list instead of bootstrap.servers, but without success.

Maybe I'm missing something in the JAAS config files, or in the Kafka parameters? Maybe the Spark options (extraJavaOptions) are invalid? I have tried so many possibilities that I'm a little bit lost.

I'll be glad if someone could help me fix at least one of these problems (direct approach or receiver-based). Thanks :)

Answer

It is not supported in Spark 1.6, as stated in the Cloudera docs:

Spark Streaming cannot consume from secure Kafka till it starts using Kafka 0.9 Consumer API

https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_spark_ki.html#ki_spark_streaming_consumer_api

Spark Streaming in 1.6 uses the old consumer API, where secure consuming is not supported.

You can use Spark 2.1, which supports secure Kafka: https://blog.cloudera.com/blog/2017/05/reading-data-securely-from-apache-kafka-to-apache-spark/
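
For reference, here is a minimal sketch of what the same direct stream looks like with the newer spark-streaming-kafka-0-10 integration shipped with Spark 2.x. The broker, topic, group ID, and security settings are taken from the question above; the rest (kafka010 imports, deserializer entries, location/consumer strategies) is the standard new-consumer API, so treat this as an illustration rather than a drop-in fix:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// With the new consumer API, the security settings go straight into the consumer config
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "quickstart.cloudera:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "gid1",
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.kerberos.service.name" -> "kafka"
)

// Assumes an existing StreamingContext `ssc`, as in the question
val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Set("test-kafka"), kafkaParams)
)
stream.map(_.value).print()

The JAAS file and keytab still have to be shipped the same way as in the question (--files plus java.security.auth.login.config in the driver and executor extra Java options).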
