How to fetch all of the data from an HBase table in Spark


Question

I have a big table in HBase named UserAction with three column families (song, album, singer). I need to fetch all of the data from the 'song' column family as a JavaRDD. I tried the code below, but it is not efficient. Is there a better way to do this?

static SparkConf sparkConf = new SparkConf().setAppName("test").setMaster(
        "local[4]");
static JavaSparkContext jsc = new JavaSparkContext(sparkConf);

static void getRatings() {

    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "UserAction");
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song");

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
            .newAPIHadoopRDD(
                    conf,
                    TableInputFormat.class,
                    org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
                    org.apache.hadoop.hbase.client.Result.class);

    JavaRDD<Rating> count = hBaseRDD
            .map(new Function<Tuple2<ImmutableBytesWritable, Result>, JavaRDD<Rating>>() {

                @Override
                public JavaRDD<Rating> call(
                        Tuple2<ImmutableBytesWritable, Result> t)
                        throws Exception {
                    Result r = t._2;
                    int user = Integer.parseInt(Bytes.toString(r.getRow()));
                    ArrayList<Rating> ra = new ArrayList<>();

                    for (Cell c : r.rawCells()) {

                        int product = Integer.parseInt(Bytes
                                .toString(CellUtil.cloneQualifier(c)));
                        double rating = Double.parseDouble(Bytes
                                .toString(CellUtil.cloneValue(c)));

                        ra.add(new Rating(user, product, rating));
                    }

                    return jsc.parallelize(ra);
                }
            })
            .reduce(new Function2<JavaRDD<Rating>, JavaRDD<Rating>, JavaRDD<Rating>>() {
                @Override
                public JavaRDD<Rating> call(JavaRDD<Rating> r1,
                        JavaRDD<Rating> r2) throws Exception {
                    return r1.union(r2);
                }
            });
    jsc.stop();
}

The schema of the 'song' column family is:

RowKey = userID, columnQualifier = songID and value = rating.

Answer

UPDATE: OK, I see your problem now: for some crazy reason you are turning your arrays into RDDs with return jsc.parallelize(ra);. Why are you doing that? Why are you creating an RDD of RDDs? Why not leave them as arrays? When you do the reduce you can then concatenate the arrays. An RDD is a Resilient Distributed Dataset - it does not make logical sense to have a Distributed Dataset of Distributed Datasets. I'm surprised your job even runs and doesn't crash! Anyway, that's why your job is so slow.

Anyway, in Scala, after your map you would just do a flatMap(identity), and that would concatenate all your lists together.
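
For example, a minimal Scala sketch of that approach (assuming the UserAction table, the 'song' column family, and the MLlib Rating class from the question) could look like this:

    import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.Rating
    import org.apache.spark.rdd.RDD

    def readRatings(sc: SparkContext): RDD[Rating] = {
      val conf = HBaseConfiguration.create()
      conf.set(TableInputFormat.INPUT_TABLE, "UserAction")
      conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song")

      sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
        // Each row becomes a plain Seq[Rating] - no nested RDDs.
        .map { case (_, result) =>
          val user = Bytes.toString(result.getRow).toInt
          result.rawCells().map { c =>
            Rating(user,
              Bytes.toString(CellUtil.cloneQualifier(c)).toInt,
              Bytes.toString(CellUtil.cloneValue(c)).toDouble)
          }.toSeq
        }
        // Flatten the per-row sequences into a single RDD[Rating].
        .flatMap(identity)
    }

This gives a single RDD[Rating] without collecting anything to the driver and without calling parallelize inside a transformation.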

I don't really understand why you need to do a reduce at all; maybe that is where the inefficiency comes from. Here is my code to read HBase tables (it's generalized - i.e. it works for any schema). One thing to be sure of when you read an HBase table is that the number of partitions is suitable (usually you want a lot).

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD

// `sc` is the surrounding SparkContext.
type HBaseRow = java.util.NavigableMap[Array[Byte],
  java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// Map(CF -> Map(column qualifier -> Map(timestamp -> value)))
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

// Convert the nested Java NavigableMaps returned by Result.getMap into Scala Maps.
def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

// Read an entire table as (row key, CF -> qualifier -> timestamp -> value).
def readTableAll(table: String): RDD[(Array[Byte], CFTimeseriesRow)] = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, table)
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
}
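
As a usage sketch (the table name and partition count below are only illustrative): TableInputFormat typically produces roughly one input partition per HBase region, so if the table has only a few regions you may want to repartition right after reading.

    // readTableAll gives roughly one partition per HBase region; if the
    // table has few regions, repartition to get more parallelism.
    val rows = readTableAll("UserAction")
    println(rows.partitions.length)
    val wider = rows.repartition(200)  // illustrative count, tune for your cluster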

As you can see, I have no need for a reduce in my code. The methods are pretty self-explanatory. I could dig further into your code, but I lack the patience to read Java as it's so epically verbose.

I have some more code specifically for fetching the most recent elements from a row (rather than the entire history). Let me know if you want to see that.
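
That code is not shown here, but a rough sketch of the idea (not the code referred to above) is just to keep, for each column, the value stored under the greatest timestamp:

    // For every column qualifier keep only the most recent version,
    // i.e. the value stored under the largest timestamp.
    def latestOnly(row: CFTimeseriesRow): Map[Array[Byte], Map[Array[Byte], Array[Byte]]] =
      row.map { case (cf, cols) =>
        cf -> cols.map { case (qualifier, versions) =>
          qualifier -> versions.maxBy(_._1)._2
        }
      }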

Finally, I recommend you look into using Cassandra over HBase, as Datastax is partnering with Databricks.
