Issue: Scala code in Spark shell to retrieve data from HBase

Problem description

We are trying to execute a simple piece of Scala code in the Spark shell to retrieve data from HBase. The Hadoop environment is Kerberos-enabled and we have ensured to execute kinit.

Steps to invoke Spark Shell:

MASTER=yarn-client

DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"

spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER

Code:

import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))

hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))

hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.count

Below is the error message:

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase$1.initialize(TableInputFormatBase.java:200)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
        ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
        ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
        at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
        ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
        at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
        at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
        at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
        at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
        ... 42 more

Please note:

  1. We are able to invoke the HBase shell from the same session and scan records from the same table
  2. We are able to execute a word count on an HDFS file from the same Spark shell session
  3. We are able to execute the code above in local mode
  4. We are able to execute other actions from the same spark-shell session, such as:
     a. val admin = new HBaseAdmin(hc)
     b. print(admin.isTableAvailable("poc-customers"))

Looking for help to resolve this issue.

Recommended answer

When the Spark "driver" requests YARN to spawn its "executors" somewhere in the cluster, it uses its local Kerberos TGT -- the one you created with kinit -- to authenticate. Then YARN issues a global delegation token that is shared by all executors to access HDFS and YARN.

Alas, HBase does not support that delegation token. Each executor must re-authenticate to ZooKeeper, and then to the actual HBase RegionServer, with a local TGT.

In a perfect world, you would just need to insert two properties in spark-defaults.conf, namely spark.yarn.principal and spark.yarn.keytab (creating a keytab to store your password is something you do with the ktutil utility).
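
As a rough illustration only -- the principal name and keytab path below are hypothetical placeholders, not values from the question -- those two entries in spark-defaults.conf would look something like this:

spark.yarn.principal    etl_user@EXAMPLE.COM
spark.yarn.keytab       /home/etl_user/etl_user.keytab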

Alas, that feature was built for long-running Streaming jobs that need to renew their HDFS delegation token (every 7 days, typically), not for the initial HBase authentication. Now, the release notes for Spark 1.6 show a lot of bug fixes related to YARN and Kerberos; maybe the feature now works out of the box for HBase as well. But I wouldn't bet on it.

So what is the workaround?

  1. In the Java code run by the driver, state that the keytab file must be shipped to each executor with an addFile()
  2. In the Java code run by the executors, explicitly create a Hadoop UserGroupInformation that explicitly gets its own Kerberos TGT from the keytab before connecting to HBase (see the sketch below)
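
A minimal Scala sketch of those two steps, as they might be run from the same spark-shell session. The principal etl_user@EXAMPLE.COM, the keytab path, the sample row keys, and the use of the HBase 1.x ConnectionFactory API are illustrative assumptions, not code from the answer:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkFiles

// Step 1 (driver): ship the keytab to every executor's working directory.
sc.addFile("/home/etl_user/etl_user.keytab")

// Hypothetical RDD of row keys to look up; a real job would build its own.
val rowKeys = sc.parallelize(Seq("cust-001", "cust-002"))

rowKeys.foreachPartition { keys =>
  // Step 2 (executor): obtain a TGT from the shipped keytab before any HBase call.
  val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
    "etl_user@EXAMPLE.COM", SparkFiles.get("etl_user.keytab"))
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      val conf = HBaseConfiguration.create()   // reads hbase-site.xml from the executor classpath
      val connection = ConnectionFactory.createConnection(conf)
      val table = connection.getTable(TableName.valueOf("poc-customers"))
      keys.foreach { key =>
        val result = table.get(new Get(Bytes.toBytes(key)))
        println(s"$key found: ${!result.isEmpty}")
      }
      table.close()
      connection.close()
    }
  })
}

The login is placed inside foreachPartition so that it runs on each executor rather than on the driver; a real job would replace the sample Gets with its own scan or lookup logic.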

Note that when used that way, the UGI keeps its TGT private -- it does not show up in the ticket cache, so other processes on the same machine cannot reuse it (and, on the other hand, a kinit from another process will not tamper with it).
