apache-kafka相关内容

如何通过Java在Kafka中创建Topic

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个topic.如果我在命令提示符下创建主题,并且通过 java api 推送消息,则它工作正常.但是我想通过java api创建一个主题.经过长时间的搜索,我找到了下面的代码, ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);AdminU ..
发布时间:2021-11-14 23:54:29 Java开发

流数据处理和纳秒时间分辨率

我刚刚开始讨论实时流数据处理框架的话题,我有一个问题,目前我还没有找到任何决定性的答案: 通常的怀疑对象(Apache 的 Spark、Kafka、Storm、Flink 等)是否支持以 纳秒(甚至皮秒)的事件时间分辨率处理数据? 大多数人和文档都谈论毫秒或微秒分辨率,但我无法找到明确的答案,如果可能有更多分辨率或问题.我推断唯一具有此功能的框架是 influxData 的 Kapac ..

管理员模式下的Kafka

我正在尝试在监督模式下运行 kafka,以便它可以在关机时自动启动.但是所有运行 kafka 的示例都使用 shell 脚本,并且 supervisord 无法记录要监控的 PID.任何人都可以建议如何完成 kafka 的自动重启? 解决方案 如果您使用的是 Unix 或 Linux 机器,那么这就是 /etc/inittab 派上用场的时候.或者您可能想要使用 daemontools.不 ..
发布时间:2021-11-14 23:41:47 其他开发

Apache Storm - Storm UI 中不存在 spout 和 bolts

我正在本地开发风暴拓扑.我正在使用 Storm 0.9.2-incubating 并开发了一个简单的拓扑.当我使用 LocalCluster() 选项部署它时,它工作正常,但它不会显示在我的 Storm UI 中,它只是执行. 当我定期部署它时,它会在我的 Storm UI 中显示拓扑,但是当我单击它时不会看到任何 spouts 或 bolts. 我也用 WordCountTopolo ..
发布时间:2021-11-14 23:41:23 其他开发

如何确保风暴不会将消息两次写入本地文件?

我构建了一个 topo 来通过 kafka 获取消息,然后 grep 一些关键字,如果合适,写入本地文件. 我使用storm-kafka的OpaqueTridentKafkaSpout来保证元组不会遗漏或重复,但考虑一种情况:在向本地文件写入消息时,发生一些错误(例如,空间不足).此时,有些消息已经写入本地文件,有些则没有,如果spout重新发送消息,消息将被写入两次. 如何处理? ..
发布时间:2021-11-14 23:41:01 其他开发

使用 TTL 将数据从一个 Cassandra 表复制到另一个表

我们通过从分区键中删除一列来更改其中一张表的分区键.该表中的每条记录也都有 TTL.现在我们想用 TTL 保留该表中的数据.我们该怎么做? 我们可以创建具有所需架构的新表,然后将数据从旧表复制到新表.然而,我们在这个过程中失去了 TTL. 欲知更多信息 - 此 Cassandra 表由 Apache Storm 应用程序填充,该应用程序从 Kafka 读取事件.我们可以重新水合 Kaf ..
发布时间:2021-11-14 23:40:52 其他开发

在运行时部署流处理拓扑?

大家好, 我有一个要求,我需要重新提取一些旧数据.我们有一个多阶段管道,其来源是一个 Kafka 主题.一旦将记录输入其中,它就会运行一系列步骤(大约 10 个).每一步都会对推送到源主题的原始 JSON 对象进行按摩,然后推送到目标主题. 现在,有时,我们需要重新摄取旧数据并应用我上面描述的步骤的一个子集.我们打算将这些重新摄取记录推送到不同的主题,以免阻止通过的“实时"数据,这可能 ..

如何重置 Kafka 偏移量以匹配尾部位置?

我们将 Storm 与 Kafka 和 ZooKeeper 一起使用.我们遇到过必须删除一些主题并使用不同名称重新创建它们的情况.除了现在读取新主题名称之外,我们的 Kafka 喷口保持不变.但是现在,当尝试从新主题读取时,spout 正在使用旧主题分区的偏移量.因此,my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000. 有没有办法重置偏移位置,使其与 ..
发布时间:2021-11-14 23:39:30 Java开发

Storm中按顺序处理记录

我是 Storm 的新手,我在弄清楚如何按顺序处理记录时遇到了问题. 我有一个数据集,其中包含具有以下字段的记录: user_id、location_id、time_of_checking 现在,我想确定满足我指定路径的用户(例如,从位置 A 到位置 B 再到位置 C 的用户). 我正在使用 Kafka 生产者并从文件中读取这些记录来模拟实时数据.数据按日期排序. 因 ..
发布时间:2021-11-14 23:38:54 其他开发

线程“main"中的异常java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme

我正在尝试将 Storm 与 Kafka 集成.我正在使用 KafkaSpout 运行 Storm 拓扑.这是基本的字数统计拓扑.我使用 Apache Kafka 作为源和风暴来处理数据.在提交拓扑时,我面临这些问题.我对 Kafka 和 Storm 很陌生.请建议我在以下代码中需要做的更改. 这是我的代码: public class TopologyMain {private stati ..
发布时间:2021-11-14 23:38:31 Java开发

使用 KafkaOffsetMonitoring 工具监控 Kafka Spout

我正在为我的项目使用 Storm-0.9.2 发行版附带的 kafkaSpout.我想监控这个 spout 的吞吐量.我尝试使用 KafkaOffsetMonitoring,但它没有显示任何消费者阅读我的主题. 我怀疑这是因为我在 Zookeeper 中为 spout 指定了根路径来存储消费者偏移量.kafkaOffsetMonitor 如何知道在哪里查找有关我的 kafkaSpout 实例 ..
发布时间:2021-11-14 23:38:28 其他开发

Kafka Spout 的字段分组

可以对 kafka spout 发出的元组进行字段分组吗?如果是,那么 Storm 是如何知道 Kafka 记录中的字段的? 解决方案 Kafka Spout 像任何其他组件一样声明其输出字段.我的解释基于 KafkaSpout 的当前 implementation. 在KafkaSpout.java 类中,我们看到declareOutputFields 方法调用KafkaConfi ..
发布时间:2021-11-14 23:38:12 其他开发

如何在cloudfoundry上使用kafka和storm?

我想知道是否可以将 kafka 作为云原生应用程序运行,我是否可以在 Pivotal Web Services 上创建 kafka 集群作为服务.我不想只做客户端集成,我想自己运行 kafka 集群/服务? 谢谢,阿尼尔 解决方案 我可以指出几个起点,从这些起点到功能齐全的东西需要一些工作. 一种选择是使用 docker 镜像在 Cloud Foundry(例如 Pivotal ..