kafka-producer-api相关内容
我创建了 3 个 Kafka 代理设置,代理 ID 为 20、21、22.然后我创建了这个主题: bin/kafka-topics.sh --zookeeper localhost:2181 \--create --topic zeta --partitions 4 --replication-factor 3 导致: 当生产者发送消息“hello world"时主题zeta,消息首先被
..
我遇到了两个关于排序的短语, 生产者发送到特定主题分区的消息将是按发送顺序附加.也就是说,如果发送一条记录 M1由与记录 M2 相同的生产者,首先发送 M1,然后发送 M1将具有比 M2 更低的偏移量,并在日志中更早出现. 另一个 (config param) max.in.flight.requests.per.connection - 最大数量客户端将在单个连接上发送的未确认
..
我对卡夫卡很陌生,所以请多多包涵.这是我的设置.我在 unix 机器上托管了 kafka.集群.在域中说 B.客户端在 Windows 上.我正在尝试使用域 A 连接到托管在 B 上的 kafka.我有密钥表.和 krb5.这两个都是在envt中设置的.krb5.ini(并设置为envt变量KRB5_CONFIG) [记录]默认值 = 控制台admin_server = 控制台kdc = 控制
..
我已经开始学习Kafka了.尝试对其进行基本操作.我一直坚持关于“经纪人"的观点. 我的 kafka 正在运行,但是当我想创建一个分区时. from kafka import TopicPartition(错误)消费者 = KafkaConsumer(bootstrap_servers='localhost:1234')consumer.assign([TopicPartition('f
..
我们编写了一个java客户端来向kafka发布消息.代码如下图 Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "202.xx.xx.xxx:9092");props.setProperty(ProducerConfig.METADATA_FETCH_TI
..
我在我的应用程序中使用了 Kafka 1.0.1,并且我已经开始使用 0.11 中引入的幂等生产者功能,但我在使用幂等功能时无法理解排序保证. 我的生产者的配置是: enable.idempotence = true max.in.flight.requests.per.connection = 5 重试 = 50 acks = all 根据文档: 重试
..
我在 ec2 上的一台机器上设置了一个 kafka zookeeper 和 3 个代理,端口为 9092..9094,我正在尝试使用另一台机器上的主题内容.端口 2181 (zk)、9092、9093 和 9094(服务器)对消费者机器开放.我什至可以做一个 bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remo
..
我正在使用带有 Spring-boot 的 Kafka: Kafka Producer 类: @Service公共类 MyKafkaProducer {@自动连线私人 KafkaTemplate卡夫卡模板;私有静态记录器 LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);//发信息public void
..
我想在开始生产和消费作业之前确保 kafka 服务器是否正在运行.它在windows环境中,这是我在eclipse中的kafka服务器代码... Properties properties = new Properties();properties.setProperty("broker.id", "1");properties.setProperty(“端口",“9092");properti
..
我正在尝试在 Kafka 中加载一个简单的文本文件而不是标准输入.下载Kafka后,我执行了以下步骤: 启动zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties 启动服务器 bin/kafka-server-start.sh config/server.properties 创建了一个
..
KeyedMessagekeyedMessage = new KeyedMessage(request.getRequestTopicName(), SerializationUtils.serialize(message));生产者.send(keyedMessage); 目前,我正在发送没有任何密钥的消息作为密钥消息的一部分,它是否仍然适用于 delete.retention.ms?我是否需
..
当我们生成消息时,我们可以定义一个回调,这个回调可能会出现异常: kafkaProducer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) {如果(e == null){//好的} 别的 {//不好}}}); 考虑到生产者内置的重
..
请帮助我想知道为什么 kafka 生产者总是连接到本地主机,但是代理 ip 不是本地主机.那么,有什么帮助吗?有什么想法吗? import org.apache.kafka.clients.producer.KafkaProducer;导入 org.apache.kafka.clients.producer.ProducerConfig;导入 org.apache.kafka.clients.p
..
只有很少的序列化程序可用,例如 org.apache.kafka.common.serialization.StringSerializer 我们如何创建自己的自定义序列化程序? 解决方案 这里有一个示例,可以将您自己的序列化器/反序列化器用于 Kafka 消息值.对于 Kafka 消息密钥是一样的. 我们希望将 MyMessage 的序列化版本作为 Kafka 值发送,并再次将
..
我开始学习Kafka,在阅读的过程中,我想到了一些问题: 当生产者生成消息时 - 它会指定要将消息发送到的主题,对吗?它关心分区吗? 当订阅者正在运行时 - 它是否指定了它的组 ID,以便它可以成为同一主题或该组消费者感兴趣的多个主题的一组消费者的一部分? 每个消费者组在broker上有对应的分区还是每个消费者都有一个分区? 分区是否由代理创建,因此消费者不关心? 既
..
我使用 apache camel kafka 作为客户端来生成消息,我观察到 kafka 生成器需要 1 毫秒来推送消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送一条消息需要 100 毫秒. 安装简述3 卡夫卡集群 16 核 32GB 内存 示例代码 String endpoint="kafka:test?topic=test&brokers=nodekfa:9092,no
..
考虑到一系列不同的事件,推荐的方法是 一个包含所有事件的大主题 针对不同类型活动的多个主题 哪个选项会更好? 我了解消息不在主题的同一分区中,这意味着没有顺序保证,但是在进行此操作时是否有任何其他要考虑的因素决定? 解决方案 主题是一个逻辑抽象,应该包含相同类型的消息.假设您监控网站并捕获点击流事件,另一方面,您有一个数据库将其更改填充到更改日志主题中.您应该有两个不同
..
将同步副本配置为 Acks = all 和 min.insync.replicas = N , 想了解应如何为未处理的生产者记录的消息/记录配置重试 示例:当Kafka在处理过程中无法通过ISR在线处理记录时,记录为N-1个,而最低配置的ISR为N个副本. 解决方案 什么是 acks ? acks参数控制在生产者认为写入成功之前必须有多少个分区副本接收记录. acks参
..
我正在使用一个普通的Java项目来运行(无框架)Kafka生产者和使用者. 我正在尝试控制 KafkaProducer 和 KafkaConsumer 代码生成的日志,并且无法使用 log4j.properties 配置对其进行影响: log4j.rootLogger = ERROR,stdoutlog4j.logger.kafka =错误,标准输出log4j.logger.org.ap
..
正如我所见,Kafka模板内部使用了Kafka生产者.我只想知道确切的区别是什么.此外,与Kafka生产者相比,我在Kafka模板中发现了许多send()方法. 请帮我.如果有人知道更多. 解决方案 生产者是模式,而KafkaTemplate包装了Producer实例,并提供了将消息发送到Kafka主题的便捷方法.(源) Kafka 生产者是在Apache Kafka中定义的. K
..