apache-kafka相关内容
我在 Zookeeper 和代理身份验证上启用 SASL 时遇到以下错误. [2017-04-18 15:54:10,476] DEBUG 客户端 SASL 令牌的大小:0(org.apache.zookeeper.server.ZooKeeperServer)[2017-04-18 15:54:10,476] 错误 cnxn.saslServer 为空:cnxn 对象未正确初始化其 saslS
..
我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个topic.如果我在命令提示符下创建主题,并且通过 java api 推送消息,则它工作正常.但是我想通过java api创建一个主题.经过长时间的搜索,我找到了下面的代码, ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000);AdminU
..
我有一个 Kafka 2.1 消息代理,想对 Spark 2.4 中的消息数据进行一些处理.我想使用 Zeppelin 0.8.1 notebooks 进行快速原型设计. 我下载了结构化流媒体所必需的 spark-streaming-kafka-0-10_2.11.jar (http://spark.apache.org/docs/latest/structured-streaming-ka
..
我可以使用本地集群运行storm Kafka,但不能使用storm Submitter 下面是我的拓扑代码 谁能帮我解决这个问题:) package com.org.kafka;导入 org.apache.storm.Config;导入 org.apache.storm.LocalCluster;导入 org.apache.storm.generated.AlreadyAliveExcep
..
我刚刚开始讨论实时流数据处理框架的话题,我有一个问题,目前我还没有找到任何决定性的答案: 通常的怀疑对象(Apache 的 Spark、Kafka、Storm、Flink 等)是否支持以 纳秒(甚至皮秒)的事件时间分辨率处理数据? 大多数人和文档都谈论毫秒或微秒分辨率,但我无法找到明确的答案,如果可能有更多分辨率或问题.我推断唯一具有此功能的框架是 influxData 的 Kapac
..
我正在尝试在监督模式下运行 kafka,以便它可以在关机时自动启动.但是所有运行 kafka 的示例都使用 shell 脚本,并且 supervisord 无法记录要监控的 PID.任何人都可以建议如何完成 kafka 的自动重启? 解决方案 如果您使用的是 Unix 或 Linux 机器,那么这就是 /etc/inittab 派上用场的时候.或者您可能想要使用 daemontools.不
..
我正在本地开发风暴拓扑.我正在使用 Storm 0.9.2-incubating 并开发了一个简单的拓扑.当我使用 LocalCluster() 选项部署它时,它工作正常,但它不会显示在我的 Storm UI 中,它只是执行. 当我定期部署它时,它会在我的 Storm UI 中显示拓扑,但是当我单击它时不会看到任何 spouts 或 bolts. 我也用 WordCountTopolo
..
我构建了一个 topo 来通过 kafka 获取消息,然后 grep 一些关键字,如果合适,写入本地文件. 我使用storm-kafka的OpaqueTridentKafkaSpout来保证元组不会遗漏或重复,但考虑一种情况:在向本地文件写入消息时,发生一些错误(例如,空间不足).此时,有些消息已经写入本地文件,有些则没有,如果spout重新发送消息,消息将被写入两次. 如何处理?
..
我们通过从分区键中删除一列来更改其中一张表的分区键.该表中的每条记录也都有 TTL.现在我们想用 TTL 保留该表中的数据.我们该怎么做? 我们可以创建具有所需架构的新表,然后将数据从旧表复制到新表.然而,我们在这个过程中失去了 TTL. 欲知更多信息 - 此 Cassandra 表由 Apache Storm 应用程序填充,该应用程序从 Kafka 读取事件.我们可以重新水合 Kaf
..
我们正在 LocalCluster 中执行 Storm 拓扑.Storm 拓扑运行良好并且能够连接 Storm UI (8090).但是 Storm UI 没有显示正在运行的拓扑信息. LocalCluster cluster = new LocalCluster(); 并提交如下: bin/storm jar bin/StormTest-0.0.1-SNAPSHOT.jar com.abz
..
大家好, 我有一个要求,我需要重新提取一些旧数据.我们有一个多阶段管道,其来源是一个 Kafka 主题.一旦将记录输入其中,它就会运行一系列步骤(大约 10 个).每一步都会对推送到源主题的原始 JSON 对象进行按摩,然后推送到目标主题. 现在,有时,我们需要重新摄取旧数据并应用我上面描述的步骤的一个子集.我们打算将这些重新摄取记录推送到不同的主题,以免阻止通过的“实时"数据,这可能
..
我正在使用 Storm 1.1.2 和 Kafka 0.11 构建要在 Docker 容器中启动的 Java Spring 应用程序. 我的拓扑中的一切都按计划工作,但在 Kafka 的高负载下,Kafka 滞后随着时间的推移越来越多. 我的 KafkaSpoutConfig: KafkaSpoutConfigspoutConf =KafkaSpoutConfig.builder(
..
我们将 Storm 与 Kafka 和 ZooKeeper 一起使用.我们遇到过必须删除一些主题并使用不同名称重新创建它们的情况.除了现在读取新主题名称之外,我们的 Kafka 喷口保持不变.但是现在,当尝试从新主题读取时,spout 正在使用旧主题分区的偏移量.因此,my-topic-name 分区 0 的尾部位置将为 500,但偏移量将类似于 10000. 有没有办法重置偏移位置,使其与
..
我正在尝试使用图形 API 获取 fb 页面数据.每个帖子的大小超过 1MB,其中 kafka 默认 fetch.message 为 1MB.通过在 kafa consumer.properties 和 server.properties 文件中添加以下行,我已将 kafka 属性从 1MB 更改为 3MB. fetch.message.max.bytes=3048576 (consumer.p
..
我是 Storm 的新手,我在弄清楚如何按顺序处理记录时遇到了问题. 我有一个数据集,其中包含具有以下字段的记录: user_id、location_id、time_of_checking 现在,我想确定满足我指定路径的用户(例如,从位置 A 到位置 B 再到位置 C 的用户). 我正在使用 Kafka 生产者并从文件中读取这些记录来模拟实时数据.数据按日期排序. 因
..
我正在尝试将 Storm 与 Kafka 集成.我正在使用 KafkaSpout 运行 Storm 拓扑.这是基本的字数统计拓扑.我使用 Apache Kafka 作为源和风暴来处理数据.在提交拓扑时,我面临这些问题.我对 Kafka 和 Storm 很陌生.请建议我在以下代码中需要做的更改. 这是我的代码: public class TopologyMain {private stati
..
我正在为我的项目使用 Storm-0.9.2 发行版附带的 kafkaSpout.我想监控这个 spout 的吞吐量.我尝试使用 KafkaOffsetMonitoring,但它没有显示任何消费者阅读我的主题. 我怀疑这是因为我在 Zookeeper 中为 spout 指定了根路径来存储消费者偏移量.kafkaOffsetMonitor 如何知道在哪里查找有关我的 kafkaSpout 实例
..
可以对 kafka spout 发出的元组进行字段分组吗?如果是,那么 Storm 是如何知道 Kafka 记录中的字段的? 解决方案 Kafka Spout 像任何其他组件一样声明其输出字段.我的解释基于 KafkaSpout 的当前 implementation. 在KafkaSpout.java 类中,我们看到declareOutputFields 方法调用KafkaConfi
..
我的问题是 Storm KafkaSpout 在一段时间后停止使用来自 Kafka 主题的消息.在 Storm 中启用调试时,我得到这样的日志文件: 2016-07-05 03:58:26.097 oasdtask [INFO] 发射:packet_spout __metrics [#object[org.apache.storm.metric.api.IMetricsConsumer$Ta
..
我想知道是否可以将 kafka 作为云原生应用程序运行,我是否可以在 Pivotal Web Services 上创建 kafka 集群作为服务.我不想只做客户端集成,我想自己运行 kafka 集群/服务? 谢谢,阿尼尔 解决方案 我可以指出几个起点,从这些起点到功能齐全的东西需要一些工作. 一种选择是使用 docker 镜像在 Cloud Foundry(例如 Pivotal
..