google-cloud-dataflow相关内容

具有动态目标的 WriteToBigQuery

我正在开发一个 Apache Beam 管道,它从发布/订阅中读取一堆事件,然后根据事件类型将它们写入单独的 BigQuery 表中. 我知道 WriteToBigQuery 支持动态目的地,但就我而言,问题在于目的地是从从事件中读取的数据派生而来的.例如:一个事件看起来像 {“object_id":123,...一些元数据,“object_data":{对象相关信息}} 应写入 Big ..
发布时间:2021-11-11 22:43:46 其他开发

AttributeError: 'RuntimeValueProvider' 对象没有属性 'projectId'

我正在尝试在 Dataflow runner 中运行 apache 光束管道;该作业从 bigquery 表读取数据并将数据写入数据库. 我在数据流中使用经典模板选项运行作业 - 意味着首先我必须暂存管道,然后使用适当的参数运行它. 我的管道选项如下 options = PipelineOptions()options.view_as(SetupOptions).save_main_ ..
发布时间:2021-11-11 22:43:43 其他开发

本地发布订阅模拟器不适用于 Dataflow

我正在用 Java 开发 Dataflow,输入来自 Pubsub.后来,我在这里看到了关于如何使用本地 Pubsub 模拟器的指南,所以我不会需要部署到 GCP 才能进行测试. 这是我的简单代码: 私有接口 Options extends PipelineOptions, PubsubOptions, StreamingOptions {@Description("要从中读取消息的发布/ ..

GCP 数据流计算图和作业执行

大家好,当我在 Google Cloud Dataflow 中创建自定义模板时,我努力理解发生了什么,但未能理解.感谢 GCP 文档.以下是我正在实现的目标. 从 Google Cloud Bucket 读取数据 预处理 加载深度学习模型(每个 1 GB)并获得预测 将结果转储到 BigQuery 中. 我成功创建了模板并且能够执行该作业.但我有以下问题. 当我执行作业时, ..

Java Apache Beam - 保存文件“本地"通过使用 DataflowRunner

可以发送java代码,但目前没有必要. 我有一个问题,当我以(DirectRunner - 使用 Google VM 实例)运行作业时,它工作正常,因为它将信息保存到本地文件并继续...... 尝试使用 (DataflowRunner) 时出现问题,以及我收到的错误: java.nio.file.NoSuchFileExtension: XXXX.csv..........XXXX.cs ..
发布时间:2021-11-11 22:43:31 Java开发

在更新 Dataflow 管道时强制更新 SideInput

我有一个 Dataflow 管道正在运行,它获取活动租户的配置(存储在 GCS 中)并将其作为 sideInput 提供给 ActiveTenantFilter.配置很少更新,因此我决定在更新时使用 --update 标志重新部署管道. 但是,当使用更新标志时,不会再次获取文件,即保持状态.是否可以强制在重新部署管道时更新此 PCollectionView? 解决方案 你是对的,当你 ..
发布时间:2021-11-11 22:43:19 其他开发

如何将我的pickeled ML模型从GCS加载到Dataflow/Apache Beam

我在本地开发了一个 apache 光束管道,我在其中对示例文件运行预测. 在我的计算机本地,我可以像这样加载模型: with open('gs://newbucket322/my_dumped_classifier.pkl', 'rb') as fid:gnb_loaded = cPickle.load(fid) 但是在谷歌数据流上运行时显然不起作用.我尝试将路径更改为 GS://但这 ..

在 Apache Beam 中创建自定义窗口函数

我有一个 Beam 管道,它从读取多个文本文件开始,其中文件中的每一行代表一行,该行稍后会在管道中插入 Bigtable.该场景需要确认从每个文件中提取的行数后来插入到 Bigtable 匹配中的行数.为此,我计划开发一个自定义窗口策略,以便根据文件名作为将传递给窗口函数的键,将来自单个文件的行分配给单个窗口. 是否有创建自定义窗口函数的代码示例? 解决方案 虽然我改变了确认插入行数 ..
发布时间:2021-11-11 22:43:04 其他开发

使用数据流时出现意外异常

当我通过以下步骤使用数据流时:- 从 bigquery 读取- 将表格行转换为 json 字符串- 插入到elasticsearch (7.5.2)它看起来很适合处理 ~100k 条记录,但实际上(8m 条记录 ~65gb)数据流在插入 300k 条记录后会抛出异常. 来自工人的错误消息:java.lang.RuntimeException:意外的 org.apache.beam.runner ..
发布时间:2021-11-11 22:43:01 其他开发

Beam/DataFlow ::ReadFromPubSub(id_label) :: 意外行为

有人可以澄清 ReafFromPubSub 转换? 我正在使用 BigQuery 接收器,我的理解它就像 BQ Streaming API 的 insertId,表格数据:insertAll 每一行的唯一 ID.BigQuery 使用此属性尽最大努力检测重复的插入请求.如需了解详情,请参阅数据一致性. 但是我没有看到这种预期的行为. 我正在向 Pub/Sub 发布消息,每条 ..

我可以强制我的数据流管道中的一个步骤是单线程的(并且在一台机器上)吗?

我有一个管道,可以获取文件的 URL 并下载这些文件,为除标题之外的每一行生成 BigQuery 表行. 为了避免重复下载,我想根据以前下载的表检查 URL,如果 URL 不在此“历史"表中,则仅继续存储该 URL. 为此,我需要将历史记录存储在允许唯一值的数据库中,或者为此使用 BigQuery 可能更容易,但对表的访问必须严格串行. 我可以强制执行单线程执行(在一台机器上)来 ..

使用 Beam-nuggets 库部署管道时出现 GCP Dataflow 运行器错误 - “无法读取 data_plane 中的输入".

我一直在使用 Kafka 实例作为输入和 Bigquery 作为输出在 GCP 提供的 Apache Beam Notebook 中测试 Apache Beam 管道.我已经能够通过 Interactive runner 成功使用管道,但是当我将相同的管道部署到 Dataflow runner 时,它似乎从未真正从已定义的 Kafka 主题中读取.查看日志给了我错误: 无法读取数据平面中的输 ..
发布时间:2021-11-11 22:42:43 其他开发

写入 Avro 文件时的架构更新

背景:我们有一个 Dataflow 作业,它将 PubSub 消息转换为 Avro GenericRecords 并将它们作为“.avro"写入 GCS.PubSub 消息和 GenericRecords 之间的转换需要一个模式.此架构每周更改一次,仅添加字段.我们希望能够在不更新 Dataflow 作业的情况下更新字段. 我们做了什么:我们听取了这篇博文 并创建了一个每分钟刷新一次内容的 ..
发布时间:2021-11-11 22:42:40 Java开发

有状态的 DoFn 是否可以具有随 TTL 过期的状态?或者无限增长可以吗?

我在 Apache Beam(在 Dataflow 中运行)中有一个情况,我创建了一个简单的有状态 DoFn,基于 这篇文章.上游窗口是全局的,更改它会影响下游聚合. 目前,我没有做任何事情来缩小状态,它似乎只是无限增长.这是真的?无界状态增长是个问题吗? 我想简单地将 TTL 附加到状态,但没有看到此功能. 我正在考虑在数据上存储我自己的时间戳,并使用计时器定期清理表.这是可取 ..
发布时间:2021-11-11 22:42:37 其他开发

如何暂存 GCP/Apache Beam 数据流模板?

好的,我必须在这里遗漏一些东西.将管道作为模板进行暂存需要什么?当我尝试通过 这些说明,它运行模块但不进行任何操作.它似乎按预期运行而没有错误,但我没有看到任何文件实际上被添加到存储桶位置,在我的 --template_location 中侦听.我的python代码应该出现在那里吗?我猜对吗?我已经确定我已经安装了所有的 Beam 和谷歌云 SDK,但也许我遗漏了什么?您需要做什么来暂存此数据流模 ..