apache-beam-io相关内容

Apache Beam 根据上一行的值更新当前行值

Apache Beam 根据上一行的值更新值 我已将 CSV 文件中的值分组.在分组的行中,我们发现一些缺失值需要根据前一行的值进行更新.如果该行的第一列为空,则需要将其更新为 0. 我可以对记录进行分组,但无法找出更新值的逻辑,我该如何实现? 记录 客户 ID日期金额 BS:89481 1/1/2012 100 BS:89482 1/1/2012 BS:89483 ..
发布时间:2021-11-11 22:38:47 Java开发

添加 ReadAllFromText 转换时管道失败

我正在尝试在 Apache Beam 中运行一个非常简单的程序来测试它的工作原理. 导入 apache_beam 作为梁类拆分(beam.DoFn):定义过程(自我,元素):返回元素使用 beam.Pipeline() 作为 p:行 = (p | beam.io.ReadAllFromText("输入.csv") |梁.ParDo(Split())) 运行时出现以下错误 .... 一些更多的 ..
发布时间:2021-11-11 22:38:04 其他开发

带有数据流的 Apache Beam - 从 BigQuery 读取时出现空指针

我正在使用从 BigQuery 表和文件中读取的 apache beam 编写的 google 数据流上运行作业.转换数据并将其写入其他 BigQuery 表.工作“通常"会成功,但有时我会在从大查询表中读取时随机收到空指针异常并且我的工作失败: (288abb7678892196): java.lang.NullPointerException在 org.apache.beam.sdk.io. ..
发布时间:2021-11-11 22:38:02 其他开发

如何从 Apache Beam 的 HTTP 响应中读取大文件?

Apache Beam 的 TextIO 可用于读取某些文件系统中的 JSON 文件,但如何从 Java SDK 中的 HTTP 响应产生的大型 JSON (InputStream) 中创建 PCollection? 解决方案 我认为目前 Beam 中没有通用的内置解决方案可以做到这一点,查看支持的 IO 列表. 我可以想到多种方法,哪种方法适合您可能取决于您的要求: 我可能会 ..
发布时间:2021-11-11 22:37:55 其他开发

如何使用多个工作人员加速批量导入谷歌云数据存储?

我有一个基于 apache-beam 的数据流作业要使用 vcf source 来自单个文本文件(存储在谷歌云存储中),将文本行转换为数据存储Entities 并将它们写入数据存储接收器.工作流程工作正常,但我注意到的缺点是: 写入数据存储的速度最多约为每秒 25-30 个实体. 我尝试使用 --autoscalingAlgorithm=THROUGHPUT_BASED --numWork ..

使用带有自定义类型的 CoGroupByKey 会导致 Coder 错误

我想加入两个 PCollection(分别来自不同的输入)并按照此处描述的步骤实现,“加入 CoGroupByKey"部分:https://cloud.google.com/dataflow/model/group-by-键 就我而言,我想加入 GeoIP 的“块"信息和“位置"信息.所以我将 Block 和 Location 定义为一个自定义类,然后像下面这样写: final Tuple ..
发布时间:2021-11-11 22:36:59 其他开发

Apache Beam - org.apache.beam.sdk.util.UserCodeException:java.sql.SQLException:无法创建 PoolableConnectionFactory(不支持方法)

我正在尝试使用 Apache beam-dataflow 连接到安装在云实例中的配置单元实例.当我运行它时,我收到以下异常.当我使用 Apache Beam 访问这个数据库时,就会发生这种情况.我看到了许多与 apache beam 或 google dataflow 无关的相关问题. (c9ec8fdbe9d1719a): java.lang.RuntimeException: org.apa ..
发布时间:2021-11-11 22:36:52 其他开发

DoFn 中的 HTTP 客户端

我想通过 DoFn 为在 Dataflow 上运行的 Apache Beam Pipeline 发出 POST 请求. 为此,我创建了一个客户端,它实例化了在 PoolingHttpClientConnectionManager 上配置的 HttpClosableClient. 但是,我为我处理的每个元素实例化了一个客户端. 如何设置一个供我所有元素使用的持久客户端? 还有 ..

将 MutationGroup 流式传输到 Spanner

