Bigtable-BigQuery Import via DataFlow: 2 questions on table partitioning and Timestamps


Problem Description



I have a Dataflow job importing data from Bigtable into BigQuery, using the built-in Dataflow connectors for both. I have two questions:

Question 1: If the source data is in one large table in Bigtable, how can I split it into a set of smaller sub-tables in BigQuery dynamically, based on, say, the Bigtable row key, which is known only at run time?

The Java code in Dataflow looks like this:

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(ParDo.of(new SomeDoFNonBTSourceData()))
    .apply(BigQueryIO.Write
        .to(PROJ_ID + ":" + BQ_DataSet + "." + BQ_TableName)
        .withSchema(schema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();

So, since BQ_TableName has to be supplied at code-level, how can I provide it programmatically based on what is seen inside the SomeDoFNonBTSourceData, like a range of values of the current RowKey? If RowKey is 'a-c' then TableA, if 'd-f' then TableB, etc.
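
As a point of reference (this sketch is not from the original post): with the SDK shown above, one workable pattern is to split the PCollection into a fixed set of branches with the Partition transform and attach a separate BigQueryIO.Write to each branch. A minimal sketch, assuming SomeDoFNonBTSourceData also copies the Bigtable row key into a hypothetical "RowKey" field of each TableRow:

PCollection<TableRow> rows = p
        .apply(Read.from(CloudBigtableIO.read(config)))
        .apply(ParDo.of(new SomeDoFNonBTSourceData()));

// Route each TableRow to one of two branches by row-key range.
PCollectionList<TableRow> byRange = rows.apply(
        Partition.of(2, new Partition.PartitionFn<TableRow>() {
            @Override
            public int partitionFor(TableRow row, int numPartitions) {
                String key = (String) row.get("RowKey");  // hypothetical field
                return key.compareTo("d") < 0 ? 0 : 1;    // 'a'-'c' -> 0, 'd'-'f' -> 1
            }
        }));

byRange.get(0).apply(BigQueryIO.Write.to(PROJ_ID + ":" + BQ_DataSet + ".TableA").withSchema(schema));
byRange.get(1).apply(BigQueryIO.Write.to(PROJ_ID + ":" + BQ_DataSet + ".TableB").withSchema(schema));

The limitation is that Partition routes among a number of branches fixed at pipeline-construction time, so the candidate table names must still be known up front; truly per-element table names require a different mechanism.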

Question 2: What is the right way to export the Bigtable Timestamp into BigQuery so as to eventually reconstruct it in human-readable format in BigQuery?

The processElement function within the DoFn looks like this:

public void processElement(ProcessContext c)
{
    // c.element() is an HBase Result. Note: getValueArray() returns the cell's
    // entire backing array; CellUtil.cloneValue(cell) is the safer way to get
    // just the value bytes.
    String valA = new String(c.element().getColumnLatestCell(COL_FAM, COL_NAME).getValueArray());
    Long timeStamp = c.element().getColumnLatestCell(COL_FAM, COL_NAME).getTimestamp();

    tr.put("ColA", valA);            // tr is a TableRow field on the DoFn
    tr.put("TimeStamp", timeStamp);
    c.output(tr);
}

And during the Pipeline construction, the BQ schema setup for the timeStamp column looks like this:

List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ColA").setType("STRING"));
fields.add(new TableFieldSchema().setName("TimeStamp").setType("TIMESTAMP"));
schema = new TableSchema().setFields(fields);

So the Bigtable timestamp seems to be of type Long, and I have tried both "TIMESTAMP" and "INTEGER" types for the destination TimeStamp column in BQ (seems like there is no Long in BQ as such). Ultimately, I need to use the TimeStamp column in BQ both for 'order by' clauses and to display the information in human-readable form (date and time). The 'order by' part seems to work OK, but I have not managed to CAST the end result into anything meaningful -- either get cast errors or something still unreadable.

Solution

Incidentally, I'm here looking for an answer to an issue similar to Question 1 :).

For the second question, I think you first need to confirm that the Long timestamp is indeed a UNIX timestamp; I've always assumed BQ can ingest that as a timestamp without any conversion.
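
One unit detail worth checking first (an illustrative sketch, not from the original answer): the HBase-style client is assumed here to return epoch milliseconds, while BigQuery interprets a bare number in a TIMESTAMP column as seconds since the epoch, so the value needs scaling before it is written.

// Assumption: Cell.getTimestamp() returns epoch MILLIseconds; a bare number
// in a BigQuery TIMESTAMP column is interpreted as epoch SECONDS.
long millis = c.element().getColumnLatestCell(COL_FAM, COL_NAME).getTimestamp();
tr.put("TimeStamp", millis / 1000.0);  // fractional epoch seconds, e.g. 1408452095.0

A raw millisecond Long written straight into a TIMESTAMP column lands tens of thousands of years in the future (or out of range), which would explain both the cast errors and the unreadable output.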

But you can try this...

Long longTimeStamp = 1408452095L;         // example value, epoch seconds

Date timeStamp = new Date();              // java.util.Date
timeStamp.setTime(longTimeStamp * 1000);  // setTime() expects epoch milliseconds

tr.put("TimeStamp", timeStamp.toInstant().toString());  // ISO-8601 string
