Issue: Scala code in Spark shell to retrieve data from Hbase


Problem Description



We are trying to execute a simple piece of Scala code in the Spark shell to retrieve data from HBase. The Hadoop environment is Kerberos-enabled, and we have made sure to run kinit.

Steps to invoke Spark Shell:

MASTER=yarn-client

DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"

spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER

Code:

import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))

hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))

hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.count

The following error is thrown:

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase$1.initialize(TableInputFormatBase.java:200)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
        ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
        ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
        at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
        ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
        at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
        at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
        at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
        at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
        ... 42 more

Please note:

  1. We are able to invoke the HBase shell from the same session and scan records from the same table
  2. We are able to execute a word count on an HDFS file from the same Spark shell session
  3. We are able to execute the above code in local mode
  4. We are able to perform other operations from the same spark-shell session, e.g.:
     a. val admin = new HBaseAdmin(hc)
     b. print(admin.isTableAvailable("poc-customers"))

Looking for help to resolve this issue.

Solution

When the Spark "driver" requests YARN to spawn its "executors" somewhere in the cluster, it uses its local Kerberos TGT -- the one you created with kinit -- to authenticate. Then YARN issues a global delegation token that is shared by all executors to access HDFS and YARN.

Alas, HBase does not support that delegation token. Each executor must re-authenticate to ZK, then to the actual HBase RegionServer, with a local TGT.

In a perfect world, you would just need to insert two properties in "spark-defaults.conf", namely spark.yarn.principal and spark.yarn.keytab (creating a keytab to store your password is something you do with the "ktutil" utility).
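For reference, those two properties would go into spark-defaults.conf along the following lines; the principal name and keytab path shown here are placeholders for illustration, not values taken from this question:

# spark-defaults.conf -- hypothetical principal and keytab path
spark.yarn.principal   etl_user@EXAMPLE.COM
spark.yarn.keytab      /home/etl_user/etl_user.keytab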

Alas, that feature was built for long-running Streaming jobs that need to renew their HDFS delegation token (every 7 days, typically), not for HBase initial authentication. Now, the Release Notes for Spark 1.6 show a lot of bug fixes related to YARN and Kerberos, maybe the feature now works out-of-the-box for HBase also. But I wouldn't bet on it.

So what is the workaround?

  1. In the Java code run by the driver, declare that the keytab file must be shipped to each executor with an addFile()
  2. In the Java code run by the executors, explicitly create a Hadoop UserGroupInformation that obtains its own Kerberos TGT from that keytab, before connecting to HBase (a sketch combining both steps follows the note below)

Note that when used that way, the UGI keeps its TGT private -- it does not show up in the ticket cache, so other processes on the same machine cannot reuse it (and, on the other hand, a kinit from another process will not tamper with it).
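A minimal sketch of those two steps in Spark-shell Scala might look like the following. The principal name, keytab file name and path, and the table layout are assumptions for illustration; sc.addFile, SparkFiles.get and UserGroupInformation.loginUserFromKeytabAndReturnUGI are the standard Spark/Hadoop APIs, and ConnectionFactory is the HBase 1.x client API. To keep it short, the example does one full-table scan inside a single task rather than the parallel TableInputFormat read from the question, and it assumes hbase-site.xml is also available on the executor classpath.

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkFiles

// Driver side: ship the keytab to every executor (path is a placeholder)
sc.addFile("/home/etl_user/etl_user.keytab")

// Executor side: log in from the shipped keytab before touching HBase
val rowCount = sc.parallelize(Seq(1), 1).map { _ =>
  // Principal is a placeholder -- use the one that matches your keytab
  val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
    "etl_user@EXAMPLE.COM", SparkFiles.get("etl_user.keytab"))

  ugi.doAs(new PrivilegedExceptionAction[Long] {
    override def run(): Long = {
      val conf = HBaseConfiguration.create()   // picks up hbase-site.xml from the executor classpath
      val conn = ConnectionFactory.createConnection(conf)
      try {
        val table   = conn.getTable(TableName.valueOf("poc-customers"))
        val scanner = table.getScanner(new Scan())
        var n = 0L
        val it = scanner.iterator()
        while (it.hasNext) { it.next(); n += 1 }   // count the rows returned by the scan
        scanner.close(); table.close()
        n
      } finally conn.close()
    }
  })
}.collect().sum

In the spark-shell, the imports and addFile run on the driver; everything inside the map closure runs on an executor, which is why the keytab login has to happen there rather than once on the driver.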
