apache-beam相关内容

如何修复不兼容的类型:org.apache.beam.sdk.options.ValueProvider<;java.lang.String>;无法转换为java.lang.String";

我按照this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。BEAM版本为2.9.0,以下是我的部分代 ..
发布时间:2022-08-12 19:55:34 Java开发

优化内存密集型数据流流水线的GCP成本

我们希望提高在GCP数据流中运行特定的阿帕奇光束管道(Python SDK)的成本。 我们构建了一个内存密集型的ApacheBeam管道,它需要在每个执行器上运行大约8.5 GB的RAM。大型机器学习模型当前加载在转换DoFn.setup方法中,因此我们可以为数百万用户预先计算推荐。 现有GCP计算引擎计算机类型的内存/vCPU比率要么低于我们的要求(每个vCPU最多8 GB RAM),要 ..

将其他文件包括在数据流中

我的数据流使用.sql文件。此文件包含一个查询,它位于名为queries的目录中。 我需要将此文件与我的数据流一起上载。 我发现使用的是清单.in文件,但据我所知,这并不起任何作用,我在根目录中创建了名为MANIFEST.in的文件,它只包含一行: recursive-include queries * 其他一些来源告诉我,为此需要使用setup.py文件。所以现在它看起来 ..
发布时间:2022-08-10 17:50:56 Python

有没有办法在阿帕奇光束中指定无限允许的延迟?

我使用固定窗口按事件时间对数据进行批处理,以便高效地将其发送到外部API(以60秒为一批),累计模式设置为丢弃,因为延迟数据发送到外部API而不包括之前的数据无关紧要。 是否可以指定无限允许的延迟,这样就永远不会丢弃延迟数据? 推荐答案 绝对有可能,您可以将允许的延迟设置为非常高的Duration(例如Duration.standardDays(36500))。另一方面,这样做会 ..
发布时间:2022-07-15 21:21:35 其他开发

Dataflow使用哪种持久存储来保持由阿帕奇光束定时器实施的持久状态?

它不是显式的,但我认为数据流可以使用Persistent Disk Resources 无论如何,我找不到对此的确认。 我想知道我是否可以假设使用计时器的限制和预期性能与此处提供的相同:https://cloud.google.com/compute/docs/disks/performance 推荐答案 数据流使用永久磁盘存储计时器。但这也涉及大量缓存,因此性能应该比仅从 ..

如何使用BEAM的外部Kafka变换(本地)消费消息

我正在尝试运行一个应用程序,该应用程序使用Kafka生产者(Python客户端)和一个阿帕奇光束管道,它(目前)只是通过将这些消息打印到STDOUT来使用它们。 我了解,将Kafka外部转换与ApacheBEAM一起使用是一项跨语言的工作,因为它调用Java外部服务。我遵循了following link's选项1: 选项1:使用默认扩展服务 这是使用Python时推荐且最简单的设置选 ..
发布时间:2022-04-13 12:38:59 Java开发

数据流BigQuery-BigQuery管道对较小的数据执行,但不对大型生产数据集执行

这里的数据流有点新手,但已经成功地创建了一个工作良好的管道。 管道从BigQuery读取查询,应用Pardo(NLP函数),然后将数据写入新的BigQuery表。 我尝试处理的数据集大约为500 GB,包含46M条记录。 当我使用相同数据的子集(大约300k记录)尝试此操作时,它工作得很好,而且速度很快,请参见下面的内容: 当我尝试使用完整的数据集运行它时,它启动得非常快, ..

PYTHON阿帕奇光束多路输出和处理

我正在尝试使用以下流程在Google数据流上运行作业: 实质上是获取单个数据源,根据词典中的某些值进行筛选,并为每个筛选条件创建单独的输出。 我编写了以下代码: # List of values to filter by x_list = [1, 2, 3] with beam.Pipeline(options=PipelineOptions().from_dictionary ..
发布时间:2022-04-13 12:28:30 Python

如何用APACHE BEAM计算运行总数

使用用于Google数据流的ApacheBeam SDK,我想计算一组交易的每日余额。 示例数据集可能如下所示:收款人名称、交易日期和金额: John, 2017-12-01, 100 John, 2017-12-01, 200 Jane, 2017-12-01, 150 John, 2017-12-02, -100 John, 2017-12-02, 300 所需的结果集如下所示 ..
发布时间:2022-04-06 12:14:34 其他开发

阿帕奇光束到BigQuery

我正在Google Cloud数据流中构建一个流程,该流程将使用发布/订阅中的消息,并基于一个键的值将它们写入BQ或GCS。我能够拆分消息,但我不确定如何将数据写入BigQuery。我已尝试使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。 我的完整代码在这里:https://pastebin.com/4W9Vu4Km 基本上我的问题是我不知道如 ..

如何在私有网络上运行Dataflow Python?

ApacheBeam2.1.0添加了对在专用子网络上没有公共IP的数据流运行器上提交作业的支持,这是我们满足防火墙规则所需的。我计划使用Squid代理访问apt-get、pip等来安装python依赖项;一个代理实例已经在运行,并且我们在setup.py脚本中设置了代理。 python $DIR/submit.py --runner DataflowRunner ..
发布时间:2022-04-06 12:02:02 Python

运行束流管道时,对象没有窗口属性

在运行数据流作业时,我得到‘PBegin’对象没有‘windowing’属性。 我正在调用Pardo函数中的ConnectClass类。 我正在尝试从Beam pythonSDK连接NOSQL数据库,并运行SQL从表中提取数据。然后使用另一个Pardo将输出写入单独的文件。 class Connector(beam.DoFn): def __init__(self,userna ..