google-cloud-dataflow相关内容

Google Dataflow - 将数据保存到多个BigQuery表格中

我使用Google Dataflow 1.9将数据保存到BigQuery表格中。 我正在寻找一种方法来控制基于该元素中的某个值写入(PCollection)元素的表名称。在我们的例子中,这些元素包含一个user-id,我们希望动态地将它们写入它自己的用户表中。 解析方案 对于1.9.0,唯一的选择是(1)将元素分割成多个输出集合,然后将每个输出集合写入特定的表或(2)窗口元素并根据窗口选择目标 ..
发布时间:2018-05-07 17:44:39 其他开发

Bigtable-BigQuery通过DataFlow导入:关于表分区和时间戳的2个问题

我在Dataflow中使用内置的Dataflow API来将数据从Bigtable导入到Bigquery中。我有两个问题: 问题1:如果源数据位于Bigtable中的一个大表中,我怎样才能将它分成一组BigQuery中的子表或更小的表动态地基于给定的Bigtable行键 - 仅在运行时才知道? Dataflow中的Java代码看起来像这样: $ b p.apply(Read.fr ..

数据流中的动态bigquery表名

基本上,我们希望根据特定列的值(而不是日期)将大型(数十亿行)bigquery表拆分为大量(大约为10万个)较小的表。我无法弄清楚如何在bigquery中有效地做到这一点,所以我正在考虑使用数据流。 使用数据流,我们可以先加载数据,然后为每个记录创建一个键值对,关键字是我们想要拆分表的特定列的所有可能值,然后我们可以通过键对记录进行分组。所以在这个操作之后,我们有(密钥,[记录])的PCol ..
发布时间:2018-05-07 17:38:28 其他开发

如何从Google Dataflow中的PCollection中获取元素列表并在流水线中使用它来循环写入变换?

我想: 获取主PCollection中的唯一日期列表 循环访问该列表中的日期以创建筛选的PCollections(每个日期均具有唯一的日期),然后分别写入将过滤的PCollection过滤到BigQuery中时分区表中的分区。 如何获取该列表?在下面的组合转换之后,我创建了一个ListPCollectionView对象,但我无法迭代该对象: class ToUniqueLis ..
发布时间:2018-05-07 17:37:25 Python

Dataflow中的BigQuery无法从云存储中加载数据:为非记录字段指定的JSON对象

我在我的机器上本地运行一个Dataflow管道,以写入BigQuery。此批处理作业中的BigQuery需要临时位置。我在我的云存储中提供了一个。相关部分是: PipelineOptions选项= PipelineOptionsFactory.create(); options.as(BigQueryOptions.class) .setTempLocation(“gs:// fol ..
发布时间:2018-05-07 17:32:05 其他开发

Google Cloud Dataflow BigQueryIO.Write出现未知错误(http代码500)

有人发生了与我相同的问题,Google Cloud Dataflow BigQueryIO.Write发生未知错误(http code 500)? 我在4月份使用Dataflow处理一些数据,5月,6月,我使用相同的代码来处理4月份的数据(400MB)并写入BigQuery成功,但是当我处理5月(60MB)或6月(90MB)数据时,它是失败的。 4月,5月和6月的数据格式相同。 从Bi ..

在从Dataflow插入BigQuery之前验证行

根据 在从数据流加载BigQuery表时,我们该如何设置maximum_bad_records?在将数据加载到BigQuery时,目前没有办法设置 maxBadRecords 配置来自Dataflow。如果我有 TableSchema ,那么可以在Dataflow作业中验证这些行,然后再将它们插入到BigQuery中。和 TableRow ,我该如何确保行可以安全地插入表中? 这样做比在模 ..
发布时间:2018-05-07 17:31:32 其他开发

在Dataflow中自动检测BigQuery模式?

是否可以使用 - 自动检测 在DataFlow? 即我们可以在没有指定模式的情况下将数据加载到BQ表中,相当于我们如何使用 - autodetect 加载数据 (潜在的相关问题) 解决方案 如果您使用协议缓冲区作为您PCollections中的对象(应该在Dataflow后端执行得非常好),您可能可以使用我写的util以往。它会在运行时根据原始缓冲区描述符的检查将原始缓冲区的模 ..
发布时间:2018-05-07 17:30:55 其他开发

Google云端数据流管道中的例外情况,从BigQuery到Cloud Bigtable

执行DataFlow管道,我们每隔一段时间就会看到这些异常。我们能为他们做些什么吗?我们有一个非常简单的流程,它从BigQuery查询中读取数据并填充BigTable中的数据。 管道中的数据也会发生什么变化?它是否被重新处理?或者它在传输到BigTable时丢失了? CloudBigtableIO.initializeForWrite(p); p.apply(BigQueryIO ..

分析云数据流BigQuery吞吐量/流水线

我试图弄清楚DataFlow如何扩展某些操作,以及如何让表现最佳。首先,我创建了一个简单的流程,用于从BigQuery中读取数据(大约25M行,总共30GB),JSON抽取,一个按键的简单组,然后一个聚集(每个〜100个元素)对每个键进行转换并将其放回到一个新表中(〜500k行,总共25gb)。 总的管道执行时间为10-18分钟我分配了多少工人,或者如果我使用多核机器等等,我无法在此之下加快 ..

使用Apache Beam进行Dataflow批量加载时的性能问题

我正在做一个数据流批量加载的性能基准测试,并发现与BigQuery命令行工具上的相同加载相比,加载速度太慢。 文件大小约为20 MB,拥有数百万条记录。我尝试了不同的机器类型,并在加载目标BQ表时大约加载时间为8分钟,在 n1-highmem-4 上获得了最佳加载性能。 b $ b 当通过在命令行实用程序上运行BQ命令应用相同的表加载时,几乎不用花2分钟来处理和加载相同的数据量。 有关 ..
发布时间:2018-05-07 17:27:04 其他开发

如何将Google Cloud SQL与Google Big Query集成

我正在设计一个解决方案,使用Google Cloud SQL来存储应用程序正常运行的所有数据(各种OLTP数据)。预计这些数据会随着时间的推移而变得相当大。数据本身本质上是关系型的,因此我们选择了Cloud SQL而不是Cloud Datastore。 这些数据需要输入Big Query进行分析,这需要接近实时分析(作为最好的情况),尽管实际上可能会有一些滞后。但我试图设计一个解决方案,以尽 ..

Google Cloud DataFlow随机化WritetoBigQuery

我成功实现了一个写入BigQuery的数据流管道。此管道正在转换Cloud ML Engine作业的数据。但是,我注意到已写入的行按照我的数据标签排序(或至少分组)。通过这个,我的意思是它们在视觉上似乎以某种方式组织起来(这不是完全随机的)。然后,当我将表格导出到GCS中的分片.csv时,每个分片.csv基本上都是有序的。这意味着数据不能随机输入到TensorFlow中,因为TF一次抓取一个.cs ..

如何从Google BigQuery将数据加载到Google Cloud Bigtable中

我需要将数据填充到Google Cloud Bigtable中,并且数据的来源将是Google BigQuery。 作为练习,我可以读取数据来自BigQuery ,作为一项单独的练习,我能够将数据写入Bigtable 。 现在我必须将这两个操作合并为一个Google Cloud Dataflow作业。任何示例都会有很大的帮助。 解决方案 您可以使用这些示例中所示的转换,添加所需 ..

什么是AWS& Google云服务?

有没有人有过经验: 从亚马逊发送流式/微量批处理日志数据到BigQuery进行处理,任何延迟问题? 从Google DataFlow发送(微批)日志到Amazon(Kinesis / S3 / DynamoDB) 有人可以提供有关延迟的信息吗? 谢谢 解决方案在问题1中,我相信你对BigQuery摄入延迟感兴趣。根据将数据流式传输到BigQuery 中,流式传输的数据可用 ..

从BigQuery获取TableSchema结果PCollection< TableRow>

当我在BigQuery Web UI中运行查询时,结果显示在一个表中,其中每个字段的名称和类型都是已知的(即使字段是COUNT(),AVG()的结果)...操作,字段的类型当然是已知的)。 结果可以直接导出为表/ json / csv。 我的问题是,当我在我的java项目中检索查询结果时,例如与查询: 字符串查询=“选择国籍,COUNT(DISTINCT personID)AS人口 ..
发布时间:2018-05-07 17:25:24 Java开发

从Google App Engine应用程序运行Google Dataflow管道?

我使用DataflowPipelineRunner创建数据流作业。我尝试了以下方案。 不指定任何machineType 使用g1小型机器 与n1-highmem-2 在上述所有场景中,Input是来自GCS的文件,是非常小的文件(KB大小),输出是Big Query表。 我在所有情况下都出现了内存不足错误。 我的编译代码大小为94mb。我只尝试字数统计的例子,它没有读取 ..