apache-kafka相关内容

json文件数据转换成kafka主题

如何使用kafka-console-producer将json文件数据插入kafka topic?每个json数据集可以存储为一条消息吗? 示例- {“id":1,“名字":“约翰",“姓氏":“瑞士莲",“电子邮件":“jlindt@gmail.com",“性别":“男性",“ip_address":“1.2.3.4";} 如果你使用这个命令 - cat sampledata.jso ..
发布时间:2021-12-24 17:28:21 其他开发

多个主题的 Kafka 消费者

我有一个主题列表(现在是 10 个),它们的大小将来会增加.我知道我们可以产生多个线程(每个主题)来从每个主题消费,但在我的情况下,如果主题数量增加,那么从主题消费的线程数量会增加,这是我不想要的,因为主题不是将过于频繁地获取数据,因此线程将处于理想状态. 有没有办法让一个消费者从所有主题中消费?如果是,那么我们如何才能实现它?Kafka 将如何维护偏移量?请提出答案. 解决方案 我 ..
发布时间:2021-12-24 17:28:02 Java开发

Kafka最优保留和删除策略

我对 kafka 相当陌生,所以如果这个问题微不足道,请原谅我.为了计时测试,我有一个非常简单的设置,如下所示: 机器 A -> 写入主题 1 (Broker) -> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2 (Broker) -> 机器 A 从主题 2 中读取 现在我在无限循环中发送大约 1400 字节的消息,很快填满了我的小代理上的空间.我正在尝试为 ..

Kafka 可以提供自定义 LoginModule 来支持 LDAP 吗?

Kafka 可以配置为使用多种身份验证机制:纯文本用户名/密码、Kerberos 或 SSL.前 2 个使用 SASL,其中需要一个 JAAS 配置文件. 对于纯文本身份验证方法,配置看起来像(取自 文档): KafkaServer {需要 org.apache.kafka.common.security.plain.PlainLoginModule用户名 =“管理员"密码=“管理员秘密" ..
发布时间:2021-12-24 17:23:45 其他开发

我可以在 Kafka 集群中拥有数百个主题吗?

我有一个数据流用例,我想根据每个客户存储库(可能是 100,000 个)定义主题,每个数据流都是一个带有分区的主题(按照一个几个 10 秒)定义流程的不同阶段. Kafka 适合这样的场景吗?如果不是,我将如何改造我的用例来处理这些场景.此外,即使在处理过程中,每个客户存储库数据也不能与其他存储库数据混合. 解决方案 2021 年 3 月更新: Kafka 新的 KRaft 模式将 ..
发布时间:2021-12-24 17:20:55 其他开发

如何在 spark 3.0 结构化流媒体中使用 kafka.group.id 和检查点以继续从 Kafka 读取它在重启后停止的位置?

基于Spark 3.0中的介绍,https:///spark.apache.org/docs/latest/structured-streaming-kafka-integration.html.应该可以设置“kafka.group.id"跟踪偏移量.对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失.根据我之前的问题,我觉得 Spark 3.0 中的 kafka.g ..

如何管理Kafka KStream到Kstream窗口连接?

基于 apache Kafka 文档 KStream-to-KStream Joins 总是窗口连接,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者,例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流? 有没有什么好的例子来展示一个窗口化的 KStream-to-kStream 窗口化连接? 就我而言,假设我有 2 个 KStream,kstream1 和 k ..
发布时间:2021-12-24 17:19:42 其他开发

Kafka 快速入门,advertised.host.name 给出 kafka.common.LeaderNotAvailableException

我能够在一台 Linux 机器上本地运行一个简单的单节点 Kafka (kafka_2.11-0.8.2.1),但是当我尝试远程运行生产者时,我遇到了一些令人困惑的错误. 我正在遵循 http://kafka.apache.org/documentation.html#quickstart.我停止了kafka进程并删除了所有zookeeper &/tmp 中的业力文件.我在本地 10.0.0 ..
发布时间:2021-12-24 17:17:17 其他开发

未能找到话题的领导者;java.lang.NullPointerException NullPointerException 在 org.apache.kafka.common.utils.Utils.formatAddress

当我们尝试从启用 SSL 的 Kafka 主题流式传输数据时,我们面临以下错误.你能帮我们解决这个问题吗? 19/11/07 13:26:54 INFO ConsumerFetcherManager:[ConsumerFetcherManager-1573151189884] 为分区 ArrayBuffer() 添加了提取器19/11/07 13:26:54 WARN ConsumerFetch ..

Kafka Stream StateStore 在所有实例上是全局的还是本地的?

在 Kafka Stream WordCount 示例中,它使用 StateStore 来存储字数.如果同一个消费者组中有多个实例,StateStore 对组来说是全局的,还是只是一个消费者实例的局部? 感谢 解决方案 这取决于您对 state store 的看法. 在 Kafka Streams 中,状态是共享的,因此每个实例都包含整个应用程序状态的一部分.例如,使用 DSL ..
发布时间:2021-11-28 21:39:27 其他开发

如何批量处理最大大小的 KStream 或回退到时间窗口?

我想创建一个基于 Kafka 流的应用程序,该应用程序处理一个主题并分批接收大小为 X(即 50)的消息,但如果流的流量较低,则在 Y 秒内为我提供流中的任何内容(即5). 因此,我不是一个一个地处理消息,而是处理一个 List[Record],其中列表的大小为 50(或可能更少). 这是为了让一些 I/O 绑定处理更高效. 我知道这可以用经典的 Kafka API 来实现,但我 ..
发布时间:2021-11-28 21:39:16 其他开发

Kafka 接收器连接器:即使重启后也未分配任务

我在一组 Docker 容器中使用 Confluent 3.2,其中一个正在运行 kafka-connect worker. 由于我不清楚的原因,我的四个连接器中的两个 - 具体来说,hpgraphsl 的 MongoDB接收器连接器 - 停止工作.我能够确定主要问题:连接器没有分配任何任务,这可以通过调用 GET/connectors/{my_connector}/status 看到.其他 ..

Apache Kafka 客户端什么时候会抛出“Batch Expired"?例外?

使用 Apache Kafka Java 客户端 (0.9),我尝试使用 Kafka Producer 类. 异步send 方法 立即返回一段时间,然后在短时间内开始阻塞每次调用.大约三十秒后,客户端开始抛出异常 (TimeoutException),带有消息“批量过期". 什么情况会导致抛出这个异常? 解决方案 此异常表示您正在以比发送记录更快的速度排队记录. 当您调用 ..
发布时间:2021-11-28 21:38:17 Java开发

如何将 KAFKA 的属性外部化为不同的类并将其调用到主类?

我一直在尝试使用抽象 OOP 来外部化属性代码,这样我就可以将它调用到生产者类,但我似乎无法调用它.这是代码,任何帮助将不胜感激. 公共类生产者{私有静态最终记录器记录器 = LogManager.getLogger(Producer.class);公共静态无效主(字符串 [] args){logger.info(“创建卡夫卡生产者...");KafkaProducer生产者 = 新的 Kafk ..
发布时间:2021-11-26 17:34:28 Java开发

如何在彼此独立的同一个盒子上运行多个 kafka 消费者?

我有两个 Kafka 消费者 ConsumerA 和 ConsumerB.我想在同一台机器上运行这两个相互独立的 kafka 消费者.他们之间根本没有任何关系.这两个 kafka 消费者将在同一台机器上处理不同的主题. 每个使用者都应该有一个不同的 Properties 对象. 每个使用者都应该有不同的线程池配置,因为它们可以在需要时独立于其他使用者以多线程方式(使用者组)运行. 以 ..