Sharing BigTable Connection object among DataFlow DoFn sub-classes


Question

I am setting up a Java Pipeline in DataFlow to read a .csv file and to create a bunch of BigTable rows based on the content of the file. I see in the BigTable documentation the note that connecting to BigTable is an 'expensive' operation and that it's a good idea to do it only once and to share the connection among the functions that need it.

However, if I declare the Connection object as a public static variable in the main class and first connect to BigTable in the main function, I get a NullPointerException when I subsequently try to reference the connection in the processElement() function of the DoFn sub-class instances that make up my DataFlow pipeline.

Conversely, if I declare the Connection as a static variable in the actual DoFn class, then the operation works successfully.

What is the best-practice or optimal way to do this?

I'm concerned that if I implement the second option at scale, I will be wasting a lot of time and resources. If I keep the variable as static in the DoFn class, is that enough to ensure the API doesn't try to re-establish the connection every time?
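(For context, the NullPointerException in the first option is most likely expected behavior: each Dataflow worker runs in its own JVM and receives the DoFn via Java serialization, so a static field assigned in main() on the launcher machine is never set on the workers. Below is a minimal sketch, not from the original post, of the second option made explicit with synchronized lazy initialization, assuming the Dataflow 1.x SDK and the bigtable-hbase client; "projectID" and "instanceID" are placeholders.)

import java.io.IOException;

import org.apache.hadoop.hbase.client.Connection;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

class LazyConnectionDoFn extends DoFn<String, String>
{
    // One Connection per worker JVM, shared by every instance of this DoFn.
    private static Connection s_conn;

    // Synchronized lazy init: connects at most once per JVM, no matter how
    // many DoFn instances or threads the worker runs.
    private static synchronized Connection conn()
    {
        if (s_conn == null)
        {
            // "projectID" / "instanceID" are placeholders, not real IDs.
            s_conn = BigtableConfiguration.connect("projectID", "instanceID");
        }
        return s_conn;
    }

    @Override
    public void processElement(ProcessContext c) throws IOException
    {
        Connection connection = conn(); // cheap after the first call
        // ... use connection.getTable(...) here ...
        c.output(c.element());
    }
}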

I realize there is a special BigTable I/O call to sync DataFlow pipeline objects with BigTable, but I think I need to write one on my own to build some special logic into the DoFn processElement() function...

This is what the "working" code looks like:

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.dataflow.sdk.transforms.DoFn;

class DigitizeBT extends DoFn<String, String>
{
    private static Connection m_locConn;

    @Override
    public void processElement(ProcessContext c)
    {
        try
        {
            // Note: this reassigns the static field and re-connects on every
            // element, which is exactly the costly behavior asked about above.
            m_locConn = BigtableConfiguration.connect("projectID", "instanceID");
            Table tbl = m_locConn.getTable(TableName.valueOf("TableName"));

            // rowKey was undefined in the posted snippet; derive it from the
            // element here so the example compiles.
            String rowKey = c.element();
            Put put = new Put(Bytes.toBytes(rowKey));

            put.addColumn(
                Bytes.toBytes("CF1"),
                Bytes.toBytes("SomeName"),
                Bytes.toBytes("SomeValue"));

            tbl.put(put);
        }
        catch (IOException e)
        {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

This is what the updated code looks like, FYI:

    public void SmallKVJob()
    {
        // Bigtable target handed to DoFn3 (AbstractCloudBigtableTableDoFn).
        CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
                .withProjectId(DEF.ID_PROJ)
                .withInstanceId(DEF.ID_INST)
                .withTableId(DEF.ID_TBL_UNITS)
                .build();

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setProject(DEF.ID_PROJ);
        options.setStagingLocation(DEF.ID_STG_LOC);
//      options.setNumWorkers(3);
//      options.setMaxNumWorkers(5);
//      options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setRunner(DirectPipelineRunner.class); // local runner for testing
        Pipeline p = Pipeline.create(options);

        // Read CSV lines, parse, fan out one element per unit, write to Bigtable.
        p.apply(TextIO.Read.from(DEF.ID_BAL))
        .apply(ParDo.of(new DoFn1()))
        .apply(ParDo.of(new DoFn2()))
        .apply(ParDo.of(new DoFn3(config)));

        m_log.info("starting to run the job");
        p.run();
        m_log.info("finished running the job");
    }
}

class DoFn1 extends DoFn<String, KV<String, Integer>>
{
    @Override
    public void processElement(ProcessContext c)
    {
        // Parse a "name,count" CSV line into a KV pair (split once, reuse).
        String[] parts = c.element().split(",");
        c.output(KV.of(parts[0], Integer.valueOf(parts[1])));
    }
}

class DoFn2 extends DoFn<KV<String, Integer>, KV<String, Integer>>
{
    @Override
    public void processElement(ProcessContext c)
    {
        // Fan out: emit one KV per unit so downstream writes one row each.
        int max = c.element().getValue();
        String name = c.element().getKey();
        for (int i = 0; i < max; i++)
            c.output(KV.of(name, 1));
    }
}

class DoFn3 extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String>
{
    public DoFn3(CloudBigtableConfiguration config)
    {
        super(config);
    }

    @Override
    public void processElement(ProcessContext c)
    {
        try
        {
            Integer max = c.element().getValue();
            for (int i = 0; i < max; i++)
            {
                String owner = c.element().getKey();
                String rnd = UUID.randomUUID().toString();

                // One row per unit: row key is "<owner>*<uuid>".
                Put p = new Put(Bytes.toBytes(owner + "*" + rnd));
                p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
                // getConnection() is inherited from AbstractCloudBigtableTableDoFn
                // and reuses a shared connection instead of reconnecting.
                getConnection().getTable(TableName.valueOf(DEF.ID_TBL_UNITS)).put(p);
                c.output("Success");
            }
        }
        catch (IOException e)
        {
            c.output(e.toString());
            e.printStackTrace();
        }
    }
}

The input .csv file looks something like this:
Mary,3000
John,5000
Peter,2000
So, for each row in the .csv file, I have to put x rows into BigTable, where x is the value in the second cell of that row...

Answer

We built AbstractCloudBigtableTableDoFn (Source & Docs) for this purpose. Extend that class instead of DoFn, and call getConnection() instead of creating a Connection yourself.

10,000 small rows should take a second or two of actual work.

As per the comments, BufferedMutator should be used instead of Table.put() for best throughput.
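Here is a hedged sketch of what that could look like in DoFn3, assuming the Dataflow 1.x DoFn lifecycle (finishBundle) and that the base class leaves finishBundle free to override; getBufferedMutator() and mutate() are part of the standard HBase Connection/BufferedMutator API, and the DEF.* constants are reused from the question. BufferedMutator batches mutations client-side instead of issuing one RPC per Table.put().

import java.io.IOException;
import java.util.UUID;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.cloud.bigtable.dataflow.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.dataflow.CloudBigtableConfiguration;
import com.google.cloud.dataflow.sdk.values.KV;

class BufferedDoFn3 extends AbstractCloudBigtableTableDoFn<KV<String, Integer>, String>
{
    // transient: BufferedMutator is not serializable; recreated lazily on each worker.
    private transient BufferedMutator mutator;

    public BufferedDoFn3(CloudBigtableConfiguration config)
    {
        super(config);
    }

    @Override
    public void processElement(ProcessContext c) throws IOException
    {
        if (mutator == null)
        {
            // One mutator per DoFn instance, on top of the shared connection.
            mutator = getConnection().getBufferedMutator(TableName.valueOf(DEF.ID_TBL_UNITS));
        }
        String owner = c.element().getKey();
        for (int i = 0; i < c.element().getValue(); i++)
        {
            Put p = new Put(Bytes.toBytes(owner + "*" + UUID.randomUUID()));
            p.addColumn(Bytes.toBytes(DEF.ID_CF1), Bytes.toBytes("Owner"), Bytes.toBytes(owner));
            mutator.mutate(p); // buffered, not sent immediately
        }
        c.output("Success");
    }

    @Override
    public void finishBundle(Context c) throws IOException
    {
        if (mutator != null)
        {
            mutator.flush(); // push any buffered mutations before the bundle ends
        }
    }
}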
