apache-samza相关内容
我有一些 Samza 作业运行所有读取来自 Kafka 主题的消息并将新消息写入新主题.为了发送新消息,我使用了 Samza 内置的 OutgoingMessageEnvelope.还使用 MessageCollector 发送新消息.它看起来像这样: collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage)) 有
..
我使用 Kafka 和 Zookeeper 作为数据管道的主要组件,每秒处理数千个请求.我使用 Samza 作为实时数据处理工具,用于我需要对数据进行的小型转换. 我的问题是我的一个消费者(比如 ConsumerA)消费了来自 Kafka 的几个主题并处理它们.基本上创建一个被消化的主题的摘要.我还想将此数据作为单独的主题推送到 Kafka,但这会在 Kafka 和我的组件上形成一个循环.
..
我正在运行将数据写入 Kafka 主题的 Samza 流作业.Kafka 正在运行一个 3 节点集群.Samza 作业部署在纱线上.我们在容器日志中看到了很多这样的异常: INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[Contai
..
我正在使用 Samza 处理来自 Kafka 主题的消息.一些消息在未来带有时间戳,我想将处理推迟到该时间戳之后.与此同时,我想继续处理其他传入的消息. 我试图做的是让我的 Task 排队消息并实现 WindowableTask 以定期检查消息,如果它们的时间戳允许处理它们.基本思路如下: public class MyTask 实现了StreamTask, WindowableTask
..
我有一些Samza作业正在运行,它们全部从Kafka主题中读取消息并将新消息写入新主题.要发送新消息,我正在使用Samza内置的OutgoingMessageEnvelope.还使用MessageCollector发送新消息.看起来像这样: collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage)) 有没有一
..
我正在使用Kafka和Zookeeper作为我的数据管道的主要组件,该管道每秒处理数千个请求.我正在使用Samza作为实时数据处理工具来进行我需要对数据进行的小转换. 我的问题是我的一个使用者(让我们说ConsumerA)消耗了Kafka中的多个主题并对其进行处理.基本上创建摘要的摘要.我还想将这些数据作为一个单独的主题推送到Kafka,但这在Kafka和我的组件上形成了一个循环. 这
..
我正在与Samza处理来自Kafka主题的消息.某些消息将来会带有时间戳,我想将处理推迟到该时间戳之后.同时,我想继续处理其他传入消息. 我试图做的是让我的Task排队消息,并实现WindowableTask来定期检查消息(如果它们的时间戳允许处理它们).基本思路如下: public class MyTask implements StreamTask, WindowableTask
..
我正在运行一个Samza流作业,该作业正在将数据写入Kafka主题. Kafka正在运行一个3节点群集. Samza作业部署在纱线上.我们在容器日志中看到了许多此类异常: INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[Cont
..
如果您使用Samza的 OutgoingMessageEnvelope 使用以下格式发送消息: public OutgoingMessageEnvelope(SystemStream systemStream, java.lang .Object partitionKey, java.lang.Object key, java.lang.Object message) 从指定组件构造
..