apache-beam相关内容

阿帕奇梁 ->BigQuery - 用于重复数据删除的 insertId 不起作用

我正在将数据从 kafka 流式传输到 BigQuery,使用 apache beam 和 google dataflow runner.我想使用 insertId 进行重复数据删除,我发现在 google docs 中有描述.但是即使插入发生在彼此之间的几秒钟内,我仍然看到很多具有相同 insertId 的行.现在我想知道也许我没有正确使用 API 来利用 BQ 提供的流式插入的重复数据删除机制 ..
发布时间:2021-12-30 23:27:32 其他开发

在 Apache Beam 中强制流中的空窗格/窗口

我正在尝试实现一个管道并接收数据流,如果分钟间隔中有任何元素,则每分钟输出一个 True,如果没有,则输出 False.如果持续时间内没有元素,则窗格(具有永久时间触发器)或窗口(固定窗口)似乎不会触发. 我正在考虑的一种解决方法是将流放入全局窗口中,使用 ValueState 来保持队列来累积数据,并使用计时器作为触发器来检查队列.我想知道是否有任何更简洁的方法来实现这一点. 谢谢. ..
发布时间:2021-11-28 18:13:49 其他开发

如何将流数据与 Dataflow/Beam 中的大型历史数据集结合起来

我正在研究通过 Google Dataflow/Apache Beam 处理来自网络用户会话的日志,并且需要将用户的日志(流式传输)与上个月的用户会话历史记录结合起来. 我研究了以下方法: 使用 30 天固定窗口:最有可能是一个很大的窗口以适应内存,而且我不需要更新用户的历史记录,只需参考它 使用 CoGroupByKey 连接两个数据集,但两个数据集的窗口大小必须相同(https: ..
发布时间:2021-11-28 18:13:35 Java开发

数据流管道 - “处理卡在步骤 <STEP_NAME>至少<TIME>不输出或完成状态完成......"

我的团队开发的 Dataflow 管道突然开始卡住,停止处理我们的事件.他们的工作日志充满了警告消息,说某个特定步骤卡住了.奇怪的是,失败的步骤是不同的,一个是 BigQuery 输出,另一个是 Cloud Storage 输出. 以下是我们收到的日志消息: 对于 BigQuery 输出: 处理卡在步骤 /StreamingInserts/StreamingWriteTables/S ..
发布时间:2021-11-28 18:13:25 其他开发

使用apache-beam按顺序读取文件和文件夹

我有一个 year/month/day/hour/* 类型的文件夹结构,我希望光束按时间顺序将其作为无限源读取.具体来说,这意味着在记录的第一个小时内读取所有文件并添加其内容以进行处理.然后,添加下一小时的文件内容进行处理,直到当前等待新文件到达最新year/month/day/hour文件夹的时间. 是否可以使用 apache beam 来做到这一点? 解决方案 所以我要做的是根据 ..
发布时间:2021-11-28 18:13:06 其他开发

Apache Beam 和 Apache Nifi 的区别

Apache Beam 和 Apache Nifi 的用例是什么?似乎它们都是数据流引擎.如果两者都有相似的用例,两者哪个更好? 解决方案 Apache Beam 是 Apache Flink、Apache Spark(流)、Apache Apex 和 Apache Storm 等流处理系统的抽象层.它允许您针对标准 API 编写代码,然后使用任何底层平台执行代码.因此,理论上,如果您针对 ..
发布时间:2021-11-12 03:51:17 其他开发

org.apache.kafka.common.errors.RecordTooLargeException - 丢弃大小超过最大限制的消息并推送到另一个 kafka 主题

org.apache.kafka.common.errors.RecordTooLargeException:[Partition=Offset] 处有一些消息:{binlog-0=170421} 其大小大于获取大小 1048576 和因此无法退货. 嗨,我遇到了上述异常并且我的 apache 光束数据管道失败了.我希望 kafka 阅读器忽略大小大于默认大小的消息 &可能会将其推送到另一个 ..
发布时间:2021-11-12 03:17:07 其他开发

使用 KakfaIO 从给定的偏移量开始

我正在使用 KafkaIO.read() 并且我想从特定偏移量开始消费. 在某些时候,曾经有一个 KafkaIO.read().withStartFromCheckpointMark() 方法来做到这一点. 我从文档 有一种方法可以通过: 由runner提供的KafkaCheckpointMark; 我该怎么做? 谢谢 解决方案 没有直接支持,但有几个选项: ..
发布时间:2021-11-12 02:44:01 其他开发

如何应对(Apache Beam)高IO瓶颈?

让我们举一个简单的例子,我有一个非常简单的光束管道,它只是从文件中读取数据并将数据转储到输出文件中.现在让我们考虑输入文件是巨大的(一些 GB 大小,您通常无法在文本编辑器中打开的文件类型).由于直接运行器的实现非常简单(它将整个输入集读取到内存中),它将无法读取和输出这些大文件(除非您为 java vm 进程分配了不切实际的大量内存);所以我的问题是:“像 flink/spark/cloud 数 ..

apache_beam.examples.wordcount 的输入

我试图运行 Beam Python-SDK 示例,但在读取输入时遇到问题. https://cwiki.apache.org/confluence/display/BEAM/Usage+Guide#UsageGuide-RunaPython-SDKPipeline 当我使用 gs://dataflow-samples/shakespeare/kinglear.txt 作为输入时,错误是 ..
发布时间:2021-11-12 01:12:49 Python

apache_beam.examples.wordcount 的输入

我试图运行 Beam Python-SDK 示例,但在读取输入时遇到问题. https://cwiki.apache.org/confluence/display/BEAM/Usage+Guide#UsageGuide-RunaPython-SDKPipeline 当我使用 gs://dataflow-samples/shakespeare/kinglear.txt 作为输入时,错误是 ..
发布时间:2021-11-12 01:10:26 Python

在 Flink 上运行 Beam 管道期间与内存段相关的 EOFException

我正在尝试在我们的测试集群上的 Flink 上运行 Apache Beam 管道.它在 org.apache.flink.runtime.io.disk.SimpleCollectingOutputView:79 在通过序列化对对象进行编码期间.我还没有能够在本地重现该错误.您可以在此处找到整个作业日志.某些值已被假数据替换. 用于运行管道的命令: bin/flink 运行\-m 纱线簇\ ..
发布时间:2021-11-12 01:07:46 其他开发