How to process a range of HBase rows using Spark?


Problem description



I am trying to use HBase as a data source for Spark, so the first step is to create an RDD from an HBase table. Since Spark works with Hadoop input formats, I was able to find a way to read all rows into an RDD as shown at http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase. But how do we create an RDD for a range scan?

All suggestions are welcome.

Solution

Here is an example of using Scan in Spark:

import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

// Serialize the Scan into the Base64 string that TableInputFormat expects
// in the job configuration. Scan.write exists in HBase 0.94.x and earlier;
// on HBase 0.96+ use TableMapReduceUtil.convertScanToString instead.
def convertScanToString(scan: Scan): String = {
  val out: ByteArrayOutputStream = new ByteArrayOutputStream
  val dos: DataOutputStream = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)        // number of rows fetched per RPC
scan.setCacheBlocks(false)  // avoid polluting the block cache on a full scan
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.count
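
The scan above reads the whole table. For the range scan the question asks about, the same mechanism applies: bound the Scan with start and stop row keys before serializing it. A minimal sketch, reusing the conf and convertScanToString defined above; the row keys "row-000" and "row-500" are placeholders:

import org.apache.hadoop.hbase.util.Bytes

val rangeScan = new Scan()
rangeScan.setStartRow(Bytes.toBytes("row-000")) // inclusive start key (placeholder)
rangeScan.setStopRow(Bytes.toBytes("row-500"))  // exclusive stop key (placeholder)
rangeScan.setCaching(500)
rangeScan.setCacheBlocks(false)
conf.set(TableInputFormat.SCAN, convertScanToString(rangeScan))
val rangeRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rangeRdd.count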

You need to add the relevant HBase libraries to the Spark classpath and make sure they are compatible with your Spark version. Tip: you can use hbase classpath to find them.
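
For example, one way to launch an interactive shell with those jars visible (a sketch, assuming the hbase command is on your PATH; the right flag varies across Spark versions and deploy modes):

spark-shell --driver-class-path "$(hbase classpath)"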
