apache-beam相关内容
我按照this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。BEAM版本为2.9.0,以下是我的部分代
..
我们希望提高在GCP数据流中运行特定的阿帕奇光束管道(Python SDK)的成本。 我们构建了一个内存密集型的ApacheBeam管道,它需要在每个执行器上运行大约8.5 GB的RAM。大型机器学习模型当前加载在转换DoFn.setup方法中,因此我们可以为数百万用户预先计算推荐。 现有GCP计算引擎计算机类型的内存/vCPU比率要么低于我们的要求(每个vCPU最多8 GB RAM),要
..
我的数据流使用.sql文件。此文件包含一个查询,它位于名为queries的目录中。 我需要将此文件与我的数据流一起上载。 我发现使用的是清单.in文件,但据我所知,这并不起任何作用,我在根目录中创建了名为MANIFEST.in的文件,它只包含一行: recursive-include queries * 其他一些来源告诉我,为此需要使用setup.py文件。所以现在它看起来
..
我有一个PCollection,我希望将其作为侧输入传递,并在Pardo中访问其元素。 所以我为它创建了一个PCollectionView: final PCollectionView> view = myPCollection.apply(View.asList()); 但如何在传递侧向输入时访问Pardo中的元素? 举个例子会很有帮助。
..
我使用固定窗口按事件时间对数据进行批处理,以便高效地将其发送到外部API(以60秒为一批),累计模式设置为丢弃,因为延迟数据发送到外部API而不包括之前的数据无关紧要。 是否可以指定无限允许的延迟,这样就永远不会丢弃延迟数据? 推荐答案 绝对有可能,您可以将允许的延迟设置为非常高的Duration(例如Duration.standardDays(36500))。另一方面,这样做会
..
它不是显式的,但我认为数据流可以使用Persistent Disk Resources 无论如何,我找不到对此的确认。 我想知道我是否可以假设使用计时器的限制和预期性能与此处提供的相同:https://cloud.google.com/compute/docs/disks/performance 推荐答案 数据流使用永久磁盘存储计时器。但这也涉及大量缓存,因此性能应该比仅从
..
我有一个非常小的Python数据流包,包的结构如下所示 . ├── __pycache__ ├── pubsubtobigq.py ├── requirements.txt └── venv requirements.txt的内容为 protobuf==3.11.2 protobuf3-to-dict==0.1.5 我使用以下代码运行管道 python -m pubsub
..
我正在尝试运行一个应用程序,该应用程序使用Kafka生产者(Python客户端)和一个阿帕奇光束管道,它(目前)只是通过将这些消息打印到STDOUT来使用它们。 我了解,将Kafka外部转换与ApacheBEAM一起使用是一项跨语言的工作,因为它调用Java外部服务。我遵循了following link's选项1: 选项1:使用默认扩展服务 这是使用Python时推荐且最简单的设置选
..
这里的数据流有点新手,但已经成功地创建了一个工作良好的管道。 管道从BigQuery读取查询,应用Pardo(NLP函数),然后将数据写入新的BigQuery表。 我尝试处理的数据集大约为500 GB,包含46M条记录。 当我使用相同数据的子集(大约300k记录)尝试此操作时,它工作得很好,而且速度很快,请参见下面的内容: 当我尝试使用完整的数据集运行它时,它启动得非常快,
..
我正在尝试传递一个BigQuery表名作为一个ApacheBEAM管道模板的值提供者。根据their documentation和StackOverflow answer,可以将值提供程序传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery。 这就是我的管道代码 class UserOptions(PipelineOptions): "
..
我正在构建一个Apache Beam管道,但在尝试导入管道选项时遇到了AttributeError。 我正在使用python3.6在干净的虚拟环境中的Ubuntu服务器上进行测试 步骤: virtualenv -p python3.6 beam-env . beam-env/bin/activate pip install apache_beam==2.12.0 python3.
..
我正在尝试使用fileio.MatchFiles将几个csv文件转换为pd.DataFrame文件,然后将它们连接成一个csv文件。为此,我创建了两个ParDo类,将文件转换为DataFrame,然后将它们合并为merged csv。整个代码片段如下所示: class convert_to_dataFrame(beam.DoFn): def process(self, element
..
首先 生成简单数据后在Google云平台BigQuery表中存储数据的代码。 导入了阿帕奇-光束库并使用了它。 Runner使用了Google Cloud Platform Dataflow。 此处是代码。 from apache_beam.options.pipeline_options import PipelineOptions import apache_beam as bea
..
我正在尝试使用以下流程在Google数据流上运行作业: 实质上是获取单个数据源,根据词典中的某些值进行筛选,并为每个筛选条件创建单独的输出。 我编写了以下代码: # List of values to filter by x_list = [1, 2, 3] with beam.Pipeline(options=PipelineOptions().from_dictionary
..
使用用于Google数据流的ApacheBeam SDK,我想计算一组交易的每日余额。 示例数据集可能如下所示:收款人名称、交易日期和金额: John, 2017-12-01, 100 John, 2017-12-01, 200 Jane, 2017-12-01, 150 John, 2017-12-02, -100 John, 2017-12-02, 300 所需的结果集如下所示
..
我正在Google Cloud数据流中构建一个流程,该流程将使用发布/订阅中的消息,并基于一个键的值将它们写入BQ或GCS。我能够拆分消息,但我不确定如何将数据写入BigQuery。我已尝试使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。 我的完整代码在这里:https://pastebin.com/4W9Vu4Km 基本上我的问题是我不知道如
..
我有一个使用Google数据流的项目。几个月来,我已经成功地使用以下命令(以及类似的命令)来部署模板。 .venv/bin/python -m dataflow.registry_files.delimited_file --runner=DataflowRunner --region=us-central1 --project=myproject --staging_location=gs
..
ApacheBeam2.1.0添加了对在专用子网络上没有公共IP的数据流运行器上提交作业的支持,这是我们满足防火墙规则所需的。我计划使用Squid代理访问apt-get、pip等来安装python依赖项;一个代理实例已经在运行,并且我们在setup.py脚本中设置了代理。 python $DIR/submit.py --runner DataflowRunner
..
在运行数据流作业时,我得到‘PBegin’对象没有‘windowing’属性。 我正在调用Pardo函数中的ConnectClass类。 我正在尝试从Beam pythonSDK连接NOSQL数据库,并运行SQL从表中提取数据。然后使用另一个Pardo将输出写入单独的文件。 class Connector(beam.DoFn): def __init__(self,userna
..
主要目标是聚合两个Kafka主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。 我已经能够在KV(Long,String)等简单场景中使用如下内容消费消息: PCollection> input = p.apply(KafkaIO.read() .withKeyDeserializer(LongDeserial
..