dump elasticsearch 2.x to mongodb and back to ES 6.x

Problem description

This question is more theoretical than about source code.

I have an ES 2.x node which has more than 1.2 TB of data. We have 40+ indices, each with at least one type. Here, ES 2.x is used as a database rather than as a search engine. The source that was used to dump data into ES 2.x has been lost. Also, the data is not normalised; a single ES document has multiple embedded documents. Our aim is to recreate the data source and at the same time normalise it.

We are planning to:

  1. Retrieve the data from ES, parse it, and dump it into a new mongodb, storing it in specific collections and maintaining the relationships within the data, i.e. saving it in normalised form (see the sketch after this list).
  2. Index the new mongo data on a new ES 6 node.
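For illustration, here is a minimal sketch of what we mean by normalising one denormalised ES document into related Mongoid collections; the Order/LineItem models and their fields are hypothetical placeholders, not our real schema:

# Hypothetical Mongoid models (Mongoid assumed to be configured via mongoid.yml).
class Order
  include Mongoid::Document
  field :es_id, type: String          # original ES _id, kept for traceability
  field :ordered_at, type: DateTime
  has_many :line_items
end

class LineItem
  include Mongoid::Document
  field :sku, type: String
  field :quantity, type: Integer
  belongs_to :order
end

# Split a single ES hit (with an embedded "items" array) into the two collections.
def normalise(hit)
  source = hit['_source']
  order = Order.create!(es_id: hit['_id'], ordered_at: DateTime.parse(source['insert_time']))
  Array(source['items']).each do |item|
    order.line_items.create!(sku: item['sku'], quantity: item['quantity'])
  end
end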

We are using JRuby 9.1.15.0, Rails 5, Ruby 2.4 and Sidekiq.

Currently, we are retrieving data from ES for a specific date-time range. Sometimes we receive 0 records and sometimes 100000+. The problem arises when we receive a huge number of records.

Here is a sample script that works when the data for a date range is small but fails when the data is large. The average index size is 1.2 TB / 40 indices (roughly 30 GB per index).

class DataRetrieverWorker
  include Sidekiq::Worker
  include Sidekiq::Status::Worker

  SCROLL_SIZE = 100 # documents per scroll page (value assumed; not shown in the original)

  def perform(indx_name, interval = 24, start_time = nil, end_time = nil)
    client = ElasticSearchClient.instance.client
    unless start_time || end_time
      # No range given: work out the next time slot to process.
      last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first
      start_time, end_time = if last_retrieved_at
                               # retrieve the next time slot, usually 24 hrs
                               [last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours]
                             else
                               # first run: start from the day of the oldest document
                               data = client.search index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }]
                               first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time'])
                               [first_day.beginning_of_day, first_day.end_of_day]
                             end
      DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time)
    else
      # Start a scroll over the specified range and retrieve the data.
      query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601, lt: DateTime.parse(end_time).utc.iso8601 } } }
      data = client.search index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query }
      ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time)
      if ri
        # Range already processed: retry shortly with the next slot.
        DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval)
        return
      end
      ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time, documents_cnt: data['hits']['total'])
      if data['hits']['total'] > 0
        if data['hits']['total'] > 2000
          # Large result set: hand whole scroll pages over to a bulk worker.
          BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          while (data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m')) && !data['hits']['hits'].empty?
            BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
          end
        else
          # Small result set: schedule each document individually.
          data['hits']['hits'].each do |r|
            schedule(r)
            ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
          end
          while (data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m')) && !data['hits']['hits'].empty?
            data['hits']['hits'].each do |r|
              schedule(r)
              ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
            end
          end
        end
      else
        DataRetrieverWorker.perform_async(indx_name, interval)
        return
      end
      # Queue the next time slot (the original called perform_at without a timestamp).
      DataRetrieverWorker.perform_async(indx_name, interval)
    end
  end

  private

  def schedule(data)
    DataPersisterWorker.perform_async(data)
  end
end

Questions:

  1. 从ES 2.x检索数据的理想方法应该是什么.我们正在通过日期范围检索数据,然后使用滚动API来检索结果集.这样对吗?
  2. 在特定时间范围内获得较大结果时应该怎么做.有时,我们在几分钟的时间内获得了20000多个记录.理想的方法应该是什么?
  3. sidekiq是否适合处理如此大量的数据?
  4. 运行sidekiq的服务器的理想配置应该是什么?
  5. 使用日期范围来检索数据是否正确?文件数量相差很大.0或100000 +.
  6. 是否有更好的方法可以使我知道多少条记录,而与时间范围无关?
  7. 我尝试独立于时间范围使用滚动API,但是对于具有100cr记录的索引,使用大小为100的滚动(对ES的api调用有100个结果)是否正确?8.索引中的数据将被连续添加.没有文档被更新.

We have tested our code and it handles nominal data (say 4-5k documents) per datetime range (say 6 hrs). We are also planning to shard the data. Since we need some ruby callbacks to be executed whenever we add or update records in certain collections, we will be using Mongoid for this. Direct data insertion into mongodb without Mongoid is not an option.
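For example, the kind of Mongoid callback we rely on might look like the sketch below; Record, Es6Client and the field names are placeholders, not our actual code:

# Hypothetical model: the after_save hook is the kind of Ruby callback that
# forces us to go through Mongoid instead of raw driver inserts.
class Record
  include Mongoid::Document
  field :payload, type: Hash

  after_save :index_in_es6

  private

  # Push the saved document to the new ES 6 node. Es6Client is assumed to be a
  # singleton wrapping an Elasticsearch::Client pointed at the 6.x cluster.
  def index_in_es6
    Es6Client.instance.client.index index: 'records', type: '_doc',
                                    id: id.to_s, body: { payload: payload }
  end
end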

Any pointers would be helpful. Thanks.

Recommended answer

  1. What should be the ideal approach to retrieve data from ES 2.x? We are retrieving data by date range and then using the scroll API to fetch the result set. Is this right?

Is the data in ES continuously growing?

  2. What should be done when we get a large result for a particular time range? Sometimes we get 20000+ records for a time range of a few minutes. What would be the ideal approach?

You are using the scroll API, which is a good approach. You can also give ES's Sliced Scroll API a try.
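For example, a sliced scroll with the elasticsearch-ruby client could look like the sketch below. Note that sliced scrolling was only added in ES 5.0, so it applies to the new 6.x cluster rather than the 2.x source; the index name, page size and slice count are placeholders:

require 'elasticsearch'

client = Elasticsearch::Client.new(url: 'http://localhost:9200')

# Each slice could be consumed by a separate Sidekiq worker in parallel;
# they are shown sequentially here for brevity.
2.times do |slice_id|
  data = client.search index: 'my_index', scroll: '5m', size: 1000,
                       body: { slice: { id: slice_id, max: 2 },
                               query: { match_all: {} } }
  until data['hits']['hits'].empty?
    # persist data['hits']['hits'] here (placeholder for your own logic)
    data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '5m')
  end
  client.clear_scroll(body: { scroll_id: data['_scroll_id'] })
end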

  3. Is sidekiq the right choice for processing this huge amount of data?

Yes, sidekiq is fine and can process this amount of data.

  4. What would be the ideal configuration of the server running sidekiq?

What is the current configuration of the server running sidekiq?

  5. Is using a date range the right approach to retrieve the data? The number of documents varies a lot: 0 or 100000+.

You are not holding 100000+ results at a time; you are processing them in chunks using the scroll API. If data is not continuously added to ES, use a match_all: {} query with the scroll API. If data is continually added, a date range is a fine approach.
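For the non-growing case, a minimal match_all scroll over a whole index might look like this; client is assumed to be an Elasticsearch::Client, and the index name and page size are placeholders:

data = client.search index: 'my_index', scroll: '5m', size: 1000,
                     body: { query: { match_all: {} } }
until data['hits']['hits'].empty?
  data['hits']['hits'].each do |hit|
    # persist hit['_source'] here (placeholder for your own logic)
  end
  data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '5m')
end
client.clear_scroll(body: { scroll_id: data['_scroll_id'] })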

  6. Is there a better way by which I get a uniform number of records irrespective of the time range?

Yes, if you drop the date range: scan all documents, from the first to the last, with the scroll API.

  7. I tried using the scroll API independently of the time range, but is it right to use a scroll with size 100 (100 results per API call to ES) for an index with 100cr records?

You can increase the scroll size, since mongodb supports bulk insertion of documents (see MongoDB Bulk Insert).
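For illustration, a bulk insert with the Ruby mongo driver could look like the sketch below. Note this bypasses Mongoid callbacks, which you said you need, so treat it only as a sketch of the driver capability; the connection details and collection name are placeholders:

require 'mongo'

mongo = Mongo::Client.new(['127.0.0.1:27017'], database: 'es_dump')

# Turn one scroll page (data is assumed to be a scroll response) into one bulk insert.
docs = data['hits']['hits'].map do |hit|
  hit['_source'].merge('es_id' => hit['_id'], 'es_index' => hit['_index'])
end
mongo[:records].insert_many(docs)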

The following points may also resolve your issue:

Clearing the scroll_id after the batches have been processed may improve performance.

Scroll requests have optimisations that make them faster when the sort order is _doc. If you want to iterate over all documents regardless of order, this is the most efficient option.
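Concretely, this just means adding sort: ['_doc'] to the initial scroll request, for example (client, index name and size are placeholders as above):

data = client.search index: 'my_index', scroll: '5m', size: 1000,
                     body: { sort: ['_doc'], query: { match_all: {} } }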

The scroll parameter tells Elasticsearch how long it should keep the search context alive. Its value (e.g. 1m) does not need to be long enough to process all the data; it just needs to be long enough to process the previous batch of results. Each scroll request sets a new expiry time.

Search contexts are removed automatically when the scroll timeout has been exceeded. However, keeping scrolls open has a cost (discussed later in the performance section), so scrolls should be cleared explicitly with the clear-scroll API as soon as they are no longer being used:
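A minimal sketch with the elasticsearch-ruby client, where data is assumed to hold the last scroll response:

client.clear_scroll(body: { scroll_id: data['_scroll_id'] })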

Scroll API: the background merge process optimizes the index by merging smaller segments together to create new, bigger segments, at which point the smaller segments are deleted. This process continues during scrolling, but an open search context prevents the old segments from being deleted while they are still in use. This is how Elasticsearch is able to return the results of the initial search request regardless of subsequent changes to documents. Keeping older segments alive means that more file handles are needed. Make sure the nodes are configured with ample free file handles and that the scroll API context is cleared soon after the data has been fetched. We can check how many search contexts are open with the nodes stats API:
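For example (a sketch; in the nodes stats response the counter lives under indices.search.open_contexts):

stats = client.nodes.stats(metric: 'indices')
stats['nodes'].each do |_id, node|
  puts "#{node['name']}: #{node['indices']['search']['open_contexts']} open search contexts"
end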

It is thus very necessary to clear the Scroll API context, as described above under the clear-scroll API.

Source
