How to run the mongo-kafka connector as a source for kafka and integrate that with logstash input to use elasticsearch as a sink?


Problem description

I have created a build of https://github.com/mongodb/mongo-kafka.

But how do I run it so that it connects to my running Kafka instance?

However stupid this question may sound, there seems to be no documentation available on making this work with a locally running MongoDB replica set.

All blogs point to using MongoDB Atlas instead.

If you have a good resource, please guide me towards it.

UPDATE 1 --

Used the Maven artifact - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect

Placed it in the Kafka plugins directory and restarted Kafka.

UPDATE 2 -- How do I enable MongoDB as a source for Kafka?

https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties

is the file to be used as the configuration for Kafka:

bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties

UPDATE 3 -- The above method did not work. Going back to the blog, it never mentions what port 8083 is.
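As far as I can tell, 8083 is the default port of the Kafka Connect REST API, so once a Connect worker is running it can be checked with something like this (my own sketch, not from the blog):

# List the installed connector plugins and the running connectors on the Connect worker
curl http://localhost:8083/connector-plugins
curl http://localhost:8083/connectors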

Installed Confluent and confluent-hub, but I am still unsure how to get the mongo connector working with Kafka.

UPDATE 4 --

Zookeeper, Kafka server, and Kafka Connect are running.

Mongo Kafka library files and Kafka Connect Avro Connector library files are in place.

Using the commands below, my source got working -

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties

And using the Logstash configuration below, I was able to push data into Elasticsearch -

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
        hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

So right now each MongoSourceConnector.properties holds the single collection name that it reads from, and I need to run Kafka Connect with a different properties file for each collection.
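One way to avoid running a separate worker per collection, sketched here with hypothetical per-collection files (users.properties, organisations.properties, skills.properties), is that connect-standalone.sh accepts several connector property files in a single invocation:

# A single standalone worker can load multiple connector configs (filenames are hypothetical)
bin/connect-standalone.sh config/connect-standalone.properties \
  config/users.properties config/organisations.properties config/skills.properties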

My Logstash is pushing new data into Elasticsearch instead of updating the old data. Also, it is not creating indexes named after the collections. The idea is that this should stay perfectly in sync with my MongoDB database.

FINAL UPDATE -- Everything is now working smoothly,

  • Created multiple properties files for Kafka Connect
  • The latest Logstash actually creates indexes based on the topic name and updates the documents accordingly

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","skills"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec =>
        rubydebug {
            metadata => true
        }
    }
}

Recommended answer

Steps to successfully get MongoDB syncing with Elasticsearch -

  • First, deploy a MongoDB replica set -
# Make sure no mongo daemon instance is running
# To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN

# Kill the process id of the mongo instance
sudo kill 775

# Deploy the replica set
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
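If the replica set has never been initiated before, one extra one-time step (my assumption from standard MongoDB setup, not part of the original list) is to initiate it from the mongo shell:

# One-time initiation of the single-member replica set (assumes the default localhost member)
mongo --eval 'rs.initiate()'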

  • Create configuration properties for Kafka
  • # dummycollection.properties  <- Filename
    name=dummycollection-source
    connector.class=com.mongodb.kafka.connect.MongoSourceConnector
    tasks.max=1
    
    # Connection and source configuration
    connection.uri=mongodb://localhost:27017
    database=dummydatabase
    collection=dummycollection
    copy.existing=true
    topic.prefix=
    poll.max.batch.size=1000
    poll.await.time.ms=5000
    
    # Change stream options
    publish.full.document.only=true
    pipeline=[]
    batch.size=0
    collation=
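
To stream another collection, a sketch of a second connector config would change only the connector name and the collection (the file and collection names below are hypothetical):

    # anothercollection.properties  <- hypothetical second connector config
    name=anothercollection-source
    connector.class=com.mongodb.kafka.connect.MongoSourceConnector
    tasks.max=1
    connection.uri=mongodb://localhost:27017
    database=dummydatabase
    collection=anothercollection
    copy.existing=true
    publish.full.document.only=true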
    

    • Make sure the JAR files from the links below are available to your Kafka plugins directory -
    • Maven Central Repository Search (mongo-kafka-connect)

      Kafka Connect Avro Converter
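The standalone worker only picks these JARs up if its plugin path points at the directory they sit in; a minimal sketch of the relevant worker setting (the path itself is hypothetical) is:

      # In config/connect-standalone.properties, point plugin.path at the directory holding the JARs
      plugin.path=/usr/local/share/kafka/plugins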

      • Deploy Kafka
      # Zookeeper
      bin/zookeeper-server-start.sh config/zookeeper.properties

      # Kafka server
      bin/kafka-server-start.sh config/server.properties

      # Kafka Connect
      bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
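Once the worker is up, one way to confirm that change events are actually flowing (a verification step of my own, not part of the original list) is to consume the topic directly; with an empty topic.prefix the source connector publishes to database.collection, i.e. dummydatabase.dummycollection here:

      # Consume the connector's topic to verify that change events are arriving
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic dummydatabase.dummycollection --from-beginning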
      

      • Configure Logstash -
      • # /etc/logstash/conf.d/apache.conf  <- File
        input {
          kafka {
                bootstrap_servers => "localhost:9092"
                decorate_events => true
                topics => ["dummydatabase.dummycollection"]
          }
        }
        filter {
            # The Kafka record value is a JSON envelope from Kafka Connect
            json {
                source => "message"
                target => "json_payload"
            }

            # The actual change event is a JSON string inside the envelope's "payload" field
            json {
                source => "[json_payload][payload]"
                target => "payload"
            }

            mutate {
                # Use the Kafka topic name (exposed by decorate_events) as the target index name
                add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
                # Keep the MongoDB _id so it can drive the Elasticsearch document_id
                rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
                rename => { "[payload][fullDocument]" => "document"}
                remove_field => ["message","json_payload","payload"]
            }
        }
        output {
            elasticsearch {
                hosts => ["localhost:9200"]
                index => "%{es_index}"
                action => "update"
                doc_as_upsert => true
                document_id => "%{mongo_id}"
            }
            stdout {
              codec =>
                rubydebug {
                    metadata => true
                }
            }
        }
        

        • Start Elasticsearch, Kibana, and Logstash
        • sudo systemctl start elasticsearch
          sudo systemctl start kibana
          sudo systemctl start logstash
          

          • Test
          • Open MongoDB Compass, and

            • Create a collection, mention these collections in the Logstash topics, and create a properties file for Kafka
            • Add data
            • Update data

            Review the indexes in Elasticsearch
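            A quick way to review the resulting indexes and documents (a sketch that uses the standard Elasticsearch REST API; the index name assumes the dummydatabase.dummycollection topic from above):

            # List all indexes, then look at the documents indexed from the topic
            curl 'http://localhost:9200/_cat/indices?v'
            curl 'http://localhost:9200/dummydatabase.dummycollection/_search?pretty'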

