How to run the mongo-kafka connector as a source for Kafka and integrate it with a Logstash input to use Elasticsearch as a sink?

Problem Description

I have created a build of https://github.com/mongodb/mongo-kafka

But how do I run it so that it connects with my running Kafka instance?

However stupid this question may sound, there seems to be no documentation available for making this work with a locally running MongoDB replica set.

All the blogs point to using MongoDB Atlas instead.

If you have a good resource, please guide me towards it.

UPDATE 1 --

Used the Maven artifact - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect

Placed it in the Kafka plugins directory and restarted Kafka.
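For illustration, placing the connector JAR on the worker's plugin path might look something like the following sketch; the directory is an assumption, and the connector's dependencies (for example the MongoDB Java driver) must also end up on the plugin path unless an uber JAR is used:

# Create a plugins directory and copy the connector JAR into it (paths are assumptions)
mkdir -p /usr/local/share/kafka/plugins
cp mongo-kafka-connect-*.jar /usr/local/share/kafka/plugins/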

UPDATE 2 -- How to enable MongoDB as a source for Kafka?

Using https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties as the configuration file for Kafka:

bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties

UPDATE 3 -- The above method did not work, so I went back to the blog, which does not mention what port 8083 is.

Installed Confluent and confluent-hub, but I am still unsure whether the mongo-kafka connector is working with Kafka.

UPDATE 4 --

Zookeeper, the Kafka server and Kafka Connect are running, and the Mongo Kafka connector library files and the Kafka Connect Avro Converter library files are in place.

Using the commands below, my source got working -

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
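For reference, config/connect-standalone.properties is the worker configuration file that ships with Kafka; a minimal sketch of the settings that matter here follows. The plugin.path value is an assumption and should point at wherever the connector JARs were copied; the JSON converter with its schema envelope lines up with the [json_payload][payload] field that the Logstash filter further below unwraps.

# config/connect-standalone.properties (minimal sketch)
bootstrap.servers=localhost:9092

# JSON converter with the schema envelope enabled; the Logstash filter below
# reads the "payload" field of this envelope
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Standalone mode keeps source offsets in a local file between restarts
offset.storage.file.filename=/tmp/connect.offsets

# Directory containing the mongo-kafka connector and converter JARs (assumption)
plugin.path=/usr/local/share/kafka/plugins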

And using the below configuration for Logstash, I was able to push data into Elasticsearch -

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["users","organisations","skills"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
    }
    stdout { codec => rubydebug }
}

So now, since one MongoSourceConnector.properties holds only the single collection it reads from, I need to run Kafka Connect with a different properties file for each collection (see the sketch just below).
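A second connector file can be a near copy of the first - a minimal sketch, where the file names and the mydatabase value are assumptions and the collection name comes from the topics list above:

# config/organisations.properties (hypothetical file name)
name=organisations-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
connection.uri=mongodb://localhost:27017
database=mydatabase
collection=organisations
copy.existing=true
publish.full.document.only=true

Since connect-standalone accepts several connector property files in a single invocation, all of the collections can run under one worker:

bin/connect-standalone.sh config/connect-standalone.properties \
  config/users.properties config/organisations.properties config/skills.properties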

My Logstash is pushing new data into Elasticsearch instead of updating the old data. Also, it is not creating indexes based on the name of the collection. The idea is that this should stay perfectly in sync with my MongoDB database.

FINAL UPDATE -- Everything is now working smoothly:

  • Created multiple properties files for Kafka Connect
  • The latest Logstash config creates indexes based on the topic name and updates them accordingly

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","skills"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec => rubydebug { metadata => true }
    }
}

Recommended Answer

Steps to successfully get MongoDB syncing with Elasticsearch -

  • First, deploy the MongoDB replica set -
# Make sure no mongod daemon instance is already running
# Check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN

# Kill the process id of the running mongo instance
sudo kill 775

# Deploy the replica set
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
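Note that starting mongod with --replSet is not enough on its own: the replica set has to be initiated once from a mongo shell, because the connector relies on change streams, which are only available on replica sets. A minimal sketch:

# In a separate terminal, initiate the single-node replica set (one-time step)
mongo --eval 'rs.initiate()'

# Verify the node has become PRIMARY (myState == 1)
mongo --eval 'rs.status().myState'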

  • Create the configuration properties for Kafka -
# dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
    

  • Make sure the JAR files below are available on your Kafka plugins path -
    • mongo-kafka-connect (Maven Central Repository Search - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect)
    • Kafka Connect Avro Converter

  • Deploy Kafka -
# Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# Kafka Server
bin/kafka-server-start.sh config/server.properties

# Kafka Connect
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
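At this point it is worth verifying that the connector is actually running. Port 8083, mentioned in UPDATE 3 above, is simply the default port of the Kafka Connect REST API, and with an empty topic.prefix the source connector writes to a topic named <database>.<collection>. A quick check, with the topic name taken from the dummycollection.properties above:

# List the connectors registered with the Connect worker (REST API, port 8083 by default)
curl -s localhost:8083/connectors

# Check the status of the connector defined above
curl -s localhost:8083/connectors/dummycollection-source/status

# Consume a few change-stream events straight from the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic dummydatabase.dummycollection --from-beginning --max-messages 5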
      

  • Configure Logstash -
# /etc/logstash/conf.d/apache.conf  <- File
input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["dummydatabase.dummycollection"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id" }
        rename => { "[payload][fullDocument]" => "document" }
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec => rubydebug { metadata => true }
    }
}
        

  • Start Elasticsearch, Kibana and Logstash -
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
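If nothing shows up in Elasticsearch afterwards, the service logs are the first place to look, for example:

# Confirm Elasticsearch is reachable
curl -s localhost:9200

# Follow the Logstash service logs for pipeline or connection errors
journalctl -u logstash -f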
          

  • Test -
    Open MongoDB Compass, and:
    • create a collection, mention those collections in the Logstash topics, and create a properties file for Kafka
    • add data to it
    • update the data

    Then review the indexes in Elasticsearch (a quick check follows below).
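A quick way to review the indexes and the synced documents, assuming Elasticsearch on its default port and the dummy database/collection from the configuration above:

# List the indexes created from the Kafka topics
curl -s 'localhost:9200/_cat/indices?v'

# Inspect the documents synced from the dummy collection
curl -s 'localhost:9200/dummydatabase.dummycollection/_search?pretty'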
