How to read from HBase using Spark


Question

The code below reads from HBase, converts each row to a JSON structure, and then converts the result to a schemaRDD. The problem is that I am using a List to store the JSON strings before passing it to a JavaRDD, so for about 100 GB of data the master will hold all of the data in memory. What is the right way to load the data from HBase, perform the manipulation, and then convert it to a JavaRDD?

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}

Recommended answer

A basic example of reading HBase data using Spark (Scala); you can also write this in Java:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}
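
To address the driver-memory problem from the question, the per-row JSON construction can run inside a transformation on the RDD returned by newAPIHadoopRDD instead of in a driver-side loop over a ResultScanner. The sketch below isolates that per-row step as a plain Java function so it can be read and tested without a cluster. The class name RowToJson, the BYTES comparator (a stand-in for HBase's byte ordering), and sampleCells() are hypothetical helpers of mine; the nested map parameter mirrors the type returned by Result.getMap(). Unlike the question's loop, it emits only the newest version of each cell, to avoid duplicate JSON keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class RowToJson {

    // Unsigned lexicographic ordering for byte[] keys (hypothetical stand-in
    // for the comparator HBase uses inside Result.getMap()).
    static final Comparator<byte[]> BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    };

    static String str(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    // Builds one JSON object per row: {"RowKey":"...","family":{"qualifier":value,...},...}
    // As in the question's code, values are written raw (assumed to be valid
    // JSON literals such as numbers) and nothing is escaped.
    static String toJson(String rowKey,
            NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families) {
        StringBuilder json = new StringBuilder("{\"RowKey\":\"").append(rowKey).append('"');
        for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> family
                : families.entrySet()) {
            json.append(",\"").append(str(family.getKey())).append("\":{");
            boolean first = true;
            for (Map.Entry<byte[], NavigableMap<Long, byte[]>> column
                    : family.getValue().entrySet()) {
                NavigableMap<Long, byte[]> versions = column.getValue();
                if (versions.isEmpty()) continue;
                // Keep only the cell with the highest timestamp.
                long newest = Collections.max(versions.keySet());
                if (!first) json.append(',');
                first = false;
                json.append('"').append(str(column.getKey())).append("\":")
                    .append(str(versions.get(newest)));
            }
            json.append('}');
        }
        return json.append('}').toString();
    }

    // Hypothetical sample row: family "d", qualifier "count", two versions.
    static NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> sampleCells() {
        NavigableMap<Long, byte[]> versions = new TreeMap<>();
        versions.put(1L, "41".getBytes(StandardCharsets.UTF_8));
        versions.put(2L, "42".getBytes(StandardCharsets.UTF_8));
        NavigableMap<byte[], NavigableMap<Long, byte[]>> columns = new TreeMap<>(BYTES);
        columns.put("count".getBytes(StandardCharsets.UTF_8), versions);
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families =
            new TreeMap<>(BYTES);
        families.put("d".getBytes(StandardCharsets.UTF_8), columns);
        return families;
    }

    public static void main(String[] args) {
        // prints {"RowKey":"row1","d":{"count":42}}
        System.out.println(toJson("row1", sampleCells()));
    }
}
```

In a Spark job, a function like this would be called from a map over the (ImmutableBytesWritable, Result) pairs, passing Bytes.toString(result.getRow()) and result.getMap(), so each row is converted on the executors and the JSON strings never accumulate in a List on the driver. The resulting RDD of strings could then be handed to jsonRDD as in the question.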

UPDATED - 2016

As of Spark 1.0.x+, you can also use the Spark-HBase Connector:

Maven dependency to include:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version> <!-- Change the version to match your Spark version; I am using Spark 1.6.x -->
</dependency>

And here is some sample code:

import org.apache.spark._
import it.nerdammer.spark.hbase._

object HBaseRead extends App {
    val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
    sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") // e.g. 192.168.1.1, localhost, or your hostname
    val sc = new SparkContext(sparkConf)

    // For example, if you have an HBase table 'Document' with column family 'SMPL' and qualifiers 'DocID' and 'Title':

    val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
      .select("DocID", "Title").inColumnFamily("SMPL")

    println("Number of Records found : " + docRdd.count())
}

UPDATED - 2017

As of Spark 1.6.x+, you can also use the SHC Connector (for Hortonworks or HDP users):

Maven dependency to include:

    <dependency>
        <groupId>com.hortonworks</groupId>
        <artifactId>shc</artifactId>
        <version>1.0.0-2.0-s_2.11</version> <!-- The version depends on your Spark version; supported up to Spark 2.x -->
    </dependency>

The main advantage of this connector is that it is flexible in the schema definition and does not need hardcoded parameters the way nerdammer/spark-hbase-connector does. It also supports Spark 2.x, and the maintainers provide end-to-end support through issues and PRs.
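
To make the schema flexibility concrete: as far as I recall from the SHC readme, the schema is declared as a JSON "catalog" string that maps DataFrame columns to HBase column families and qualifiers. The sketch below only builds such a string in plain Java, reusing the Document table, SMPL family, and DocID and Title qualifiers from the 2016 example. The class name ShcCatalogSketch, the "default" namespace, and the DataFrame column names id, docId, and title are assumptions of mine, so check the layout against the readme of the SHC version you use.

```java
class ShcCatalogSketch {

    // Returns an SHC-style catalog: the row key is exposed as column "id",
    // and the two SMPL qualifiers are exposed as "docId" and "title".
    static String catalog() {
        return String.join("\n",
            "{",
            "  \"table\": {\"namespace\": \"default\", \"name\": \"Document\"},",
            "  \"rowkey\": \"key\",",
            "  \"columns\": {",
            "    \"id\": {\"cf\": \"rowkey\", \"col\": \"key\", \"type\": \"string\"},",
            "    \"docId\": {\"cf\": \"SMPL\", \"col\": \"DocID\", \"type\": \"string\"},",
            "    \"title\": {\"cf\": \"SMPL\", \"col\": \"Title\", \"type\": \"string\"}",
            "  }",
            "}");
    }

    public static void main(String[] args) {
        System.out.println(catalog());
    }
}
```

With SHC on the classpath, this string would be passed to the DataFrame reader as the "catalog" option, with the format set to org.apache.spark.sql.execution.datasources.hbase. Changing the schema then means editing the catalog rather than the reading code.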

See the following repository for the latest readme and samples:

Hortonworks Spark HBase Connector

You can also convert these RDDs to DataFrames and run SQL over them, or map the resulting Datasets or DataFrames to user-defined Java POJOs or case classes. It works brilliantly.

Please comment below if you need anything else.
