google-cloud-dataflow相关内容

如何修复不兼容的类型:org.apache.beam.sdk.options.ValueProvider<;java.lang.String>;无法转换为java.lang.String";

我按照this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。BEAM版本为2.9.0,以下是我的部分代 ..
发布时间:2022-08-12 19:55:34 Java开发

优化内存密集型数据流流水线的GCP成本

我们希望提高在GCP数据流中运行特定的阿帕奇光束管道(Python SDK)的成本。 我们构建了一个内存密集型的ApacheBeam管道,它需要在每个执行器上运行大约8.5 GB的RAM。大型机器学习模型当前加载在转换DoFn.setup方法中,因此我们可以为数百万用户预先计算推荐。 现有GCP计算引擎计算机类型的内存/vCPU比率要么低于我们的要求(每个vCPU最多8 GB RAM),要 ..

将其他文件包括在数据流中

我的数据流使用.sql文件。此文件包含一个查询,它位于名为queries的目录中。 我需要将此文件与我的数据流一起上载。 我发现使用的是清单.in文件,但据我所知,这并不起任何作用,我在根目录中创建了名为MANIFEST.in的文件,它只包含一行: recursive-include queries * 其他一些来源告诉我,为此需要使用setup.py文件。所以现在它看起来 ..
发布时间:2022-08-10 17:50:56 Python

Dataflow使用哪种持久存储来保持由阿帕奇光束定时器实施的持久状态?

它不是显式的,但我认为数据流可以使用Persistent Disk Resources 无论如何,我找不到对此的确认。 我想知道我是否可以假设使用计时器的限制和预期性能与此处提供的相同:https://cloud.google.com/compute/docs/disks/performance 推荐答案 数据流使用永久磁盘存储计时器。但这也涉及大量缓存,因此性能应该比仅从 ..

数据流BigQuery-BigQuery管道对较小的数据执行,但不对大型生产数据集执行

这里的数据流有点新手,但已经成功地创建了一个工作良好的管道。 管道从BigQuery读取查询,应用Pardo(NLP函数),然后将数据写入新的BigQuery表。 我尝试处理的数据集大约为500 GB,包含46M条记录。 当我使用相同数据的子集(大约300k记录)尝试此操作时,它工作得很好,而且速度很快,请参见下面的内容: 当我尝试使用完整的数据集运行它时,它启动得非常快, ..

如何用APACHE BEAM计算运行总数

使用用于Google数据流的ApacheBeam SDK,我想计算一组交易的每日余额。 示例数据集可能如下所示:收款人名称、交易日期和金额: John, 2017-12-01, 100 John, 2017-12-01, 200 Jane, 2017-12-01, 150 John, 2017-12-02, -100 John, 2017-12-02, 300 所需的结果集如下所示 ..
发布时间:2022-04-06 12:14:34 其他开发

复杂的云数据流管道不会在开发人员控制台中显示执行图表

我们正在实现一个相当复杂的管道,它由几个链接在一起的GroupBy和Combine组成。除此之外,管道还应用了KeyedPCollectionTuple。 此管道可以成功执行,但图形不会显示在Google开发人员控制台中。仅显示日志。 也缺少这些步骤。 有什么方法可以让它们显示出来吗? 推荐答案 这是因为如果任何转换的分配名称为空,则无法显示图形。 Github问题# ..
发布时间:2022-04-06 12:10:37 其他开发

阿帕奇光束到BigQuery

我正在Google Cloud数据流中构建一个流程,该流程将使用发布/订阅中的消息,并基于一个键的值将它们写入BQ或GCS。我能够拆分消息,但我不确定如何将数据写入BigQuery。我已尝试使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。 我的完整代码在这里:https://pastebin.com/4W9Vu4Km 基本上我的问题是我不知道如 ..

如何在私有网络上运行Dataflow Python?

ApacheBeam2.1.0添加了对在专用子网络上没有公共IP的数据流运行器上提交作业的支持,这是我们满足防火墙规则所需的。我计划使用Squid代理访问apt-get、pip等来安装python依赖项;一个代理实例已经在运行,并且我们在setup.py脚本中设置了代理。 python $DIR/submit.py --runner DataflowRunner ..
发布时间:2022-04-06 12:02:02 Python

每秒处理35万次请求并将数据保存到Google云存储

我需要实现微服务,它在逻辑和架构上相当简单,但需要处理每秒305k左右的请求。 它要做的就是接收JSON数据,根据简单的规则进行验证,然后作为JSON文件记录到Google Cloud Storage中。有很多Google Cloud服务和API可用,但我很难选择合适的堆栈和管道,因为我没有太多使用它们和HighLoad的经验。 下面是我看到的一个例子 https://cloud.goog ..

数据流中的流水线到Bigtable Python

我想要读取pubSub主题,并使用用Python编写的数据流代码将数据写入BigTable。我可以在Java中找到样例代码,但在Python中找不到。 如何将一行中的列从pubSub分配给不同的列族并将数据写入BigTable? 推荐答案 要在数据流管道中写入大表,您需要创建直接行并将它们传递给WriteToBigTabledoFn。下面是一个简短的示例,它只是传入行键,并为每个键添加 ..

一个数据流作业内的并行管道

我要在GCP上的一个数据流作业内运行两个并行管道。我已经创建了一个管道,并且工作正常,但我希望在不创建另一个作业的情况下创建另一个管道。 我搜索了这么多答案,但没有找到任何代码示例:( 如果我这样运行它,它不工作: pipe1.run(); pipe2.run(); 显示“已有活动作业名称...如果要提交第二个作业,请尝试使用--jobName重新设置其他名称” 推荐答 ..

运行束流管道时,对象没有窗口属性

在运行数据流作业时,我得到‘PBegin’对象没有‘windowing’属性。 我正在调用Pardo函数中的ConnectClass类。 我正在尝试从Beam pythonSDK连接NOSQL数据库,并运行SQL从表中提取数据。然后使用另一个Pardo将输出写入单独的文件。 class Connector(beam.DoFn): def __init__(self,userna ..