apache-beam相关内容
..
..
我正在将数据从 kafka 流式传输到 BigQuery,使用 apache beam 和 google dataflow runner.我想使用 insertId 进行重复数据删除,我发现在 google docs 中有描述.但是即使插入发生在彼此之间的几秒钟内,我仍然看到很多具有相同 insertId 的行.现在我想知道也许我没有正确使用 API 来利用 BQ 提供的流式插入的重复数据删除机制
..
我正在尝试实现一个管道并接收数据流,如果分钟间隔中有任何元素,则每分钟输出一个 True,如果没有,则输出 False.如果持续时间内没有元素,则窗格(具有永久时间触发器)或窗口(固定窗口)似乎不会触发. 我正在考虑的一种解决方法是将流放入全局窗口中,使用 ValueState 来保持队列来累积数据,并使用计时器作为触发器来检查队列.我想知道是否有任何更简洁的方法来实现这一点. 谢谢.
..
我正在研究通过 Google Dataflow/Apache Beam 处理来自网络用户会话的日志,并且需要将用户的日志(流式传输)与上个月的用户会话历史记录结合起来. 我研究了以下方法: 使用 30 天固定窗口:最有可能是一个很大的窗口以适应内存,而且我不需要更新用户的历史记录,只需参考它 使用 CoGroupByKey 连接两个数据集,但两个数据集的窗口大小必须相同(https:
..
我的团队开发的 Dataflow 管道突然开始卡住,停止处理我们的事件.他们的工作日志充满了警告消息,说某个特定步骤卡住了.奇怪的是,失败的步骤是不同的,一个是 BigQuery 输出,另一个是 Cloud Storage 输出. 以下是我们收到的日志消息: 对于 BigQuery 输出: 处理卡在步骤 /StreamingInserts/StreamingWriteTables/S
..
我的流式数据流作业(2017-09-08_03_55_43-9675407418829265662)使用 Apache Beam SDK for Java 2.1.0 不会超过 1 个 Worker,即使 pubsub 不断增长队列(现在有 10 万条未送达的消息)——你知道为什么吗? 它当前使用 autoscalingAlgorithm=THROUGHPUT_BASED 和 maxNumW
..
我有一个 year/month/day/hour/* 类型的文件夹结构,我希望光束按时间顺序将其作为无限源读取.具体来说,这意味着在记录的第一个小时内读取所有文件并添加其内容以进行处理.然后,添加下一小时的文件内容进行处理,直到当前等待新文件到达最新year/month/day/hour文件夹的时间. 是否可以使用 apache beam 来做到这一点? 解决方案 所以我要做的是根据
..
现在我只能使用 ParDo 获取类中的 RunTime 值,还有其他方法可以像在我的函数中一样使用运行时参数吗? 这是我现在得到的代码: class UserOptions(PipelineOptions):@类方法def _add_argparse_args(cls, 解析器):parser.add_value_provider_argument('--firestore_documen
..
Apache Beam 和 Apache Nifi 的用例是什么?似乎它们都是数据流引擎.如果两者都有相似的用例,两者哪个更好? 解决方案 Apache Beam 是 Apache Flink、Apache Spark(流)、Apache Apex 和 Apache Storm 等流处理系统的抽象层.它允许您针对标准 API 编写代码,然后使用任何底层平台执行代码.因此,理论上,如果您针对
..
我在 Apchea Beam 中编写了一个非常简单的管道,如下所示,从 Confluent Cloud 上的 kafka 集群中读取数据,如下所示: Pipeline pipeline = Pipeline.create(options);映射propertyBuilder = new HashMap();propertyBuilder.put("ssl.endpoint.identifica
..
org.apache.kafka.common.errors.RecordTooLargeException:[Partition=Offset] 处有一些消息:{binlog-0=170421} 其大小大于获取大小 1048576 和因此无法退货. 嗨,我遇到了上述异常并且我的 apache 光束数据管道失败了.我希望 kafka 阅读器忽略大小大于默认大小的消息 &可能会将其推送到另一个
..
我在 Apchea Beam 中编写了一个非常简单的管道,如下所示,从 Confluent Cloud 上的 kafka 集群中读取数据,如下所示: Pipeline pipeline = Pipeline.create(options);映射propertyBuilder = new HashMap();propertyBuilder.put("ssl.endpoint.identifica
..
我正在尝试使用以下方法从 Dataflow (Apache Beam) 写入 Confluent Cloud/Kafka: kafkaKnowledgeGraphKVRecords.apply("写入Kafka", KafkaIO.write().withBootstrapServers(".confluent.cloud:9092").withTopic("testtopic").withKe
..
我正在使用 KafkaIO.read() 并且我想从特定偏移量开始消费. 在某些时候,曾经有一个 KafkaIO.read().withStartFromCheckpointMark() 方法来做到这一点. 我从文档 有一种方法可以通过: 由runner提供的KafkaCheckpointMark; 我该怎么做? 谢谢 解决方案 没有直接支持,但有几个选项:
..
主要目标是聚合两个 Kafka 主题,一个是压缩的慢速移动数据,另一个是每秒接收到的快速移动数据. 我已经能够在诸如 KV (Long,String) 之类的简单场景中使用类似以下内容的消息: PCollection>输入 = p.apply(KafkaIO.读取().withKeyDeserializer(LongDeserializer.class).withValueDeserial
..
让我们举一个简单的例子,我有一个非常简单的光束管道,它只是从文件中读取数据并将数据转储到输出文件中.现在让我们考虑输入文件是巨大的(一些 GB 大小,您通常无法在文本编辑器中打开的文件类型).由于直接运行器的实现非常简单(它将整个输入集读取到内存中),它将无法读取和输出这些大文件(除非您为 java vm 进程分配了不切实际的大量内存);所以我的问题是:“像 flink/spark/cloud 数
..
我试图运行 Beam Python-SDK 示例,但在读取输入时遇到问题. https://cwiki.apache.org/confluence/display/BEAM/Usage+Guide#UsageGuide-RunaPython-SDKPipeline 当我使用 gs://dataflow-samples/shakespeare/kinglear.txt 作为输入时,错误是
..
我试图运行 Beam Python-SDK 示例,但在读取输入时遇到问题. https://cwiki.apache.org/confluence/display/BEAM/Usage+Guide#UsageGuide-RunaPython-SDKPipeline 当我使用 gs://dataflow-samples/shakespeare/kinglear.txt 作为输入时,错误是
..
我正在尝试在我们的测试集群上的 Flink 上运行 Apache Beam 管道.它在 org.apache.flink.runtime.io.disk.SimpleCollectingOutputView:79 在通过序列化对对象进行编码期间.我还没有能够在本地重现该错误.您可以在此处找到整个作业日志.某些值已被假数据替换. 用于运行管道的命令: bin/flink 运行\-m 纱线簇\
..