apache-kafka相关内容

如何使用BEAM的外部Kafka变换(本地)消费消息

我正在尝试运行一个应用程序,该应用程序使用Kafka生产者(Python客户端)和一个阿帕奇光束管道,它(目前)只是通过将这些消息打印到STDOUT来使用它们。 我了解,将Kafka外部转换与ApacheBEAM一起使用是一项跨语言的工作,因为它调用Java外部服务。我遵循了following link's选项1: 选项1:使用默认扩展服务 这是使用Python时推荐且最简单的设置选 ..
发布时间:2022-04-13 12:38:59 Java开发

如何使用Quarkus在Kafka中设置同一主题中的多个消费者

我正在使用Quarkus框架构建一个Kafka消费者,它将读取带有3个分区的主题。下面的代码片段正在工作,但根据日志,我只是启动了具有3个分区的1个使用者。我现在的问题是,一旦我运行我的应用程序,我如何才能产生3个消费者。 @Incoming("topic-1") public CompletionStage onMessage(KafkaRecord ..

闪光。卡夫卡消费者没有收到卡夫卡的信息

我在Mac上将Kafka和Flink作为坞站容器运行。 我已经实现了Flink Job,它应该使用来自Kafka主题的消息。 我运行一个向主题发送消息的python生成器。 作业开始时没有问题,但零消息到达。 我相信消息发送到了正确的主题,因为我有能够使用消息的python使用者。 闪烁作业(Java): package com.p81.datapipeline.swg; ..
发布时间:2022-03-15 11:58:40 其他开发

Flink SQL客户端连接到安全的Kafka群集

我想对安全Kafka集群的Kafka主题支持的Flink SQL表执行查询。我能够以编程方式执行查询,但无法通过Flink SQL Client执行相同的操作。我不确定如何通过Flink SQL客户端传递JAAS配置(java.security.auth.login.config)和其他系统属性。 以编程方式刷新SQL查询 private static void simpleExec ..
发布时间:2022-03-15 11:07:17 其他开发

Flink Python数据流API Kafka生产者Sink序列化

您好,我正在尝试从一个卡夫卡主题中读取数据,并在进行一些处理后写入到另一个主题中。 当我试图将数据写入另一个主题时,我能够读取数据并对其进行处理。它会显示错误 如果我尝试按原样写入数据,而不对其进行任何处理。Kafka生产者SimpleStringSchema接受它。 但我想将字符串转换为Json。玩Json,然后以字符串格式将其写入另一个主题。 我的代码: import jso ..
发布时间:2022-03-15 10:50:39 Java开发

Flink Kafka Producer中的只有一次语义

我正在尝试使用Kafka Source和Sink测试Flink只需一次的语义: 运行Flink应用程序,只需将消息从一个主题传输到另一个主题,并行度=1,检查点间隔20秒 每隔2秒使用Python脚本生成整数递增的消息。 读取控制台使用者处于READ_COMMITTED隔离级别的输出主题。 手动终止TaskManager 我希望在输出主题中看到整数单调递增,而不考虑TaskMan ..
发布时间:2022-03-15 10:33:48 其他开发

Flink Kafka源代码中的并行性导致不执行任何操作

我是一个卡夫卡和闪烁的初学者。 我注意到一些令人不安的事情。当我将Kafka作业的并行度增加到任何大于1的值时,我没有窗口来执行它们的进程。我希望使用并行度来提高分析速度。 查看Apache Flink Web Dashboard中可视化问题的图像示例。 这是完全相同的代码和接收到的完全相同的数据集,区别仅是并行性。在第一个示例中,摄取的数据流经窗口函数,但是当并行度增加时,数据只是堆积在从 ..
发布时间:2022-03-15 10:21:14 其他开发

定义多个 kafkaComponent bean 时,Kafka Camel Spring Boot AutoConfiguration 不会被拾取

我目前正在使用 这个 camel-kafka-startermaven 依赖 在 Spring Boot 中自动配置我的 kafka camel 组件. 如果我添加说,像这样的设置 camel.component.kafka.configuration.linger-ms=20.Camel kafka 组件在路由中拾取它,我可以在日志输出中看到它的配置值.例如 @Component公共类路 ..
发布时间:2022-01-19 08:57:52 Java开发

为什么骆驼卡夫卡生产者很慢?

我正在使用 apache camel kafka 作为客户端来生成消息,我观察到 kafka 生产者需要 1 毫秒来推送消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送单个消息需要 100 毫秒. 安装简述3 kafka 集群 16Core 32GB RAM 示例代码 String endpoint="kafka:test?topic=test&brokers=nodekfa ..
发布时间:2022-01-19 08:52:11 其他开发

如何为 junit 测试实例化 Mock Kafka 主题?

我对使用 kafka 主题的代码进行了一些 JUnit 测试.我尝试过的模拟 kafka 主题不起作用,并且在线找到的示例非常旧,因此它们也不适用于 0.8.2.1.如何使用 0.8.2.1 创建模拟 kafka 主题? 澄清一下:我选择使用主题的实际嵌入式实例来测试真实实例,而不是在 mockito 中模拟手关.这样我就可以测试我的自定义编码器和解码器是否实际工作,并且当我使用真正的 ka ..
发布时间:2022-01-08 22:26:12 其他开发

使用 JMeter 开发 Apache Kafka 生产者和负载测试

是否可以使用 JMeter 将消息推送到 Apache Kafka. 如何实现生产者(在JAVA中)将消息推送到Kafka. 问候,阿南 解决方案 我以为之前有答案,也许没有.这些你看了吗?我自己用的是原来的 kafkameter. https://github.com/BrightTag/kafkameter https://github.com/EugeneYshi ..
发布时间:2022-01-04 12:44:34 其他开发