How to run the mongo-kafka connector as a source for Kafka and integrate it with Logstash input to use Elasticsearch as a sink?
Question
I have created a build of https://github.com/mongodb/mongo-kafka.
But how do I run this to connect with my running Kafka instance?
However stupid this question may sound, there seems to be no documentation available for making this work with a locally running replica set of MongoDB.
All blogs point to using MongoDB Atlas instead.
If you have a good resource, please point me to it.
UPDATE 1 -- Used the Maven artifact: https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
Placed it in the Kafka plugins directory and restarted Kafka.
UPDATE 2 -- How to enable MongoDB as a source for Kafka?
Used https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties as the configuration file for Kafka:
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties
UPDATE 3 -- The above method hasn't worked, so going back to the blog, which does not mention what port 8083 is.
Installed Confluent and confluent-hub; still unsure whether the mongo connector is working with Kafka.
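For reference, port 8083 is the default REST API port of the Kafka Connect worker, not of the Kafka broker itself. Once a Connect worker is running, deployed connectors can be listed and inspected over that API (a sketch, assuming a local worker; the connector name comes from the `name=` field of its properties file):

```shell
# List the connectors deployed on the local Connect worker
curl -s http://localhost:8083/connectors
# Check the status of one connector (name here is illustrative)
curl -s http://localhost:8083/connectors/dummycollection-source/status
```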
UPDATE 4 --
Zookeeper, Kafka server, and Kafka Connect running
Mongo Kafka connector library files and Kafka Connect Avro converter library files in place
Using the commands below, my source got working:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
And using the configuration below for Logstash, I was able to push data into Elasticsearch:
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}
So now one MongoSourceConnector.properties holds the single collection name it reads from, and I need to run Kafka Connect with a different properties file for each collection.
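One standalone worker can host several connectors at once: connect-standalone accepts multiple connector property files in a single invocation, so one properties file per collection is enough (the file names below are illustrative):

```shell
# One worker, one connector per collection; each .properties names its own collection
bin/connect-standalone.sh config/connect-standalone.properties \
  config/users.properties config/organisations.properties config/skills.properties
```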
My Logstash is pushing new data into Elasticsearch instead of updating old data. It is also not creating indices named after the collections. The idea is that this should stay perfectly in sync with my MongoDB database.
FINAL UPDATE -- Everything is now working smoothly,
- Created multiple properties files for Kafka Connect
- The latest Logstash config actually creates indices based on the topic name, and updates the indices accordingly
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    decorate_events => true
    topics => ["users","organisations","skills"]
  }
}
filter {
  json {
    source => "message"
    target => "json_payload"
  }
  json {
    source => "[json_payload][payload]"
    target => "payload"
  }
  mutate {
    add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
    rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id" }
    rename => { "[payload][fullDocument]" => "document" }
    remove_field => ["message","json_payload","payload"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{es_index}"
    action => "update"
    doc_as_upsert => true
    document_id => "%{mongo_id}"
  }
  stdout {
    codec => rubydebug { metadata => true }
  }
}
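The two json filters above unwrap the Connect envelope first and then the change-event payload string inside it. The same double parse can be sanity-checked from the command line with jq (assuming jq is installed; the message below is a simplified hand-written sample, not captured connector output):

```shell
# Simplified Connect-style envelope: "payload" holds the change event as a JSON string
msg='{"payload":"{\"fullDocument\":{\"_id\":{\"$oid\":\"5d0c2ebf0e0a0c0001a1b2c3\"},\"name\":\"alice\"}}"}'
# First parse: the envelope; fromjson: the payload string; then dig out the oid,
# mirroring the [payload][fullDocument][_id][$oid] rename in the filter
echo "$msg" | jq -r '.payload | fromjson | .fullDocument._id["$oid"]'
# prints 5d0c2ebf0e0a0c0001a1b2c3
```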
Accepted answer
Steps to successfully get MongoDB syncing with Elasticsearch:
- First, deploy a MongoDB replica set:
//Make sure no mongod daemon instance is running
//To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN
//Kill the process id of the mongo instance (775 here is an example pid)
sudo kill 775
//Deploy the replica set
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
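The Mongo source connector relies on change streams, which only work on an initiated replica set. If this is a fresh deployment, the replica set still has to be initiated once from the mongo shell (a minimal sketch for the single-node setup above):

```shell
# One-time initiation of the single-node replica set "rs0"
mongo --eval 'rs.initiate({_id: "rs0", members: [{_id: 0, host: "localhost:27017"}]})'
# Confirm the node has become PRIMARY
mongo --eval 'rs.status().myState'
```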
- Create connector configuration properties for Kafka Connect:
//dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000
# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
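The empty `pipeline=[]` above passes every change event through. As a sketch, a MongoDB change-stream aggregation pipeline can narrow the events the connector emits, e.g. to inserts and updates only (this filter is illustrative, not required for the setup to work):

```properties
pipeline=[{"$match": {"operationType": {"$in": ["insert", "update"]}}}]
```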
- Make sure the JAR files for the Mongo Kafka connector and the Kafka Connect Avro converter are available in your Kafka plugins directory:
- Deploy Kafka:
//Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
//Kafka server
bin/kafka-server-start.sh config/server.properties
//Kafka Connect (standalone)
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
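Once the worker is up, the source can be verified from the Kafka side by listing topics and tailing the collection topic (with an empty `topic.prefix`, the topic name follows the database.collection pattern from the properties file above):

```shell
# The connector should have created a topic named dummydatabase.dummycollection
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# Tail change events as they arrive
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic dummydatabase.dummycollection --from-beginning
```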
- Configure Logstash:
// /etc/logstash/conf.d/apache.conf <- File
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    decorate_events => true
    topics => ["dummydatabase.dummycollection"]
  }
}
filter {
  json {
    source => "message"
    target => "json_payload"
  }
  json {
    source => "[json_payload][payload]"
    target => "payload"
  }
  mutate {
    add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
    rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id" }
    rename => { "[payload][fullDocument]" => "document" }
    remove_field => ["message","json_payload","payload"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{es_index}"
    action => "update"
    doc_as_upsert => true
    document_id => "%{mongo_id}"
  }
  stdout {
    codec => rubydebug { metadata => true }
  }
}
- Start Elasticsearch, Kibana and Logstash:
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
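With everything running, the end of the pipeline can be confirmed by listing Elasticsearch indices (assuming the default port 9200; an index named after the Kafka topic should appear once the first change event flows through):

```shell
# The dummydatabase.dummycollection index should show up here
curl -s 'localhost:9200/_cat/indices?v'
# Inspect a few synced documents
curl -s 'localhost:9200/dummydatabase.dummycollection/_search?size=3&pretty'
```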
- Test:
- Create a collection, mention it in the Logstash topics, and create a properties file for Kafka Connect
- Add data
- Update data
Open MongoDB Compass to add and update data, and review the resulting indices in Elasticsearch.
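As an alternative to Compass, the sync can be exercised from the mongo shell (collection and database names match the sample config above; the document fields are illustrative):

```shell
# Insert a document: should create/upsert it in Elasticsearch
mongo dummydatabase --eval 'db.dummycollection.insertOne({name: "alice"})'
# Update it: the Elasticsearch document with the same mongo_id should change in place
mongo dummydatabase --eval 'db.dummycollection.updateOne({name: "alice"}, {$set: {name: "bob"}})'
```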