kafka-producer-api相关内容
我有 windows 环境和我自己的一套 kafka 和 zookeeper 正在运行.为了使用自定义对象,我开始使用 Avro.但我需要启动注册表.下载 Confluent 平台并运行: $ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties/c/Confluent/confluent-3.0
..
我正在使用 Kafka 生产者客户端,我的项目中没有任何 log4j 配置. 在运行时,程序打印了很多我真的不想要的 Kafka 调试日志. 因此,我尝试添加 log4j.properties 以将日志级别设置为 ERROR,如下所示,这似乎不起作用: log4j.rootLogger=错误 如何更改 Kafka 日志级别? 解决方案 在运行客户端时使用命令行标志 -Dlo
..
每当我试图从 kafka 队列读取消息时,我都会收到以下异常: [error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record 无法转换为 com.harmeetsingh13.java.Customerjava.lang.ClassCastException: org.a
..
我试图在循环中加载数据文件(以检查统计数据)而不是 Kafka 中的标准输入.下载Kafka后,我执行了以下步骤: 启动zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties 启动服务器: bin/kafka-server-start.sh config/server.properties 创建了一个名
..
我有一个像下面这样的简单 java 生产者 公共类生产者{private final static String TOPIC = "my-example-topi8";私有最终静态字符串 BOOTSTRAP_SERVERS = "localhost:8092";public static void main( String[] args ) 抛出异常 {生产者生产者 = createProduc
..
我有一个用例,我有一个 JSON,我想从 JSON 中生成模式和记录并发布记录.我已经配置了值序列化器并且架构设置向后兼容. 第一个 JSON 字符串 json = "{\n" + " \"id\": 1,\n" +" \"名称\": \"耳机\",\n" +" \"价格\": 1250.0,\n" +" \"标签\": [\"home\", \"green\"]\n" +"}\n
..
我正在将 Spring 用于 Apache Kafka,并创建了一个服务,该服务通过 Spring 的 KafkaTemplate 使用 Kafka Producer (org.apache.kafka.clients.producer) 向主题发送消息.在目标 Kafka 集群上,我禁用了自动主题创建.使用此处列出的生产者配置组合 https://kafka.apache.org/documen
..
卡夫卡 0.8V 我想发布/consume byte[] 对象、java bean 对象、可序列化对象等等.. 为这种类型的场景定义发布者和消费者的最佳方式是什么?当我从消费者迭代器消费一条消息时,我不知道它是什么类型的消息.任何人都可以指点我如何设计此类场景的指南吗? 解决方案 我强制每个 Kafka 主题使用单一模式或对象类型.这样,当您收到消息时,您就可以确切地知道自己收
..
我在端口 5000 上的 docker 容器中运行了一个 rest 服务,该服务用于通过 kafka 主题生成消息,该主题用完 docker 容器. 我已经使用以下属性配置了我的生产者客户端:- bootstrap.servers=localhost:9093 而且我已经使用以下命令开始了我的控制:- docker run -d -p 127.0.0.1:5000:5000
..
我在端口 5000 上的 docker 容器中运行了一个 rest 服务,该服务用于通过 kafka 主题生成消息,该主题用完 docker 容器. 我已经使用以下属性配置了我的生产者客户端:- bootstrap.servers=localhost:9093 而且我已经使用以下命令开始了我的控制:- docker run -d -p 127.0.0.1:5000:5000
..
我已经实现了一个简单的 Kafka 死信记录处理器. 当使用控制台生产者产生的记录时,它工作得很好. 但是,我发现我们的 Kafka Streams 应用程序并不能保证向接收器主题生成记录,即每生成一条记录,偏移量就会增加 1. 死信处理器背景: 我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录.当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续
..
我是 Kafka 的新手,正在研究将专有流媒体服务连接到 Kafka 的原型. 我希望获取在某个主题上发送的最后一条消息的密钥,因为我们的内部流消费者需要使用它在连接时收到的最后一条消息的 ID 登录. 是否可以使用 KafkaProducer 或 KafkaConsumer 来执行此操作? 我尝试使用消费者执行以下操作,但在运行控制台消费者时,我看到消息重播. //轮询以便我
..
如何在kafka中发送同步消息? 实现它的一种方法是设置属性参数 max.in.flight.requests.per.connection = 1. 但我想知道在 kafka 中是否有一种甚至直接或替代的方式发送同步消息.(类似于 producer.syncSend(...) 等). 解决方案 生产者 API 从 send 返回一个 Future.您可以调用 Future#ge
..
我正在研究 Kafka 9 作为一个业余项目,并完成了一些“Hello World"类型的示例. 我必须考虑基于请求响应消息的真实世界 Kafka 应用程序,更具体地说,如何将 Kafka 请求消息链接到其响应消息. 我正在考虑使用生成的 UUID 作为请求消息键,并使用此请求 UUID 作为关联的响应消息键.与 WebSphere MQ 具有消息关联 ID 的机制类型非常相似.
..
我知道无法在 Kafka 中对多个分区进行排序,并且分区排序仅对组内的单个使用者(对于单个分区)有保证.但是,使用 Kafka Streams 0.10 现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者方面,可以说使用 Kafka Streams 0.10 现在有可能吗?假设我们收到所有消息,我们是否可以不根据消费的时间戳对所有分区进行排序,然后将它
..
我对 kafka 相当陌生,所以如果这个问题微不足道,请原谅我.为了计时测试,我有一个非常简单的设置,如下所示: 机器 A -> 写入主题 1 (Broker) -> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2 (Broker) -> 机器 A 从主题 2 中读取 现在我在无限循环中发送大约 1400 字节的消息,很快填满了我的小代理上的空间.我正在尝试为
..
我们正在使用标题向 Kafka 发送消息org.apache.kafka.clients.producer.ProducerRecord public ProducerRecord(String topic, Integer partition, K key, V value, Iterable headers) {this(topic, partition, (Long)null, key,
..
在 kafka 中创建主题的最佳方式是什么? 创建主题时要定义多少个副本/分区? 在新的生产者 API 中,当我尝试向不存在的主题发布消息时,它第一次失败然后成功发布. 我想知道副本、分区和集群节点数之间的关系. 我们需要在发布消息之前创建主题吗? 解决方案 当您启动 Kafka 代理时,您可以在 conf/server.properties 文件中定义一组属性.该文件
..
我正在寻找有关该主题的一些说明.在 Kafka 文档中,我发现了以下内容: Kafka 仅提供分区内消息的总顺序,而不提供主题中不同分区之间的总顺序.对于大多数应用程序来说,按分区排序与按键分区数据的能力相结合就足够了.但是,如果您需要消息的总顺序,这可以通过只有一个分区的主题来实现,尽管这意味着每个消费者组只有一个消费者进程. 这里是我的问题: 这是否意味着如果我希望有 1 个
..
我正在尝试使用 Java API v0.9.0.1 让一个简单的 Kafka Consumer 工作.我使用的 kafka 服务器是一个 docker 容器,也运行版本 0.9.0.1.以下是消费者代码: 公共类消费者{public static void main(String[] args) 抛出 IOException {KafkaConsumer消费者;尝试 (InputStream p
..