Accessing a kerberized remote HBASE cluster from Spark


Question


I'm attempting to read data from a kerberized HBASE instance from Spark using the Hortonworks Spark-on-HBase connector (SHC). My cluster configuration essentially looks like this: I submit my Spark jobs from a client machine to a remote Spark standalone cluster, and the job attempts to read data from a separate HBASE cluster.
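
For concreteness, the failing read path through SHC looks roughly like the sketch below. This is a minimal Java equivalent of the PySpark job (the stack trace goes through py4j), assuming SHC's "catalog" option key; the table name and catalog JSON are placeholders, not details from the actual job.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    public class HBaseReadSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("shc-read-sketch");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            SQLContext sqlContext = new SQLContext(jsc);

            // SHC "catalog": a JSON document mapping an HBase table and its
            // column families/qualifiers onto a DataFrame schema.
            String catalog = "{"
                + "\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"},"
                + "\"rowkey\":\"key\","
                + "\"columns\":{"
                + "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},"
                + "\"val\":{\"cf\":\"cf1\", \"col\":\"val\", \"type\":\"string\"}"
                + "}}";

            DataFrame df = sqlContext.read()
                .option("catalog", catalog)
                .format("org.apache.spark.sql.execution.datasources.hbase")
                .load();

            // load() is lazy; this action triggers the HBase scan on the
            // executors, which is where the UserProvider NPE surfaces.
            df.show();
        }
    }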


If I bypass the standalone cluster by running Spark with master=local[*] directly on my client, I can access the remote HBASE cluster without a problem, as long as I first kinit from the client. However, when I set my master to the remote cluster with all other configs the same, I receive a NullPointerException at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43) (full stack trace below).


Has anyone built a similar architecture who could perhaps lend some insight? Despite the logs not saying anything about an authentication issue, I have a hunch that I may be running into an authentication problem when accessing HBASE from the non-kerberized Spark cluster.

Full stack trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0: java.lang.NullPointerException
        at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.init(HBaseResources.scala:125)
        at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.liftedTree1$1(HBaseResources.scala:57)
        at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.acquire(HBaseResources.scala:54)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.acquire(HBaseResources.scala:120)
        at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.releaseOnException(HBaseResources.scala:74)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.releaseOnException(HBaseResources.scala:120)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.getScanner(HBaseResources.scala:144)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:267)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:266)
        at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
        at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
        at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.init(HBaseResources.scala:125)
        at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.liftedTree1$1(HBaseResources.scala:57)
        at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.acquire(HBaseResources.scala:54)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.acquire(HBaseResources.scala:120)
        at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.releaseOnException(HBaseResources.scala:74)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.releaseOnException(HBaseResources.scala:120)
        at org.apache.spark.sql.execution.datasources.hbase.TableResource.getScanner(HBaseResources.scala:144)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:267)
        at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:266)
        at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
        at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

Answer


I stumbled on that symptom (but the root cause may not be the same) and found a very dirty workaround that you may not want to try.


$$ Context $$    Cloudera distro, HBase 1.2.0-CDH5.7.0


$$ Issue #1 $$    Some code clean-ups in the Apache / HortonWorks distros have not been ported to the Cloudera distro, e.g.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Scan.setCaching(I)Lorg/apache/hadoop/hbase/client/Scan;
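
The incompatibility is binary, not source-level: SHC's bytecode was compiled against the fluent HBase 1.1.2 client API, and the JVM links methods by their exact descriptor, including the return type. A hedged illustration follows; the CDH-side signature is inferred from the NoSuchMethodError above, not taken from the CDH source.

    import org.apache.hadoop.hbase.client.Scan;

    public class ScanCachingCheck {
        public static void main(String[] args) {
            // Against HBase 1.1.2, setCaching is a fluent setter:
            //     public Scan setCaching(int caching)
            // If the JAR on the runtime classpath instead exposes the older
            //     public void setCaching(int caching)
            // then the compiled-in descriptor
            //     Scan.setCaching(I)Lorg/apache/hadoop/hbase/client/Scan;
            // cannot be resolved, and the JVM throws NoSuchMethodError even
            // though a method with the same name and argument exists.
            Scan scan = new Scan().setCaching(1000);  // chaining requires the fluent API
            System.out.println("caching = " + scan.getCaching());
        }
    }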

$$ Workaround #1 $$

  • Download the HBase client JARs from the Horton repo -- specifically "client", "common" and "protocol" -- for version 1.1.2 (that's the dependency shown in the POM for the Spark-HBase module).
  • Add these JARs (and directory /etc/hbase/conf/) to spark.driver.extraClassPath, along with the Spark-HBase JAR.
  • Ship these JARs to the executors via the command-line option --jars, along with the Spark-HBase JAR
    (and don't forget directory /etc/hbase/conf/ in spark.executor.extraClassPath if the conf is present on all YARN nodes; otherwise find a way to ship the XML files to a directory in the containers' CLASSPATH)


$$ Issue #2 $$    Somehow, in YARN mode, the Spark executors do not correctly generate the HBase configuration that is passed to the methods org.apache.hadoop.hbase.security.UserProvider.instantiate(Configuration) and org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(Configuration, boolean, ExecutorService, User), hence
java.lang.NullPointerException

$$ Workaround #2 $$

  • Download the HBase source code from GitHub, branch 1.1, for these two classes
  • Patch the code to make sure that whenever the conf argument is NULL, it is silently replaced with a call to org.apache.hadoop.hbase.HBaseConfiguration.create() (roughly as sketched below)
  • Compile both classes, and replace the original .class files in the appropriate JARs with your patched versions
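
The guard itself is a one-liner; a minimal sketch of the idea follows. NullConfGuard is a name used here for illustration only -- in the actual workaround the null check is pasted directly at the top of UserProvider.instantiate(Configuration) and ConnectionFactory.createConnection(...), not shipped as a separate class.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public final class NullConfGuard {
        private NullConfGuard() {}

        // If an executor hands the HBase client a null Configuration, silently
        // fall back to one built from the hbase-site.xml visible on the local
        // classpath instead of letting the client throw a NullPointerException.
        public static Configuration orDefault(Configuration conf) {
            return (conf != null) ? conf : HBaseConfiguration.create();
        }
    }

Each patched method then starts with the equivalent of conf = NullConfGuard.orDefault(conf); before the original body runs. Note this only helps if a valid hbase-site.xml is actually on the executors' classpath -- which is exactly what Workaround #1 arranges.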

It would certainly make more sense to patch the Spark-HBase plug-in itself (cf. the comment from ray3888 in github.com/hortonworks-spark/shc/issues/17), but Scala makes me puke, so I stick to plain old Java.
