Bigtable-BigQuery Import via Dataflow: 2 questions on table partitioning and Timestamps
I have a job in Dataflow importing data from Bigtable into BigQuery, using the built-in Dataflow APIs for both. I have two questions:
Question 1: If the source data is in one large table in Bigtable, how can I partition it into a set of sub- or smaller tables in BigQuery dynamically based on, say, the given Bigtable row-key known only at run-time?
The Java code in Dataflow looks like this:
p.apply(Read.from(CloudBigtableIO.read(config)))
.apply(ParDo.of(new SomeDoFNonBTSourceData()))
.apply(BigQueryIO.Write
.to(PROJ_ID + ":" + BQ_DataSet + "." + BQ_TableName)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
So, since BQ_TableName has to be supplied at code level, how can I provide it programmatically based on what is seen inside SomeDoFNonBTSourceData, such as a range of values of the current RowKey? For example, if the RowKey falls in 'a'-'c' then TableA, if in 'd'-'f' then TableB, etc.
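One workable pattern with the Dataflow 1.x SDK is to split the PCollection with a Partition transform keyed on the row-key range, and then attach one BigQueryIO.Write sink per partition (the set of tables must be fixed at pipeline-construction time, but the routing of each element is decided at run time). The routing rule and the "RowKey" field name below are assumptions for illustration, not the original job's actual names; a minimal, runnable sketch of the routing logic, with the pipeline wiring shown as comments because it needs a live GCP project:

```java
import java.util.Arrays;
import java.util.List;

public class RowKeyRouter {
    // Assumed mapping from the question: keys starting 'a'-'c' -> TableA,
    // everything from 'd' on -> TableB.
    static final List<String> TABLES = Arrays.asList("TableA", "TableB");

    // Pure routing function; in the pipeline this would be the body of
    // Partition.PartitionFn<TableRow>.partitionFor(row, numPartitions).
    static int partitionFor(String rowKey, int numPartitions) {
        char first = Character.toLowerCase(rowKey.charAt(0));
        int idx = (first <= 'c') ? 0 : 1;
        return Math.min(idx, numPartitions - 1);
    }

    public static void main(String[] args) {
        // Pipeline wiring sketch (Dataflow SDK 1.x):
        //
        // PCollectionList<TableRow> parts = rows.apply(
        //     Partition.of(TABLES.size(), new Partition.PartitionFn<TableRow>() {
        //       public int partitionFor(TableRow r, int n) {
        //         return RowKeyRouter.partitionFor((String) r.get("RowKey"), n);
        //       }
        //     }));
        // for (int i = 0; i < TABLES.size(); i++) {
        //   parts.get(i).apply(BigQueryIO.Write
        //       .to(PROJ_ID + ":" + BQ_DataSet + "." + TABLES.get(i))
        //       .withSchema(schema)
        //       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        //       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
        // }
        System.out.println(partitionFor("apple-42", 2)); // 0 -> TableA
        System.out.println(partitionFor("delta-7", 2));  // 1 -> TableB
    }
}
```

The trade-off is that every destination table (and its sink) has to be declared up front, so this works for a small, known set of key ranges rather than an unbounded one.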
Question 2: What is the right way to export the Bigtable Timestamp into Bigquery so as to eventually reconstruct it in human-readable format in BigQuery?
The processElement function within the DoFn looks like this:
public void processElement(ProcessContext c)
{
    // getValueArray() returns the cell's whole backing array, which can hold
    // more than this cell's value; CellUtil.cloneValue() copies exactly the
    // value bytes.
    String valA = new String(CellUtil.cloneValue(c.element().getColumnLatestCell(COL_FAM, COL_NAME)));
    Long timeStamp = c.element().getColumnLatestCell(COL_FAM, COL_NAME).getTimestamp();
    TableRow tr = new TableRow(); // build a fresh row per element rather than reusing one
    tr.put("ColA", valA);
    tr.put("TimeStamp", timeStamp);
    c.output(tr);
}
And during the Pipeline construction, the BQ schema setup for the timeStamp column looks like this:
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("ColA").setType("STRING"));
fields.add(new TableFieldSchema().setName("TimeStamp").setType("TIMESTAMP"));
schema = new TableSchema().setFields(fields);
So the Bigtable timestamp seems to be of type Long, and I have tried both "TIMESTAMP" and "INTEGER" types for the destination TimeStamp column in BQ (there is no Long type in BQ as such). Ultimately, I need the TimeStamp column in BQ both for 'order by' clauses and to display the information in human-readable form (date and time). The 'order by' part seems to work OK, but I have not managed to CAST the end result into anything meaningful -- I either get cast errors or output that is still unreadable.
Incidentally, I am here looking for an answer to an issue similar to Question 1 :)
For the second question, I think you first need to confirm that the Long timestamp is indeed a UNIX timestamp in seconds; I've always assumed BQ can ingest that as a TIMESTAMP without any conversion.
But you can try this...
Long longTimeStamp = 1408452095L;
Date timeStamp = new Date(longTimeStamp * 1000); // seconds -> milliseconds
tr.put("TimeStamp", timeStamp.toInstant().toString());
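One caveat: if getTimestamp() actually returns milliseconds since the epoch (which is what the HBase Cell API documents, and what the Cloud Bigtable HBase client surfaces), then the multiplication by 1000 above would overshoot by a factor of 1000 -- it's worth checking against a cell with a known write time. A minimal sketch assuming millisecond input, using java.time instead of java.util.Date; BigQuery's TIMESTAMP type accepts the resulting ISO-8601 string:

```java
import java.time.Instant;

public class TsConvert {
    // Assumption: the Long from getTimestamp() is milliseconds since the
    // UNIX epoch. Verify against a cell with a known write time first.
    static String toBigQueryTimestamp(long millis) {
        // Instant.toString() emits an ISO-8601 UTC string such as
        // "2014-08-19T12:41:35Z", which BigQuery parses as a TIMESTAMP.
        return Instant.ofEpochMilli(millis).toString();
    }

    public static void main(String[] args) {
        System.out.println(toBigQueryTimestamp(1408452095000L)); // 2014-08-19T12:41:35Z
    }
}
```

With this approach the TimeStamp column can stay typed as "TIMESTAMP" in the schema, and both 'order by' and human-readable display work without any CAST in the query.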