How to fetch all of the data from an HBase table in Spark


Problem description


I have a big table in HBase named UserAction, and it has three column families (song, album, singer). I need to fetch all of the data from the 'song' column family as a JavaRDD object. I tried this code, but it's not efficient. Is there a better way to do this?

static SparkConf sparkConf = new SparkConf().setAppName("test").setMaster(
        "local[4]");
static JavaSparkContext jsc = new JavaSparkContext(sparkConf);

static void getRatings() {

    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "UserAction");
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song");

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
            .newAPIHadoopRDD(
                    conf,
                    TableInputFormat.class,
                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
                    org.apache.hadoop.hbase.client.Result.class);

    JavaRDD<Rating> count = hBaseRDD
            .map(new Function<Tuple2<ImmutableBytesWritable, Result>, JavaRDD<Rating>>() {

                @Override
                public JavaRDD<Rating> call(
                        Tuple2<ImmutableBytesWritable, Result> t)
                        throws Exception {
                    Result r = t._2;
                    int user = Integer.parseInt(Bytes.toString(r.getRow()));
                    ArrayList<Rating> ra = new ArrayList<>();

                    for (Cell c : r.rawCells()) {

                        int product = Integer.parseInt(Bytes
                                .toString(CellUtil.cloneQualifier(c)));
                        double rating = Double.parseDouble(Bytes
                                .toString(CellUtil.cloneValue(c)));

                        ra.add(new Rating(user, product, rating));
                    }

                    return jsc.parallelize(ra);
                }
            })
            .reduce(new Function2<JavaRDD<Rating>, JavaRDD<Rating>, JavaRDD<Rating>>() {
                @Override
                public JavaRDD<Rating> call(JavaRDD<Rating> r1,
                        JavaRDD<Rating> r2) throws Exception {
                    return r1.union(r2);
                }
            });
    jsc.stop();
}

The song column family schema design is:

RowKey = userID, columnQualifier = songID and value = rating.
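
To make that schema concrete, here is a hedged, illustrative sketch (not taken from the question) of how one such rating cell could be written with the older HBase client Put.add API; the table and column-family names come from the question, while the user/song/rating values and the table handle are made-up placeholders:

// Illustrative only: user 42 rated song 1001 with 4.5.
// The value is stored as a string, which is why the question's code parses it
// with Bytes.toString(...) before Double.parseDouble(...).
Put put = new Put(Bytes.toBytes("42"));        // RowKey = userID
put.add(Bytes.toBytes("song"),                 // column family
        Bytes.toBytes("1001"),                 // columnQualifier = songID
        Bytes.toBytes("4.5"));                 // value = rating
table.put(put);                                // table is an open HTable for UserAction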

Solution

UPDATE: OK, I see your problem now: for some crazy reason you're turning your arrays into RDDs with return jsc.parallelize(ra);. Why are you doing that? Why are you creating an RDD of RDDs? Why not leave them as arrays? When you do the reduce you can then concatenate the arrays. An RDD is a Resilient Distributed Dataset - it does not make logical sense to have a distributed dataset of distributed datasets. I'm surprised your job even runs and doesn't crash! Anyway, that's why your job is so slow.

Anyway, in Scala after your map, you would just do a flatMap(identity) and that would concatenate all your lists together.
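
Applied to the Java code in the question, that suggestion amounts to a single flatMap that emits Rating objects directly, with no nested RDDs and no reduce. The following is only a sketch of that idea, assuming the Spark 1.x Java API, where FlatMapFunction.call returns an Iterable (in Spark 2.x and later it returns an Iterator):

JavaRDD<Rating> ratings = hBaseRDD
        .flatMap(new FlatMapFunction<Tuple2<ImmutableBytesWritable, Result>, Rating>() {
            @Override
            public Iterable<Rating> call(
                    Tuple2<ImmutableBytesWritable, Result> t) throws Exception {
                Result r = t._2;
                int user = Integer.parseInt(Bytes.toString(r.getRow()));
                List<Rating> ra = new ArrayList<>();
                for (Cell c : r.rawCells()) {
                    int product = Integer.parseInt(
                            Bytes.toString(CellUtil.cloneQualifier(c)));
                    double rating = Double.parseDouble(
                            Bytes.toString(CellUtil.cloneValue(c)));
                    ra.add(new Rating(user, product, rating));
                }
                return ra; // a plain list per row; Spark flattens these itself
            }
        });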

I don't really understand why you need to do a reduce; maybe that is where you have something inefficient going on. Here is my code to read HBase tables (it's generalized, i.e. it works for any schema). One thing to make sure of when you read the HBase table is that the number of partitions is suitable (usually you want a lot).

import scala.collection.JavaConverters._   // for the .asScala conversions below
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD

type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// Map(CF -> Map(column qualifier -> Map(timestamp -> value)))
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

// sc is assumed to be an existing SparkContext
def readTableAll(table: String): RDD[(Array[Byte], CFTimeseriesRow)] = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, table)
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
}
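
Picking up the partition advice from above, here is a hedged Java-side sketch, reusing the question's hBaseRDD variable; the threshold of 64 partitions is an arbitrary illustrative number, not a value from the answer:

// The HBase input format typically yields roughly one partition per region,
// which can be far too few; repartition before heavy downstream work if so.
JavaPairRDD<ImmutableBytesWritable, Result> balanced =
        hBaseRDD.partitions().size() < 64 ? hBaseRDD.repartition(64) : hBaseRDD;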

As you can see, I have no need for a reduce in my Scala code above. The methods are pretty self-explanatory. I could dig further into your code, but I lack the patience to read Java as it's so epically verbose.

I have some more code specifically for fetching the most recent elements from the row (rather than the entire history). Let me know if you want to see that.

Finally, I recommend you look into using Cassandra over HBase, as datastax is partnering with databricks.
