kafka-producer-api相关内容
我有Windows环境,并且运行着自己的kafka和zookeeper.为了使用自定义对象,我开始使用Avro.但是我需要启动注册表.下载Confluent平台并运行它: $ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties /c/Confluent/confluent-3.0.0-2.1
..
我正在使用Kafka生产者客户端,我的项目中没有任何log4j配置. 在运行时,程序会打印很多我真的不想要的Kafka调试日志. 因此,我尝试添加一个log4j.properties来将日志级别设置为ERROR,如下所示,这似乎不起作用: log4j.rootLogger=ERROR 我如何更改Kafka日志级别? 解决方案 在运行客户端时使用命令行标志-Dlog4j
..
我正在尝试循环加载数据文件(以检查统计信息),而不是在Kafka中加载标准输入.下载Kafka之后,我执行了以下步骤: 开始了动物园管理员: bin/zookeeper-server-start.sh config/zookeeper.properties 启动服务器: bin/kafka-server-start.sh config/server.properties
..
我有一个用例,其中有一个JSON,我想生成模式并从JSON中记录并发布记录. 我已经配置了值序列化程序,并且Schema设置是向后兼容的. 第一个JSON 字符串json ="{\ n" + " \"id\": 1,\n" + " \"name\": \"Headphones\",\n" + " \"price\": 1250.
..
我将Spring用于Apache Kafka,并创建了一个服务,该服务通过Spring的KafkaTemplate使用Kafka Producer(org.apache.kafka.clients.producer)将消息发送到主题.在目标Kafka群集上,我禁用了自动主题创建.使用此处列出的生产者配置组合 https://kafka.apache.org/documentation/#produ
..
这个问题是关于架构和kafka主题的迁移. 原始问题:没有向后兼容性的架构演进. https://docs.confluent.io/current/schema-registry /avro.html 我正在要求社区提供建议或分享文章,从中我可以得到启发,甚至可以考虑解决问题的方法.也许有架构或流模式.不必为我提供特定于语言的解决方案;只是给我一个方向,我可以去...我的问题很
..
我想出了一个例外: ERROR yarn.ApplicationMaster:用户类引发异常: org.apache.spark.SparkException:任务无法序列化 org.apache.spark.SparkException:无法在以下位置序列化任务 org.apache.spark.util.ClosureCleaner $ .ensureSerializable(Closu
..
请帮忙,我想知道为什么kafka生产者总是连接到本地主机,但是那里的代理IP不是本地主机.那么,有什么帮助吗?有什么想法吗? import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kaf
..
我对Kafka还是很陌生,所以请多多包涵.这是我的设置. 我在统一框上托管了kafka.聚集.在域中说B. 客户端在Windows上.并尝试使用域A连接到托管在B上的kafka. 我有密钥表.和krb5.这两个都在环境中设置. krb5.ini(并设置为envt变量KRB5_CONFIG) [logging] default = CONSOLE admin_server = CONSO
..
当我使用Kafka的Java API时,如果我的主线程睡眠少于2000ns,它将无法发出任何消息.我真的想知道为什么会发生这种情况吗? 这是我的制片人: public class Producer { private final KafkaProducer producer; private final String topic;
..
我遇到的情况是不同的行为.共有3种不同的服务 第一个服务将从Solace队列中侦听并将其生成给kafka topic-1(启用交易的地方) 第二个服务将从kafka topic-1上方侦听并将其写入另一个kafka topic-2(在这里我们没有手动提交,交易 能够产生其他主题,自动提交偏移量为false& Isolation.level设置为read_commited) 之前删除 第三
..
我想从套接字获取数据并将其放入kafka主题,以便我的flink程序可以从该主题读取数据并对其进行处理.我可以在一个节点上做到这一点.但是我想拥有一个至少有三个不同节点(不同的IP地址)的kafka集群,并从套接字轮询数据以在节点之间分配它.我不知道该怎么做并更改此代码.我的简单程序如下: public class WordCount { public static void mai
..
我正在尝试找到获取我的kafka集群当前使用情况统计信息的方法.我希望收集以下信息: kafka集群中的主题数 每个kafka经纪人的分区数 活跃的消费者和生产者的数量 每个kafka代理的客户端连接数 每个分区上的消息数,磁盘大小等. 复制品滞后,消费者滞后等. 活跃的消费群体 可以并且应该收集的任何其他统计信息,目前我正在收集上述统计信息. 我可以使用Zookee
..
我在此链接上执行了第7步(使用Kafka Connect导入/导出数据): http://kafka.apache.org/documentation.html#quickstart 在删除"test.txt"文件之前,它一直运行良好.主要是因为这就是log4j文件的工作方式.一段时间后,文件将旋转-我的意思是-文件将被重命名&具有相同名称的新文件将开始被写入. 但是,之后,我删
..
这是我早期学习卡夫卡的日子.而且我正在检查本地计算机上的每个kafka属性/概念. 因此,我遇到了此属性min.insync.replicas,这是我的理解.如果我误解了任何内容,请纠正我. 一旦将消息发送到主题,该消息必须至少写入min.insync.replicas个关注者. min.insync.replicas还包括领导者. 如果可用的实时经纪人(间接同步复制中的 )数量少
..
这是我先前的问题 我正在尝试kafka的min.insync.replicas,这是摘要: 在本地设置3个代理,用min.insync.replicas=2创建一个主题insync. 消息由 kafka-console-producer 和acks=all产生,并由 kafka-console-consumer 阅读 买断了2个经纪人,仅剩下1个insync.replicas,并期望生
..
我对kafka ACL配置感到困惑,在该配置中我们为生产者和消费者配置了授权. 有各种示例显示使用命令行生成/使用消息. 我们是否需要任何额外的配置来使用JAVA api来/从安全的kafka主题生成/使用消息. 解决方案 如果您想了解安全的Kafka服务器的配置详细信息,请此处.
..
我有一个在单节点kubernetes环境上的pod内运行的单节点Kafka代理.我将这张图片用于kafka: https://hub.docker.com/r/wurstmeister/卡夫卡 kafka版本 = 1.1.0 Kubernetes集群在服务器上的VM内部运行. VM在活动接口ens32上具有以下IP-192.168.3.102 Kafka.yaml apiV
..
说,我发布并使用了不同类型的Java对象,对于每个对象我都必须定义自己的序列化程序实现. 我们如何在kafka消费者/生产者属性文件中的"serializer.class"属性下提供所有实现? 解决方案 我们具有类似的设置,在不同主题中具有不同的对象,但在一个主题中始终具有相同的对象类型.我们使用 ByteArrayDeserializer Java API 0.9.0.1附带,这意味着或
..
我最近开始使用Kafka,并在少数用例中对Kafka进行了评估. 如果我们想提供基于消息内容为消费者(订户)过滤消息的功能,什么是最好的方法? 说生产者暴露了一个名为“交易"的主题,该主题具有不同的交易详细信息,例如市场名称,创建日期,价格等. 一些消费者对特定市场的交易感兴趣,而另一些消费者对特定日期等之后的交易感兴趣.(基于内容的过滤) 由于经纪人方面无法进行过滤,因此
..