google-cloud-dataflow相关内容
我正在使用光束/数据流运行流式管道.我正在从 pub/sub 读取我的输入,并将其转换为如下 dict: raw_loads_dict = (p|'ReadPubsubLoads' >>ReadFromPubSub(topic=PUBSUB_TOPIC_NAME).with_output_types(bytes)|'JSONParse' >>梁.Map(lambda x: json.loads
..
我正在开发一个 Apache Beam 管道,它从发布/订阅中读取一堆事件,然后根据事件类型将它们写入单独的 BigQuery 表中. 我知道 WriteToBigQuery 支持动态目的地,但就我而言,问题在于目的地是从从事件中读取的数据派生而来的.例如:一个事件看起来像 {“object_id":123,...一些元数据,“object_data":{对象相关信息}} 应写入 Big
..
我正在尝试在 Dataflow runner 中运行 apache 光束管道;该作业从 bigquery 表读取数据并将数据写入数据库. 我在数据流中使用经典模板选项运行作业 - 意味着首先我必须暂存管道,然后使用适当的参数运行它. 我的管道选项如下 options = PipelineOptions()options.view_as(SetupOptions).save_main_
..
当我运行 示例笔记本 Dataflow_Word_count.ipynb 在 Google Cloud Platform 的网站上提供,我可以使用 Apache Beam notebooks 启动一个 Dataflow 作业并且该作业成功完成.管道定义如下. class ReadWordsFromText(beam.PTransform):def __init__(self, file_patt
..
我正在用 Java 开发 Dataflow,输入来自 Pubsub.后来,我在这里看到了关于如何使用本地 Pubsub 模拟器的指南,所以我不会需要部署到 GCP 才能进行测试. 这是我的简单代码: 私有接口 Options extends PipelineOptions, PubsubOptions, StreamingOptions {@Description("要从中读取消息的发布/
..
大家好,当我在 Google Cloud Dataflow 中创建自定义模板时,我努力理解发生了什么,但未能理解.感谢 GCP 文档.以下是我正在实现的目标. 从 Google Cloud Bucket 读取数据 预处理 加载深度学习模型(每个 1 GB)并获得预测 将结果转储到 BigQuery 中. 我成功创建了模板并且能够执行该作业.但我有以下问题. 当我执行作业时,
..
可以发送java代码,但目前没有必要. 我有一个问题,当我以(DirectRunner - 使用 Google VM 实例)运行作业时,它工作正常,因为它将信息保存到本地文件并继续...... 尝试使用 (DataflowRunner) 时出现问题,以及我收到的错误: java.nio.file.NoSuchFileExtension: XXXX.csv..........XXXX.cs
..
我无法按照以下说明使用 wordcount 示例创建自定义 Google Cloud Dataflow 模板:https://cloud.google.com/dataflow/docs/guides/templates/creating-templates 我收到与无法访问 RuntimeValueProvider 相关的错误.我做错了什么? 我的主函数wordcount.py:
..
我有一个 Dataflow 管道正在运行,它获取活动租户的配置(存储在 GCS 中)并将其作为 sideInput 提供给 ActiveTenantFilter.配置很少更新,因此我决定在更新时使用 --update 标志重新部署管道. 但是,当使用更新标志时,不会再次获取文件,即保持状态.是否可以强制在重新部署管道时更新此 PCollectionView? 解决方案 你是对的,当你
..
我正在使用 Java 将 JSON 转换为 Avro,并使用 Google DataFlow 将它们存储到 GCS.Avro 架构是在运行时使用 SchemaBuilder 创建的. 我在架构中定义的字段之一是可选的 LONG 字段,它的定义如下: SchemaBuilder.FieldAssembler字段 = SchemaBuilder.record(mainName).fields(
..
我在本地开发了一个 apache 光束管道,我在其中对示例文件运行预测. 在我的计算机本地,我可以像这样加载模型: with open('gs://newbucket322/my_dumped_classifier.pkl', 'rb') as fid:gnb_loaded = cPickle.load(fid) 但是在谷歌数据流上运行时显然不起作用.我尝试将路径更改为 GS://但这
..
我有一个 Beam 管道,它从读取多个文本文件开始,其中文件中的每一行代表一行,该行稍后会在管道中插入 Bigtable.该场景需要确认从每个文件中提取的行数后来插入到 Bigtable 匹配中的行数.为此,我计划开发一个自定义窗口策略,以便根据文件名作为将传递给窗口函数的键,将来自单个文件的行分配给单个窗口. 是否有创建自定义窗口函数的代码示例? 解决方案 虽然我改变了确认插入行数
..
当我通过以下步骤使用数据流时:- 从 bigquery 读取- 将表格行转换为 json 字符串- 插入到elasticsearch (7.5.2)它看起来很适合处理 ~100k 条记录,但实际上(8m 条记录 ~65gb)数据流在插入 300k 条记录后会抛出异常. 来自工人的错误消息:java.lang.RuntimeException:意外的 org.apache.beam.runner
..
我正在尝试在 Apache Beam 中实施 CDC,部署在 Google Cloud Dataflow 中. 我已经卸载了主数据和新数据,预计每天都会出现.联接未按预期工作.缺了点什么. master_data = (p |'从 BigQuery 读取库' >>beam.io.Read(beam.io.BigQuerySource(query=master_data, use_stand
..
有人可以澄清 ReafFromPubSub 转换? 我正在使用 BigQuery 接收器,我的理解它就像 BQ Streaming API 的 insertId,表格数据:insertAll 每一行的唯一 ID.BigQuery 使用此属性尽最大努力检测重复的插入请求.如需了解详情,请参阅数据一致性. 但是我没有看到这种预期的行为. 我正在向 Pub/Sub 发布消息,每条
..
我有一个管道,可以获取文件的 URL 并下载这些文件,为除标题之外的每一行生成 BigQuery 表行. 为了避免重复下载,我想根据以前下载的表检查 URL,如果 URL 不在此“历史"表中,则仅继续存储该 URL. 为此,我需要将历史记录存储在允许唯一值的数据库中,或者为此使用 BigQuery 可能更容易,但对表的访问必须严格串行. 我可以强制执行单线程执行(在一台机器上)来
..
我一直在使用 Kafka 实例作为输入和 Bigquery 作为输出在 GCP 提供的 Apache Beam Notebook 中测试 Apache Beam 管道.我已经能够通过 Interactive runner 成功使用管道,但是当我将相同的管道部署到 Dataflow runner 时,它似乎从未真正从已定义的 Kafka 主题中读取.查看日志给了我错误: 无法读取数据平面中的输
..
背景:我们有一个 Dataflow 作业,它将 PubSub 消息转换为 Avro GenericRecords 并将它们作为“.avro"写入 GCS.PubSub 消息和 GenericRecords 之间的转换需要一个模式.此架构每周更改一次,仅添加字段.我们希望能够在不更新 Dataflow 作业的情况下更新字段. 我们做了什么:我们听取了这篇博文 并创建了一个每分钟刷新一次内容的
..
我在 Apache Beam(在 Dataflow 中运行)中有一个情况,我创建了一个简单的有状态 DoFn,基于 这篇文章.上游窗口是全局的,更改它会影响下游聚合. 目前,我没有做任何事情来缩小状态,它似乎只是无限增长.这是真的?无界状态增长是个问题吗? 我想简单地将 TTL 附加到状态,但没有看到此功能. 我正在考虑在数据上存储我自己的时间戳,并使用计时器定期清理表.这是可取
..
好的,我必须在这里遗漏一些东西.将管道作为模板进行暂存需要什么?当我尝试通过 这些说明,它运行模块但不进行任何操作.它似乎按预期运行而没有错误,但我没有看到任何文件实际上被添加到存储桶位置,在我的 --template_location 中侦听.我的python代码应该出现在那里吗?我猜对吗?我已经确定我已经安装了所有的 Beam 和谷歌云 SDK,但也许我遗漏了什么?您需要做什么来暂存此数据流模
..