我正在尝试使用 SpannerIO 将 MutationGroups 流式传输到 spanner 中.目标是每 10 秒编写一次新的 MationGroup,因为我们将使用 spanner 查询近期 KPI. 当我不使用任何窗口时,出现以下错误: 线程“main"中的异常 java.lang.IllegalStateException:GroupByKey 不能在没有触发器的情况下应用于 ..

apache 光束数据流中的外部 api 调用

我有一个用例,我读入存储在谷歌云存储中的换行 json 元素并开始处理每个 json.在处理每个 json 时,我必须调用外部 API 来执行重复数据删除,无论该 json 元素之前是否被发现.我在每个 json 上做一个 ParDo 和一个 DoFn . 我还没有看到任何在线教程说明如何从 apache beam DoFn Dataflow 调用外部 API 端点. 我正在使用 Be ..
发布时间:2021-11-11 22:33:20 Java开发

使用 Python 中的 BigQuery 接收器流式传输管道

我正在构建一个 apache 光束流管道,其源是 Pubsub,接收器是 BigQuery.我收到了错误消息: “工作流程失败.原因:未知的消息代码." 尽管这条消息很神秘,但我现在相信 BigQuery 不支持作为流管道的接收器,它在这里说:从 Pub/Sub 流式传输到 BigQuery 我确定这是导致问题的原因吗?或者,如果不是,无论如何它仍然不受支持? 谁能暗示这个 ..
发布时间:2021-11-11 22:33:17 Python

使用 TextIO 和 ValueProvider 创建数据流模板时出错

我正在尝试创建一个 google 数据流模板,但我似乎无法找到一种方法来做到这一点,而不会产生以下异常: 警告:源的大小估计失败:RuntimeValueProvider{propertyName=inputFile, default=null}java.lang.IllegalStateException:值仅在运行时可用,但从非运行时上下文访问:RuntimeValueProvider{pr ..
发布时间:2021-11-11 22:32:44 Java开发

Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库

我正在使用 Dataflow SDK 2.X Java API (Apache Beam SDK) 将数据写入 mysql.我已经基于 Apache Beam SDK 文档 使用数据流将数据写入 mysql.它一次插入单行,因为我需要实现批量插入.我在官方文档中找不到任何启用批量插入模式的选项. 想知道是否可以在数据流管道中设置批量插入模式?如果是,请让我知道我需要在下面的代码中更改什么. ..
发布时间:2021-11-11 22:32:39 数据库

输出类型中beam.ParDo 和beam.Map 的区别?

我正在使用 Apache-Beam 运行一些数据转换,包括从 txt、csv 和不同数据源中提取数据.我注意到的一件事是使用 beam.Map 和 beam.ParDo 时结果的差异 在下一个示例中: 我正在读取 csv 数据,在第一种情况下,使用 beam.ParDo 将它传递给 DoFn,它提取第一个元素,即日期,然后打印它.第二种情况,我直接用beam.Map做同样的事情,然后打印 ..
发布时间:2021-11-11 22:31:47 其他开发

如何从 Apache Beam KafkaIO 中的 kafka 主题推断 avro 模式

我正在使用 Apache Beam 的 kafkaIO 从 Confluent 模式注册表中具有 avro 模式的主题中读取数据.我能够反序列化消息并写入文件.但最终我想写信给 BigQuery.我的管道无法推断架构.如何提取/推断架构并将其附加到管道中的数据,以便我的下游进程(写入 BigQuery)可以推断架构? 这是我使用模式注册表 url 设置解串器以及我从 Kafka 读取的代码: ..

在 Apache Beam 中如何处理流水线 IO 级别的异常/错误

我在 apache 光束中使用运行 spark runner 作为管道运行程序并发现错误.通过得到错误,我的问题出现了.我知道错误是由于 sql 查询中的 Column_name 不正确,但我的问题是如何在 IO 级别处理错误/异常 org.apache.beam.sdk.util.UserCodeException:java.sql.SQLSyntaxErrorException:“字段列表" ..
发布时间:2021-11-11 22:29:53 其他开发

为什么 Dataflow-BigTable 连接器不支持增量?

我们在流模式中有一个用例,我们希望在需要增量操作的管道中跟踪 BigTable 上的计数器(某些 #items 已完成处理).从查看 https://cloud.google.com/bigtable/docs/dataflow-hbase,我看到此客户端不支持 HBase API 的追加/增量操作.陈述的原因是批处理模式下的重试逻辑,但如果 Dataflow 保证恰好一次,为什么支持它是一个坏主 ..