How to reindex in ElasticSearch via Java API


Question



Like the title says...

I read this article (https://www.elastic.co/blog/changing-mapping-with-zero-downtime), and the concept was great, but I'm struggling to find a decent reference on how to do it via the Java API.

I found this plugin: https://github.com/karussell/elasticsearch-reindex, but it seems like overkill for what I am trying to do.

Solution

After some research at a local Starbucks, here is what I came up with:

Let's assume that we already have our index ("old_index") and it has data... Now let's move that data to a new index ("new_index") that we created (perhaps with a different schema, say STRING vs. INT for a certain field, or because you decided that you no longer wish to analyze or store a certain field, etc.).
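The new index has to exist with its revised mapping before the copy starts. A minimal sketch of that create step with the same 1.x-era Java client used throughout this answer (the type name "my_type" and the field names "count" and "notes" are illustrative assumptions, not from the original question):

```java
// Hypothetical mapping change: "count" becomes an integer,
// "notes" is no longer analyzed. Names are made up for illustration.
client.admin().indices().prepareCreate("new_index")
    .addMapping("my_type",
        "{\"my_type\":{\"properties\":{"
        + "\"count\":{\"type\":\"integer\"},"
        + "\"notes\":{\"type\":\"string\",\"index\":\"no\"}"
        + "}}}")
    .execute().actionGet();
```

This requires a running cluster and a connected client, so it is a sketch rather than a standalone program; adjust the mapping JSON to whatever schema change you actually need.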

The basic idea here is to retrieve all the data from the already existing index ("old_index") and ingest it into the new index ("new_index"). However, there are a few things that you have to do:

Step 1. You need to perform a scroll search: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html

All it does is retrieve results much more efficiently than a regular search. There is no scoring, etc. Here is what the documentation has to say: "Scrolling is not intended for real time user requests, but rather for processing large amounts of data, e.g. in order to reindex the contents of one index into a new index with a different configuration."

Here is a link to the Java API docs on how to use it: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/scrolling.html

Step 2. When doing the insert, you have to use bulk ingest. Once again, this is for performance reasons. Here is a link to the Bulk Ingest Java API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/bulk.html#_using_bulk_processor

Now on to how to actually do it...

Step 1. Set up a scroll search that will "load" data from the old index:

SearchResponse scrollResp = client.prepareSearch("old_index") // Specify index
    .setSearchType(SearchType.SCAN)
    .setScroll(new TimeValue(60000))
    .setQuery(QueryBuilders.matchAllQuery()) // Match all query
    .setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll

Step 2. Set up the bulk processor.

int BULK_ACTIONS_THRESHOLD = 1000;
int BULK_CONCURRENT_REQUESTS = 1;
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        logger.info("Bulk Going to execute new bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        logger.info("Executed bulk composed of {} actions", request.numberOfActions());
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.warn("Error executing bulk", failure);
    }
    }).setBulkActions(BULK_ACTIONS_THRESHOLD).setConcurrentRequests(BULK_CONCURRENT_REQUESTS).setFlushInterval(TimeValue.timeValueMillis(5)).build();

Step 3. Read from the old index via the scroll searcher created in Step 1 until there are no records left, and insert into the new index:

//Scroll until no hits are returned
while (true) {
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
    //Break condition: No hits are returned
    if (scrollResp.getHits().getHits().length == 0) {
        logger.info("Closing the bulk processor");
        bulkProcessor.close();
        break; 
    }
    // Get results from the scan search and add them to the bulk ingest
    for (SearchHit hit : scrollResp.getHits()) {
        IndexRequest request = new IndexRequest("new_index", hit.type(), hit.id());
        request.source(hit.getSource()); // copy the document source as-is
        bulkProcessor.add(request);
    }
}

Step 4. Now it is time to assign the existing alias, which points to the old index, to the new index. Then delete the alias reference to the old index, and then delete the old index itself. To find out how to determine the aliases that were assigned to the already existing old index, see this post: ElasticSeach JAVA API to find aliases given index
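A rough sketch of that alias lookup with the same 1.x-era client (hedged: the exact response types below are assumptions based on that generation of the API, so check the linked post for details):

```java
// Sketch: list the aliases currently attached to "old_index".
GetAliasesResponse aliasesResp = client.admin().indices()
        .prepareGetAliases()        // no alias-name filter: fetch all
        .setIndices("old_index")
        .execute().actionGet();
// getAliases() maps index name -> list of alias metadata entries
for (ObjectObjectCursor<String, List<AliasMetaData>> cursor : aliasesResp.getAliases()) {
    for (AliasMetaData alias : cursor.value) {
        System.out.println("alias on " + cursor.key + ": " + alias.alias());
    }
}
```

Whatever alias names this prints are the ones you then move over to "new_index" below.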

To assign the alias to the new index:

client.admin().indices().prepareAliases().addAlias("new_index", "alias_name").get();

Remove the alias from the old index, then delete the old index:

client.admin().indices().prepareAliases().removeAlias("old_index", "alias_name").execute().actionGet();
client.admin().indices().prepareDelete("old_index").execute().actionGet();
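One design note, offered as a suggestion rather than as the author's method: the indices-aliases API applies all actions in a single request atomically, so the add and remove above can be combined. That way readers of the alias never see a moment where it points at no index (or at both):

```java
// Sketch: switch the alias in one atomic request, then drop the old index.
client.admin().indices().prepareAliases()
        .addAlias("new_index", "alias_name")
        .removeAlias("old_index", "alias_name")
        .execute().actionGet();
// Only delete the old index once the swap has succeeded.
client.admin().indices().prepareDelete("old_index").execute().actionGet();
```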

