apache-kafka相关内容
我已经用SASL+ACL设置了Kafka和ZooKeeper身份验证,Kafka通过包含加密的SSL双向身份验证设置到生产者和消费者。 通过在Kafka和ZooKeeper之间启用SASL和ACL,不允许未经授权的Kafka Broker登录到ZooKeeper集群。但是,主题的创建和删除仍然可以不受任何限制。 zookeeper.properties dataDir=/x02/
..
我正在使用@KafkaListener注释编写一个Kafka使用者,我知道有一种方法可以使用ConCurentKafkaListenerContainerFactory中的方法增加来自不同分区的并发Kafka使用者的数量 e.g. factory.setConcurrency(3); setconency的Javadoc如下所示:- 并发运行的KafkaMessageListene
..
我在我的生产服务器中使用Kafka0.10.0和ZooKeeper3.4.6。我有20个主题,每个主题大约有50个分区。我总共有100个用户,每个用户都订阅了不同的主题和分区。所有用户都有相同的groupID。那么,如果为特定主题添加或删除消费者,那么附加到不同主题的消费者也将经历重新平衡吗? 我的消费者代码是: public static void main(String[] arg
..
我正在尝试用ApacheKafka替换兔子MQ,在规划时,我遇到了几个概念性规划问题。 首先,我们对每用户队列策略使用Rabb MQ,这意味着每个用户使用一个队列。这符合我们的需要,因为每个用户代表要与该特定用户一起完成的一些工作,并且如果该用户导致问题,则队列对于其他用户永远不会有问题,因为队列是分开的(问题意味着队列中的消息将使用http请求被分派给用户。如果用户拒绝接收消息(服务器可能会关
..
我已经通过docker安装了Kafka。 当我运行docker-compose up命令时,我遇到以下错误: [2022-02-28 08:13:24,185] INFO Awaiting socket connections on localhost:9092. (kafka.network.Acceptor) kafka | [2022-02-28 08:13:24,2
..
Q1)以下是我在为MySQL源创建Kafka连接器时使用的配置。 { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "snapshot.locking.mode": "minimal", "database.user": "cdc_user", "tasks.max": "3", "dat
..
我在AWS MSK上有两个Kafka集群(在相同的环境和区域)。我在目标集群上设置了KafkaConnect集群,并设置了Mirror Maker连接器以运行。连接器提交正常,没有错误。 当我尝试检查连接器的状态时,它显示正在运行: {"name":"mirror-maker-test-connector","connector":{"state":"RUNNING","worker_id":"
..
我有一个安装了CDC的Postgres数据库。 我部署了Kafka Debezium连接器1.8.0。Postgres DB的最终版本 发布http://localhost:8083/connectors 正文: { "name": "postgres-kafkaconnector", "config": { "connector.class
..
这是架构-注册表.属性的配置 listeners=http://10.X.X.76:8081 kafkastore.bootstrap.servers=PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXXX.1:9092,PLAINTEXT://1XXXX.69:9092 kafkastore.topic=_sc
..
我正在使用我找到的Github资源库和文件夹路径:https://github.com/entechlog/kafka-examples/tree/master/kafka-connect-standalone以独立模式在本地运行Kafka Connect。我对Docker合成文件做了一些更改,但主要是与身份验证有关的更改。 我现在遇到的问题是,当我运行Docker镜像时,对于每个分区(有1
..
$ curl localhost:8083/connectors/file-stream-demo-distributed/status {"name":"file-stream-demo-distributed","connector":{"state":"RUNNING","worker_id":"XX.XX.XX.XXX:8083"},"tasks":[{"id":0,"state":"RU
..
首先感谢@OneCricketeer到目前为止的支持。到目前为止,我已经尝试了这么多配置,我不知道还可以尝试什么。 使用合流connect-standalone worker.properties sink.properties访问外部流。 连接正常,我可以看到已加载偏移量: INFO[MY_MYSQL_SINK|TASK-0][消费者客户端ID=连接器-消费者-MY_MYSQL
..
我需要知道卡夫卡是如何掩盖秘密的。机密包括用户名、密码、服务ID和apiKey等。我不想记录此类信息。 推荐答案 对于apache-kafka-connect可以使用秘密外部化。此功能在apache-kafka2.0.0至KIP-297中实现。 简短示例: 将这些属性添加到connect-distributed.properties文件: config.provide
..
我想将实时数据从SQL SERVER直接添加到Kafka,我发现有https://debezium.io/docs/connectors/sqlserver/ 提供的SQL SERVER连接器 在文档中,它说将为每个表创建一个主题。我正在努力了解体系结构,因为我有500个客户端,这意味着我有500个数据库,每个数据库都有500个表。这是否意味着它将创建250000个主题,或者我是否需要为每个客
..
我正在尝试使用Kafka Connect使用JDBC源和宿连接器将数据从旧的DB2数据库同步到Postgres数据库。它工作得很好,但前提是我必须严格控制用于表名的大小写。 例如,我在DB2中有一个名为action的表,它也存在于postgres中,具有相同的列,等等。唯一的区别是在DB2中它是大写的ACTION,而在postgres中它是小写的action。 以下是一个有效的接收器文
..
我们在一个项目上使用Kafka Connect已经有一段时间了,目前完全只使用Confluent Kafka Connect JDBC connector。我很难理解“任务”在Kafka Connect中的作用,特别是这个连接器。我理解“连接器”;它们包含一系列关于特定源/宿的配置以及它们所连接的源/目标主题。我了解连接器和任务之间存在1:Many关系,以及任务用于并行化工作的一般原则。但是,我们
..
我在使用融合JDBC连接器时遇到了非常奇怪的行为。我非常肯定它与融合堆栈无关,而是与Kafka-Connect框架本身有关。 因此,我将offset.storage.file.filename属性定义为默认/tmp/connect.offsets并运行我的接收器连接器。显然,我希望连接器持久化给定文件中的偏移量(它不存在于文件系统中,但它应该是自动创建的,对吗?)文档显示: offset.
..
我正尝试在批量模式下使用具有以下属性的Kafka Connect JDBC源连接器。 connector.class=io.confluent.connect.jdbc.JdbcSourceConnector timestamp.column.name=timestamp connection.password=XXXXX validate.non.null=false tasks.max=
..
我从最初的Kafka MirrorMaker迁移到MirrorMaker 2.0,以便将主题从一个集群复制到另一个集群。我正在运行所述的专用MirrorMaker群集in the docs。 假设我正在复制一个名为test-topic的主题。 Cluster A Cluster B ---------- ---------- test-topic ---> A.t
..
我要从一个系统实时传输到另一个系统。 我正在使用kafka-python,可以在本地直播。 计算出连接器将处理多个设备。有没有人能给我提个建议,让我用连接符来实现它? 推荐答案 Kafka Connect是Java框架,不是Python. Kafka Connect运行REST API,您可以使用urllib3或requests与其交互,而不是kafka-python
..