apache-beam-io相关内容
正在尝试将具有存储在PubSub中的属性的消息拉入Beam管道。我想知道是否添加了对Python的支持,这就是我无法阅读它们的原因。我看到它存在于Java中。 pipeline_options = PipelineOptions() pipeline_options.view_as(StandardOptions).streaming = True pipeline = beam.Pipe
..
第1步:Assum角色 public static AWSCredentialsProvider getCredentials() { if (roleARN.length() > 0) { STSAssumeRoleSessionCredentialsProvider credentialsProvider = new STSAssumeRoleSe
..
在运行数据流作业时,我得到‘PBegin’对象没有‘windowing’属性。 我正在调用Pardo函数中的ConnectClass类。 我正在尝试从Beam pythonSDK连接NOSQL数据库,并运行SQL从表中提取数据。然后使用另一个Pardo将输出写入单独的文件。 class Connector(beam.DoFn): def __init__(self,userna
..
我想利用时间分区表的新 BigQuery 功能,但我不确定目前在 Dataflow SDK 1.6 版本中是否可以实现. 查看 BigQuery JSON API,创建一个一天分区表需要传入一个 "timePartitioning": { "type": "DAY" } 选项,但 com.google.cloud.dataflow.sdk.io.BigQueryIO 接口只允许指定 Ta
..
我使用 Apache Beam 2.6 读取单个 Kafka 主题并将输出写入 Google Cloud Storage (GCS).现在我想改变管道,以便它读取多个主题并将它们写成 gs://bucket/topic/... 当只阅读一个主题时,我在管道的最后一步使用了 TextIO: TextIO.write().到(新的日期命名文件(String.format("gs://bucke
..
不确定这个 GenerateSequence 如何为我工作,因为我必须每小时或每天定期从 Mongo 读取值,创建一个读取 MongoDB 的 ParDo,还使用触发器将窗口添加到 GlobalWindows(触发器我将更新为公关要求).但是下面的代码片段给出了返回类型错误,所以你能帮我更正下面的代码行吗?还可以找到错误的快照.还有这个 Generate Sequence 对我有什么帮助?
..
来自这个问题:如何对数据进行分组并构造一个新列——python pandas?,我知道如何使用pandas对多列进行分组并构造一个新的唯一ID,但是如果我想使用Apache beam 在 Python 中实现与该问题中描述的相同的事情,我怎样才能实现它,然后将新数据写入换行符分隔的 JSON 格式文件(每一行是一个 unique_id 具有属于该 unique_id 的对象数组)? 假设数据
..
我正在尝试使用 python 代码中的 kafka.ReadFromKafka() 方法从 kafka 主题读取数据.我的代码如下所示: from apache_beam.io.external import kafka导入 apache_beam 作为梁选项 = PipelineOptions()使用 beam.Pipeline(options=options) 作为 p:植物 = (磷|'阅
..
我一直在尝试编写一个从 kafka 主题读取的光束管道,其中该主题由 avro 记录组成.这些记录的架构可以快速更改,因此我想在提取相关公共字段之前使用 Confluent Schema Registry 获取架构并解码事件.要么我做错了什么,要么文档已经过时.我按照这里的例子:https://github.com/apache/beam/blob/dfa1e475194ac6f65c42da7b
..
问题说明:我正在尝试使用直接转轮读取和打印光束中 xml 文件的内容这是代码片段: 公共类 BookStore{公共静态无效主(字符串参数[]){BookOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BookOptions .class);管道管道 = Pipeline.create(op
..
如何在java中编写以下代码?如果我在 java 中有记录/字典列表,我如何编写梁代码以将它们写入 tfrecords 中,其中 tf.train.Examples 被序列化.有很多例子可以用python来做,下面是python中的一个例子,我如何在java中编写相同的逻辑? 将 tensorflow 导入为 tf导入 apache_beam 作为梁从apache_beam.runners.i
..
我正在尝试在 Dataflow runner 中运行 apache 光束管道;该作业从 bigquery 表读取数据并将数据写入数据库. 我在数据流中使用经典模板选项运行作业 - 意味着首先我必须暂存管道,然后使用适当的参数运行它. 我的管道选项如下 options = PipelineOptions()options.view_as(SetupOptions).save_main_
..
我将 Apache Beam 与 Java 结合使用.我正在尝试读取一个 csv 文件并使用本地模式在预部署的 Spark env 上使用 SparkRunner 将其写入镶木地板格式.DirectRunner 一切正常,但 SparkRunner 根本无法工作.我正在使用 maven shade 插件来构建一个胖子. 代码如下: Java: 公共类 ImportCSVToParqu
..
我有两种方法来初始化 HttpClient,以便从 Apache Beam 中的 ParDo 进行 API 调用. 方法 1: 初始化StartBundle 中的HttpClient 对象并关闭FinishBundle 中的HttpClient.代码如下: public class ProcessNewIncomingRequest extends DoFn>{@StartBundl
..
我正在编写一个简单的 apache 光束流管道,从发布订阅主题获取输入并将其存储到 bigquery 中.几个小时以来,我以为我什至无法阅读消息,因为我只是试图将输入记录到控制台: events = p |'阅读 PubSub' >>ReadFromPubSub(订阅=订阅)logging.info(事件) 当我把它写成文本时,它工作正常!然而,我对 logger 的调用从未发生过. 人
..
我有一个在 Apache Beam(使用 Spark Runner)中开发的流式管道,它从 kinesis 流中读取数据. 我正在寻找 Apache Beam 中的选项来管理 kinesis 检查点(即定期存储 kinesis 流的当前位置),以便它允许系统从故障中恢复并在流停止的地方继续处理. Apache Beam 是否提供了类似于 Spark Streaming 的支持 kine
..
下面的程序是连接到 Oracle 11g 并获取记录.它是如何为我在 pipeline.apply() 处的编码器提供 NullPointerException 的. 我已将 ojdbc14.jar 添加到项目依赖项中. public static void main(String[] args) {管道 p = Pipeline.create(PipelineOptionsFactory
..
我正在从 MongoDB 读取 PCollection mongodata 并使用这个 PCollection 作为我的 ParDo(DoFN).withSideInputs(PCollection) 的 sideInput 从后端,我的MongoDB 集合每天或每月或每年更新.我需要在我的管道中添加新的价值. 我们可以认为这是在正在运行的管道中刷新 mongo 集合值.例如,mong
..
当前光束管道正在使用 FileIO.matchAll().continuously() 将文件作为流读取.这将返回 PCollection .我想用相同的名称将这些文件写回另一个 gcs 存储桶,即每个 PCollection 是一个文件 metadata/readableFile ,经过一些处理后应该写回另一个存储桶.我应该使用任何接收器来实现将每个 PCollection 项目写回 GCS 或
..
嗨,我有几个查询要运行 &使用 Apache Beam 依次保存结果,我见过一些类似的问题,但找不到答案.我习惯于使用 Airflow 设计管道,而我对 Apache Beam 还是比较陌生.我正在使用 Dataflow 运行程序.这是我现在的代码:我希望 query2 仅在 query1 结果保存到相应表后运行.我如何链接它们? PCollectionresultsStep1 = getDa
..