google-cloud-dataflow相关内容
我按照this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。BEAM版本为2.9.0,以下是我的部分代
..
我们希望提高在GCP数据流中运行特定的阿帕奇光束管道(Python SDK)的成本。 我们构建了一个内存密集型的ApacheBeam管道,它需要在每个执行器上运行大约8.5 GB的RAM。大型机器学习模型当前加载在转换DoFn.setup方法中,因此我们可以为数百万用户预先计算推荐。 现有GCP计算引擎计算机类型的内存/vCPU比率要么低于我们的要求(每个vCPU最多8 GB RAM),要
..
我的数据流使用.sql文件。此文件包含一个查询,它位于名为queries的目录中。 我需要将此文件与我的数据流一起上载。 我发现使用的是清单.in文件,但据我所知,这并不起任何作用,我在根目录中创建了名为MANIFEST.in的文件,它只包含一行: recursive-include queries * 其他一些来源告诉我,为此需要使用setup.py文件。所以现在它看起来
..
我有一个PCollection,我希望将其作为侧输入传递,并在Pardo中访问其元素。 所以我为它创建了一个PCollectionView: final PCollectionView> view = myPCollection.apply(View.asList()); 但如何在传递侧向输入时访问Pardo中的元素? 举个例子会很有帮助。
..
它不是显式的,但我认为数据流可以使用Persistent Disk Resources 无论如何,我找不到对此的确认。 我想知道我是否可以假设使用计时器的限制和预期性能与此处提供的相同:https://cloud.google.com/compute/docs/disks/performance 推荐答案 数据流使用永久磁盘存储计时器。但这也涉及大量缓存,因此性能应该比仅从
..
我有一个非常小的Python数据流包,包的结构如下所示 . ├── __pycache__ ├── pubsubtobigq.py ├── requirements.txt └── venv requirements.txt的内容为 protobuf==3.11.2 protobuf3-to-dict==0.1.5 我使用以下代码运行管道 python -m pubsub
..
这里的数据流有点新手,但已经成功地创建了一个工作良好的管道。 管道从BigQuery读取查询,应用Pardo(NLP函数),然后将数据写入新的BigQuery表。 我尝试处理的数据集大约为500 GB,包含46M条记录。 当我使用相同数据的子集(大约300k记录)尝试此操作时,它工作得很好,而且速度很快,请参见下面的内容: 当我尝试使用完整的数据集运行它时,它启动得非常快,
..
我正在尝试使用fileio.MatchFiles将几个csv文件转换为pd.DataFrame文件,然后将它们连接成一个csv文件。为此,我创建了两个ParDo类,将文件转换为DataFrame,然后将它们合并为merged csv。整个代码片段如下所示: class convert_to_dataFrame(beam.DoFn): def process(self, element
..
使用用于Google数据流的ApacheBeam SDK,我想计算一组交易的每日余额。 示例数据集可能如下所示:收款人名称、交易日期和金额: John, 2017-12-01, 100 John, 2017-12-01, 200 Jane, 2017-12-01, 150 John, 2017-12-02, -100 John, 2017-12-02, 300 所需的结果集如下所示
..
启动DataFlow Flex模板时遇到以下问题。 Error occurred in the launcher container: Template launch failed. See console logs 2020年12月13日凌晨2:16:51之前一切正常。 但2020年12月13日凌晨2:16:51之后,所有作业都失败。 我不知道出了什么问题。我没有更新任何内
..
我们正在实现一个相当复杂的管道,它由几个链接在一起的GroupBy和Combine组成。除此之外,管道还应用了KeyedPCollectionTuple。 此管道可以成功执行,但图形不会显示在Google开发人员控制台中。仅显示日志。 也缺少这些步骤。 有什么方法可以让它们显示出来吗? 推荐答案 这是因为如果任何转换的分配名称为空,则无法显示图形。 Github问题#
..
我正在Google Cloud数据流中构建一个流程,该流程将使用发布/订阅中的消息,并基于一个键的值将它们写入BQ或GCS。我能够拆分消息,但我不确定如何将数据写入BigQuery。我已尝试使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。 我的完整代码在这里:https://pastebin.com/4W9Vu4Km 基本上我的问题是我不知道如
..
我有包含各种字符的字符串,需要写入Google BigQuery,它需要严格的UTF8字符串。在尝试使用各种表情符号输入编写字符串时,我收到错误: java.lang.IllegalArgumentException: Unpaired surrogate at index 3373 at org.apache.beam.sdk.repackaged.com.google.commo
..
我当前正在使用Dataflow在Python中执行循环批处理。 基本上我从BigQuery读取数据并在其上执行操作。我的管道如下所示 pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) lines = (p | 'read_big_query' >>
..
我有一个使用Google数据流的项目。几个月来,我已经成功地使用以下命令(以及类似的命令)来部署模板。 .venv/bin/python -m dataflow.registry_files.delimited_file --runner=DataflowRunner --region=us-central1 --project=myproject --staging_location=gs
..
ApacheBeam2.1.0添加了对在专用子网络上没有公共IP的数据流运行器上提交作业的支持,这是我们满足防火墙规则所需的。我计划使用Squid代理访问apt-get、pip等来安装python依赖项;一个代理实例已经在运行,并且我们在setup.py脚本中设置了代理。 python $DIR/submit.py --runner DataflowRunner
..
我需要实现微服务,它在逻辑和架构上相当简单,但需要处理每秒305k左右的请求。 它要做的就是接收JSON数据,根据简单的规则进行验证,然后作为JSON文件记录到Google Cloud Storage中。有很多Google Cloud服务和API可用,但我很难选择合适的堆栈和管道,因为我没有太多使用它们和HighLoad的经验。 下面是我看到的一个例子 https://cloud.goog
..
我想要读取pubSub主题,并使用用Python编写的数据流代码将数据写入BigTable。我可以在Java中找到样例代码,但在Python中找不到。 如何将一行中的列从pubSub分配给不同的列族并将数据写入BigTable? 推荐答案 要在数据流管道中写入大表,您需要创建直接行并将它们传递给WriteToBigTabledoFn。下面是一个简短的示例,它只是传入行键,并为每个键添加
..
我要在GCP上的一个数据流作业内运行两个并行管道。我已经创建了一个管道,并且工作正常,但我希望在不创建另一个作业的情况下创建另一个管道。 我搜索了这么多答案,但没有找到任何代码示例:( 如果我这样运行它,它不工作: pipe1.run(); pipe2.run(); 显示“已有活动作业名称...如果要提交第二个作业,请尝试使用--jobName重新设置其他名称” 推荐答
..
在运行数据流作业时,我得到‘PBegin’对象没有‘windowing’属性。 我正在调用Pardo函数中的ConnectClass类。 我正在尝试从Beam pythonSDK连接NOSQL数据库,并运行SQL从表中提取数据。然后使用另一个Pardo将输出写入单独的文件。 class Connector(beam.DoFn): def __init__(self,userna
..