Streaming to HBase with pyspark


Problem description

There is a fair amount of info online about bulk loading to HBase with Spark streaming using Scala (these two were particularly useful) and some info for Java, but there seems to be a lack of info for doing it with PySpark. So my questions are:

  • How do I bulk load data into HBase using PySpark?
  • Most examples in any language only show a single column being upserted per row. How do I upsert multiple columns per row?

The code I currently have is as follows:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    context = SparkContext(appName="PythonHBaseBulkLoader")
    streamingContext = StreamingContext(context, 5)

    stream = streamingContext.textFileStream("file:///test/input")

    stream.foreachRDD(bulk_load)

    streamingContext.start()
    streamingContext.awaitTermination()

What I need help with is the bulk load function

def bulk_load(rdd):
    #???

I've made some progress previously, with many and various errors (as documented here and here)

Answer

So after much trial and error, I present here the best I have come up with. It works well and successfully bulk loads data (using either Puts or HFiles). I am perfectly willing to believe that this is not the best method, so any comments/other answers are welcome. This assumes you're using a CSV for your data.

By far the easiest way to bulk load, this simply creates a Put request for each cell in the CSV and queues them up to HBase.

def bulk_load(rdd):
    #Your configuration will likely be different. Insert your own quorum and parent node and table name
    conf = {"hbase.zookeeper.quorum": "localhost:2181",\
            "zookeeper.znode.parent": "/hbase-unsecure",\
            "hbase.mapred.outputtable": "Test",\
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    #Split the input into individual lines, then convert each CSV line to key-value pairs
    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)
    load_rdd.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)

The function csv_to_key_value is where the magic happens:

def csv_to_key_value(row):
    cols = row.split(",")#Split on commas
    #Each cell is a tuple of (key, [key, column-family, column-descriptor, value])
    #This example assumes a row key followed by three value columns
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
              (cols[0], [cols[0], "f2", "c2", cols[2]]),
              (cols[0], [cols[0], "f3", "c3", cols[3]]))
    return result

The value converter we defined earlier will convert these tuples into HBase Puts
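
For example, a hypothetical input line "row1,v1,v2,v3" (invented here purely for illustration) would be expanded by csv_to_key_value into one (key, [key, family, qualifier, value]) tuple per cell:

#Hypothetical sample line; the families/qualifiers come from csv_to_key_value above
print(csv_to_key_value("row1,v1,v2,v3"))
#(('row1', ['row1', 'f1', 'c1', 'v1']),
# ('row1', ['row1', 'f2', 'c2', 'v2']),
# ('row1', ['row1', 'f3', 'c3', 'v3']))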

Bulk loading with HFiles is more efficient: rather than a Put request for each cell, an HFile is written directly and the RegionServer is simply told to point to the new HFile. This will use Py4J, so before the Python code we have to write a small Java program:

import py4j.GatewayServer;
import org.apache.hadoop.hbase.*;

public class GatewayApplication {

    public static void main(String[] args)
    {
        GatewayApplication app = new GatewayApplication();
        GatewayServer server = new GatewayServer(app);
        server.start();
    }
}

Compile this, and run it. Leave it running as long as your streaming is happening. Now update bulk_load as follows:
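
As a quick sanity check (a minimal sketch, assuming the gateway is running on Py4J's default port 25333), you can confirm the Python side can reach the JVM before starting the stream:

from py4j.java_gateway import JavaGateway

#Connects to the GatewayServer started by GatewayApplication above
gateway = JavaGateway()
#If this prints a Java version string, the Python-to-JVM bridge is up
print(gateway.jvm.java.lang.System.getProperty("java.version"))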

from py4j.java_gateway import JavaGateway #Needed to talk to the running GatewayApplication

def bulk_load(rdd):
    #The output class changes, everything else stays
    conf = {"hbase.zookeeper.quorum": "localhost:2181",\
            "zookeeper.znode.parent": "/hbase-unsecure",\
            "hbase.mapred.outputtable": "Test",\
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",\
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}#"org.apache.hadoop.hbase.client.Put"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)\
                  .sortByKey(True)
    #Don't process empty RDDs
    if not load_rdd.isEmpty():
        #saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
        #startTime is assumed to be defined elsewhere (e.g. a per-batch timestamp string) so each batch writes to a unique path
        load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                        "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                        conf=conf,
                                        keyConverter=keyConv,
                                        valueConverter=valueConv)
        #The file has now been written, but HBase doesn't know about it

        #Get a link to Py4J
        gateway = JavaGateway()
        #Convert conf to a fully fledged Configuration type
        config = dict_to_conf(conf)
        #Set up our HTable
        htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "Test")
        #Set up our path
        path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
        #Get a bulk loader
        loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
        #Load the HFile
        loader.doBulkLoad(path, htable)
    else:
        print("Nothing to process")

And finally, the fairly simple dict_to_conf:

def dict_to_conf(conf):
    gateway = JavaGateway()
    config = gateway.jvm.org.apache.hadoop.conf.Configuration()
    #Copy each key/value pair from the Python dict into the Hadoop Configuration
    for key, value in conf.items():
        config.set(key, value)
    return config

As you can see, bulk loading with HFiles is more complex than using Puts, but depending on your data volume it is probably worth it, since once you get it working it's not that difficult.

One last note on something that caught me off guard: HFiles expect the data they receive to be written in lexical order. This is not always guaranteed for your input, especially since, lexically, "10" < "9". If you have designed your key to be unique, then this can be fixed easily:

load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
              .flatMap(csv_to_key_value)\
              .sortByKey(True)#Sort in ascending order
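
To illustrate the pitfall above: Python's string sort is lexical, so numeric-looking keys can come back in a surprising order. A common workaround (my suggestion, not part of the original answer) is to zero-pad numeric row keys so lexical and numeric order agree:

print(sorted(["9", "10"]))                    #['10', '9'] - lexical, not numeric
print(sorted(["9".zfill(4), "10".zfill(4)]))  #['0009', '0010'] - padding restores the expected order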
