apache-beam-io相关内容

在使用Beam IO ReadFromPubSub模块时,是否可以拉取具有Python属性的消息?目前还不清楚它是否得到支持

正在尝试将具有存储在PubSub中的属性的消息拉入Beam管道。我想知道是否添加了对Python的支持,这就是我无法阅读它们的原因。我看到它存在于Java中。 pipeline_options = PipelineOptions() pipeline_options.view_as(StandardOptions).streaming = True pipeline = beam.Pipe ..
发布时间:2022-09-06 17:33:35 Python

运行束流管道时,对象没有窗口属性

在运行数据流作业时,我得到‘PBegin’对象没有‘windowing’属性。 我正在调用Pardo函数中的ConnectClass类。 我正在尝试从Beam pythonSDK连接NOSQL数据库,并运行SQL从表中提取数据。然后使用另一个Pardo将输出写入单独的文件。 class Connector(beam.DoFn): def __init__(self,userna ..

Apache Beam:刷新我正在使用 MongoDbIO.read() 从 MongoDB 读取的 sideinput 第 2 部分

不确定这个 GenerateSequence 如何为我工作,因为我必须每小时或每天定期从 Mongo 读取值,创建一个读取 MongoDB 的 ParDo,还使用触发器将窗口添加到 GlobalWindows(触发器我将更新为公关要求).但是下面的代码片段给出了返回类型错误,所以你能帮我更正下面的代码行吗?还可以找到错误的快照.还有这个 Generate Sequence 对我有什么帮助? ..
发布时间:2021-11-11 22:45:19 其他开发

使用 Apache 梁`GroupByKey` 并构造一个新列 - Python

来自这个问题:如何对数据进行分组并构造一个新列——python pandas?,我知道如何使用pandas对多列进行分组并构造一个新的唯一ID,但是如果我想使用Apache beam 在 Python 中实现与该问题中描述的相同的事情,我怎样才能实现它,然后将新数据写入换行符分隔的 JSON 格式文件(每一行是一个 unique_id 具有属于该 unique_id 的对象数组)? 假设数据 ..
发布时间:2021-11-11 22:45:16 Python

光束使用模式注册表动态解码 avro 记录

我一直在尝试编写一个从 kafka 主题读取的光束管道,其中该主题由 avro 记录组成.这些记录的架构可以快速更改,因此我想在提取相关公共字段之前使用 Confluent Schema Registry 获取架构并解码事件.要么我做错了什么,要么文档已经过时.我按照这里的例子:https://github.com/apache/beam/blob/dfa1e475194ac6f65c42da7b ..
发布时间:2021-11-11 22:44:28 Java开发

用java在apche_beam中编写tfrecords

如何在java中编写以下代码?如果我在 java 中有记录/字典列表,我如何编写梁代码以将它们写入 tfrecords 中,其中 tf.train.Examples 被序列化.有很多例子可以用python来做,下面是python中的一个例子,我如何在java中编写相同的逻辑? 将 tensorflow 导入为 tf导入 apache_beam 作为梁从apache_beam.runners.i ..
发布时间:2021-11-11 22:43:58 Java开发

AttributeError: 'RuntimeValueProvider' 对象没有属性 'projectId'

我正在尝试在 Dataflow runner 中运行 apache 光束管道;该作业从 bigquery 表读取数据并将数据写入数据库. 我在数据流中使用经典模板选项运行作业 - 意味着首先我必须暂存管道,然后使用适当的参数运行它. 我的管道选项如下 options = PipelineOptions()options.view_as(SetupOptions).save_main_ ..
发布时间:2021-11-11 22:43:43 其他开发

如何在 apache 光束管道中记录传入消息

我正在编写一个简单的 apache 光束流管道,从发布订阅主题获取输入并将其存储到 bigquery 中.几个小时以来,我以为我什至无法阅读消息,因为我只是试图将输入记录到控制台: events = p |'阅读 PubSub' >>ReadFromPubSub(订阅=订阅)logging.info(事件) 当我把它写成文本时,它工作正常!然而,我对 logger 的调用从未发生过. 人 ..

Apache Beam 如何管理运动检查点?

我有一个在 Apache Beam(使用 Spark Runner)中开发的流式管道,它从 kinesis 流中读取数据. 我正在寻找 Apache Beam 中的选项来管理 kinesis 检查点(即定期存储 kinesis 流的当前位置),以便它允许系统从故障中恢复并在流停止的地方继续处理. Apache Beam 是否提供了类似于 Spark Streaming 的支持 kine ..
发布时间:2021-11-11 22:42:10 其他开发

使用 Beam 从 oracle 获取 JDBC

下面的程序是连接到 Oracle 11g 并获取记录.它是如何为我在 pipeline.apply() 处的编码器提供 NullPointerException 的. 我已将 ojdbc14.jar 添加到项目依赖项中. public static void main(String[] args) {管道 p = Pipeline.create(PipelineOptionsFactory ..
发布时间:2021-11-11 22:41:52 其他开发

Apache Beam:刷新我正在使用 MongoDbIO.read() 从 MongoDB 读取的侧输入

我正在从 MongoDB 读取 PCollection mongodata 并使用这个 PCollection 作为我的 ParDo(DoFN).withSideInputs(PCollection) 的 sideInput 从后端,我的MongoDB 集合每天或每月或每年更新.我需要在我的管道中添加新的价值. 我们可以认为这是在正在运行的管道中刷新 mongo 集合值.例如,mong ..
发布时间:2021-11-11 22:41:10 其他开发

使用每个元素的 apache 光束流式写入 gcs

当前光束管道正在使用 FileIO.matchAll().continuously() 将文件作为流读取.这将返回 PCollection .我想用相同的名称将这些文件写回另一个 gcs 存储桶,即每个 PCollection 是一个文件 metadata/readableFile ,经过一些处理后应该写回另一个存储桶.我应该使用任何接收器来实现将每个 PCollection 项目写回 GCS 或 ..

Apache Beam 中的顺序执行 - Java SDK 2.18.0

嗨,我有几个查询要运行 &使用 Apache Beam 依次保存结果,我见过一些类似的问题,但找不到答案.我习惯于使用 Airflow 设计管道,而我对 Apache Beam 还是比较陌生.我正在使用 Dataflow 运行程序.这是我现在的代码:我希望 query2 仅在 query1 结果保存到相应表后运行.我如何链接它们? PCollectionresultsStep1 = getDa ..
发布时间:2021-11-11 22:39:42 其他开发