Extract from ElasticSearch into Kafka, continuously picking up any new ES updates, using logstash


Question

I have an ES cluster with multiple indices that all receive updates at random intervals. I have a logstash instance extracting data from ES and passing it into Kafka.

What would be a good method to run this every minute and pick up any updates in ES?

Conf:

 input {
   elasticsearch {
     hosts => [ "hostname1.com:5432", "hostname2.com" ]
     index => "myindex-*"
     query => "*"
     size => 10000
     scroll => "5m"
   }
 }
 output {
   kafka {
     bootstrap_servers => "abc-kafka.com:1234"
     topic_id => "my.topic.test"
   }
 }

I would like to use the documents' @timestamp in a query, save it in a temp file, then rerun the query on a schedule and get the latest updates/inserts (something like what logstash's jdbc input plugin supports).

Any ideas?

Thanks in advance

Answer

Someone asked the same thing a few months ago, but that issue didn't get much traffic. You can +1 it, maybe.

In the meantime, you could modify the query in your elasticsearch input to be like this:

query => '{"query":{"range":{"timestamp":{"gt": "now-1m"}}}}'

i.e. you query all documents whose timestamp field (an arbitrary name; change it to match yours) falls within the past minute.
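
Applied to the input from your question, that would look something like the following. This is only a sketch reusing your hosts and index pattern; it has not been tested against your cluster:

 input {
   elasticsearch {
     hosts => [ "hostname1.com:5432", "hostname2.com" ]
     index => "myindex-*"
     # only fetch documents whose timestamp is within the last minute
     query => '{"query":{"range":{"timestamp":{"gt": "now-1m"}}}}'
     size => 10000
     scroll => "5m"
   }
 }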

Then you need to set up a cron job that starts your logstash process every minute. Due to the latency between the moment the cron fires, the moment logstash starts running, and the moment the query reaches the ES server side, just know that 1m might not be sufficient and you risk missing some docs. You need to test this and find out what works best.
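
For reference, the crontab entry could look like this (the install and log paths are assumptions; adjust them to your setup). Keep in mind that each invocation starts a fresh JVM, which adds to the latency discussed above:

 # run the ES-to-Kafka pull every minute
 * * * * * /opt/logstash/bin/logstash -f /etc/logstash/pull_from_es.conf >> /var/log/logstash-pull.log 2>&1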

According to this recent blog post, another way is to record the last time Logstash ran in an environment variable LAST_RUN and use that variable in the query:

query => '{"query":{"range":{"timestamp":{"gt": "${LAST_RUN}"}}}}'

In this scenario, you'd create a shell script, run by cron, that basically does this (a minimal sketch follows the list):

  1. run logstash -f your_config_file.conf
  2. when done, set LAST_RUN=$(date +"%FT%T")
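
A minimal sketch of such a script, assuming logstash is on the PATH. The state-file path is an assumption, and since an environment variable alone does not survive between cron runs, the timestamp is persisted to a file. One tweak over step 2: the timestamp is captured before the run, so documents indexed while logstash is working are not skipped on the next pass:

 #!/bin/sh
 # Wrapper run by cron; persists the last-run timestamp between invocations.
 STATE_FILE=/var/tmp/logstash_last_run   # assumed location

 # On the very first run, fall back to the epoch so nothing is skipped.
 if [ -f "$STATE_FILE" ]; then
   LAST_RUN=$(cat "$STATE_FILE")
 else
   LAST_RUN="1970-01-01T00:00:00"
 fi
 export LAST_RUN

 # Capture the start time before the run so documents indexed while
 # logstash is running are picked up on the next pass.
 NOW=$(date +"%FT%T")

 logstash -f your_config_file.conf

 # Persist the timestamp for the next cron invocation.
 echo "$NOW" > "$STATE_FILE"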
