"Output directory not set" exception when saving an RDD to HBase with Spark

Problem description

I have a Spark job that reads data from HBase as an RDD, filters it, and then saves the result back to HBase as sample data, like this:

object FilterData {
  def main(args: Array[String]) {
    filterData()
  }

  def filterData() = {
    val sparkConf = new SparkConf().setAppName("filterData").setMaster("spark://spark:7077")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.16.1.10,172.16.1.11,172.16.1.12")
    conf.setInt("timeout", 120000)
    conf.set(TableInputFormat.INPUT_TABLE, "dataset")
    val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    val filtered = hbaseRDD.filter{
        case tuple => {
         val result = tuple._2
         val hostId = new String(result.getValue("user", "id"))
         hostId == "12345" // <-- only retrieve the row when user:id is 12345
        }
    }
    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE,  "sample_data")
    filtered.saveAsNewAPIHadoopDataset(jobConfig)
  }
}

I have already created the target table in HBase with the hbase shell and specified its column family.

But when I run this code, it throws the following exception:

Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:138)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1088)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1074)
    at com.xinmeiti.spark.weibo.FilterData$.filterData(FilterData.scala:45)
    at com.xinmeiti.spark.weibo.FilterData$.main(FilterData.scala:20)
    at com.xinmeiti.spark.weibo.FilterData.main(FilterData.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What is going wrong?

Recommended answer

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

object FilterData {
  def main(args: Array[String]) {
    filterData()
  }

  def filterData() = {
    val sparkConf = new SparkConf().setAppName("filterData").setMaster("spark://spark:7077")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "172.16.1.10,172.16.1.11,172.16.1.12")
    conf.setInt("timeout", 120000)
    conf.set(TableInputFormat.INPUT_TABLE, "dataset")
    val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    val filtered = hbaseRDD.filter {
      case tuple => {
        val result = tuple._2
        val hostId = new String(result.getValue("user".getBytes, "id".getBytes))
        hostId == "12345" // <-- only retrieve the row when user:id is 12345
      }
    }
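    // Use the new-API Job (org.apache.hadoop.mapreduce) instead of the old-API JobConf:
    // saveAsNewAPIHadoopDataset only looks at the new-API output format setting and
    // falls back to a file-based output format when that setting is missing.
    val jobConfig: Job = Job.getInstance(conf)
    jobConfig.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    jobConfig.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "sample_data")
    filtered.saveAsNewAPIHadoopDataset(jobConfig.getConfiguration())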
  }
}
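Why this helps, as far as the stack trace shows: the question configures the output format through the old-API JobConf.setOutputFormat, but saveAsNewAPIHadoopDataset reads the new-API setting. With that setting missing, Hadoop falls back to a file-based output format, and its checkOutputSpecs fails with "Output directory not set". Configuring the output format through org.apache.hadoop.mapreduce.Job, as in the code above, avoids this.

One further point that the answer does not cover: the new-API TableOutputFormat writes Put or Delete values, while the filtered RDD still holds Result values, so a conversion step is probably needed before saving. The following is a minimal sketch of that step, not part of the original answer. The helper name resultToPut is hypothetical, and the sketch assumes an HBase 1.x or later client (where Put.add accepts a Cell) and that the sample_data table has the same column family as the source table.

```scala
import org.apache.hadoop.hbase.client.{Put, Result}

// Hypothetical helper (an assumption, not from the original answer):
// copy every cell of a scanned row into a Put for the same row key,
// so that the row can be written by TableOutputFormat.
def resultToPut(result: Result): Put = {
  val put = new Put(result.getRow)
  // rawCells() returns all cells of the row; each keeps its family,
  // qualifier, timestamp, and value when added to the Put.
  result.rawCells().foreach(cell => put.add(cell))
  put
}
```

If resultToPut is defined inside object FilterData, the last line of filterData could then become filtered.mapValues(resultToPut).saveAsNewAPIHadoopDataset(jobConfig.getConfiguration()), again under the assumptions stated above.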
