apache-kafka相关内容
我正在尝试运行一个应用程序,该应用程序使用Kafka生产者(Python客户端)和一个阿帕奇光束管道,它(目前)只是通过将这些消息打印到STDOUT来使用它们。 我了解,将Kafka外部转换与ApacheBEAM一起使用是一项跨语言的工作,因为它调用Java外部服务。我遵循了following link's选项1: 选项1:使用默认扩展服务 这是使用Python时推荐且最简单的设置选
..
我正在使用Quarkus框架构建一个Kafka消费者,它将读取带有3个分区的主题。下面的代码片段正在工作,但根据日志,我只是启动了具有3个分区的1个使用者。我现在的问题是,一旦我运行我的应用程序,我如何才能产生3个消费者。 @Incoming("topic-1") public CompletionStage onMessage(KafkaRecord
..
我在Mac上将Kafka和Flink作为坞站容器运行。 我已经实现了Flink Job,它应该使用来自Kafka主题的消息。 我运行一个向主题发送消息的python生成器。 作业开始时没有问题,但零消息到达。 我相信消息发送到了正确的主题,因为我有能够使用消息的python使用者。 闪烁作业(Java): package com.p81.datapipeline.swg;
..
这是我的表: CREATE TABLE orders ( `id` STRING, `currency_code` STRING, `total` DECIMAL(10,2), `order_time` TIMESTAMP(3), WATERMARK FOR `order_time` AS order_time - INTERVAL '30' SECONDS ) WITH
..
我想对安全Kafka集群的Kafka主题支持的Flink SQL表执行查询。我能够以编程方式执行查询,但无法通过Flink SQL Client执行相同的操作。我不确定如何通过Flink SQL客户端传递JAAS配置(java.security.auth.login.config)和其他系统属性。 以编程方式刷新SQL查询 private static void simpleExec
..
您好,我正在尝试从一个卡夫卡主题中读取数据,并在进行一些处理后写入到另一个主题中。 当我试图将数据写入另一个主题时,我能够读取数据并对其进行处理。它会显示错误 如果我尝试按原样写入数据,而不对其进行任何处理。Kafka生产者SimpleStringSchema接受它。 但我想将字符串转换为Json。玩Json,然后以字符串格式将其写入另一个主题。 我的代码: import jso
..
我正在尝试使用Kafka Source和Sink测试Flink只需一次的语义: 运行Flink应用程序,只需将消息从一个主题传输到另一个主题,并行度=1,检查点间隔20秒 每隔2秒使用Python脚本生成整数递增的消息。 读取控制台使用者处于READ_COMMITTED隔离级别的输出主题。 手动终止TaskManager 我希望在输出主题中看到整数单调递增,而不考虑TaskMan
..
我是一个卡夫卡和闪烁的初学者。 我注意到一些令人不安的事情。当我将Kafka作业的并行度增加到任何大于1的值时,我没有窗口来执行它们的进程。我希望使用并行度来提高分析速度。 查看Apache Flink Web Dashboard中可视化问题的图像示例。 这是完全相同的代码和接收到的完全相同的数据集,区别仅是并行性。在第一个示例中,摄取的数据流经窗口函数,但是当并行度增加时,数据只是堆积在从
..
主要目标是聚合两个Kafka主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。 我已经能够在KV(Long,String)等简单场景中使用如下内容消费消息: PCollection> input = p.apply(KafkaIO.read() .withKeyDeserializer(LongDeserial
..
尝试启动多个不同brokerId的Kafka Broker时,一个是默认的server.properties,另一个是更改了2行的serverTest.properties,分别是broker.id=1和listeners=PLAINTEXT://localhost:6000。睡觉是相同的默认设置。我先启动ZooKeeper,然后启动默认的Kafkaserver.properties,然后启动se
..
基本 apt-get install librdkafka1 在 Debian 8.x 上工作,但在 Debian 9.x 上失败.这看起来像是关于 libssl 的依赖版本问题.Debian 8.x 有 libssl1.0.0,而 Debian 9.x 有 libssl1.0.2 和 libssl1.1,但没有 libssl1.0.0,而且这个版本冲突只会导致 librdkafka1 安装中断.
..
使用 Alpine,Alpine 完全支持最新版本的 librdkafka,我可以在我的 Dockerfile 中执行 apk add,并且可以进行以下操作: FROM golang:1.13-alpine3.10 作为构建器工作目录/app复制 go.mod go.sum ./复制源代码 ./src/运行设置-eux;\apk add --no-cache gcc git libc-dev l
..
我目前正在使用 这个 camel-kafka-startermaven 依赖 在 Spring Boot 中自动配置我的 kafka camel 组件. 如果我添加说,像这样的设置 camel.component.kafka.configuration.linger-ms=20.Camel kafka 组件在路由中拾取它,我可以在日志输出中看到它的配置值.例如 @Component公共类路
..
我正在使用 apache camel kafka 作为客户端来生成消息,我观察到 kafka 生产者需要 1 毫秒来推送消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送单个消息需要 100 毫秒. 安装简述3 kafka 集群 16Core 32GB RAM 示例代码 String endpoint="kafka:test?topic=test&brokers=nodekfa
..
根据问题如何手动控制使用camel-kafka 提交偏移量? 我想使用camel-kafka 手动提交偏移量.我的路线: .from(kafka:topic1).aggregate(新 GroupByExchangeStrategy()).to(kafka:topic2).process(新ManualCommitProcessor()) ,其中 ManualCommitProcessor 将
..
我没有看到如何使用 camel-avro 组件生成和使用 kafka avro 消息的示例?目前我的骆驼路线是这样的.为了使用模式注册和其他类似的道具,应该改变它使用camel-kafka-avro consumer &制片人. props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost
..
Kafka 可以配置为使用多种身份验证机制:纯文本用户名/密码、Kerberos 或 SSL.前 2 个使用 SASL,其中需要 JAAS 配置文件. 对于纯文本身份验证方法,配置看起来像(取自 文档): KafkaServer {需要 org.apache.kafka.common.security.plain.PlainLoginModule用户名="管理员"密码="管理员秘密"use
..
启动 Kafka Connect (connect-standalone) 后,我的任务在启动后立即失败: java.lang.OutOfMemoryError: Java 堆空间在 java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)在 java.nio.ByteBuffer.allocate(ByteBuffer.java:335)在
..
我对使用 kafka 主题的代码进行了一些 JUnit 测试.我尝试过的模拟 kafka 主题不起作用,并且在线找到的示例非常旧,因此它们也不适用于 0.8.2.1.如何使用 0.8.2.1 创建模拟 kafka 主题? 澄清一下:我选择使用主题的实际嵌入式实例来测试真实实例,而不是在 mockito 中模拟手关.这样我就可以测试我的自定义编码器和解码器是否实际工作,并且当我使用真正的 ka
..
是否可以使用 JMeter 将消息推送到 Apache Kafka. 如何实现生产者(在JAVA中)将消息推送到Kafka. 问候,阿南 解决方案 我以为之前有答案,也许没有.这些你看了吗?我自己用的是原来的 kafkameter. https://github.com/BrightTag/kafkameter https://github.com/EugeneYshi
..