Reindex part of Elasticsearch index onto new index via Jest

Question

I have a test Elasticsearch 6.0 index populated with millions of records; production is likely to hold billions. I need to search for a subset of these records, then save that subset into a secondary index for later searching. I have proven this out by querying ES from Kibana with the request below. The challenge is to find the appropriate APIs in Java 8 to do the same with my Jest client (searchbox.io, version 5.3.3). The Elasticsearch cluster is on AWS, so using a transport client is out.

POST _reindex?slices=10&wait_for_completion=false
{ "conflicts": "proceed",
  "source":{
    "index": "my_source_idx",
    "size": 5000,
    "query": { "bool": {
      "filter": { "bool" : { "must" : [
        { "nested": { "path": "test", "query": { "bool": { "must":[
           { "terms" : { "test.RowKey": ["abc"]} },
           { "range" : { "test.dates" : { "lte": "2018-01-01", "gte": "2010-08-01"} } },
           { "range" : { "test.DatesCount" : { "gte": 2} } },
           { "script" : { "script" : { "id": "my_painless_script", 
              "params" : {"min_occurs" : 1, "dateField": "test.dates", "RowKey": ["abc"], "fromDate": "2010-08-01", "toDate": "2018-01-01"}}}}
        ]}}}}
      ]}}
    }}
  },
  "dest": {
    "index": "my_dest_idx"
  },
  "script": {
    "source": <My painless script>
  } }

I am aware that I can search the source index and then create and bulk load the response records into the new index, but I want to do this all in one shot, because I have a Painless script that gleans information pertinent to the queries that will search the secondary index. Performance is a concern, as the application will chain subsequent queries together against the destination index. Does anyone know how to accomplish this using Jest?

Recommended answer

It appears that this particular functionality is not yet supported in Jest. The Jest API has a way to pass in a script (not a query) as a parameter, but I was having problems even with that.

After some hacking with a coworker, we found a way around this...

Step 1) Extend the GenericResultAbstractAction class, with edits to the script:

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import io.searchbox.action.GenericResultAbstractAction;
import org.elasticsearch.script.Script;

public class GenericResultReindexActionHack extends GenericResultAbstractAction {

    GenericResultReindexActionHack(GenericResultReindexActionHack.Builder builder) {
        super(builder);

        // Collect the top-level fields of the _reindex request body.
        Map<String, Object> payload = new HashMap<>();
        payload.put("source", builder.source);
        payload.put("dest", builder.dest);
        if (builder.conflicts != null) {
            payload.put("conflicts", builder.conflicts);
        }
        if (builder.size != null) {
            payload.put("size", builder.size);
        }
        if (builder.script != null) {
            Script script = (Script) builder.script;
            // Note the script parameter needs to be formatted differently to conform to the ES _reindex API:
            payload.put("script", new Gson().toJson(ImmutableMap.of("id", script.getIdOrCode(), "params", script.getParams())));
        }
        this.payload = ImmutableMap.copyOf(payload);

        setURI(buildURI());
    }

    @Override
    protected String buildURI() {
        return super.buildURI() + "/_reindex";
    }

    @Override
    public String getRestMethodName() {
        return "POST";
    }

    @Override
    public String getData(Gson gson) {
        if (payload == null) {
            return null;
        } else if (payload instanceof String) {
            return (String) payload;
        } else {
            // We need to remove the incorrect formatting for the query, dest, and script fields:
            // TODO: Need to consider spaces in the JSON
            return gson.toJson(payload).replaceAll("\\\\n", "")
                    .replace("\\", "")
                    .replace("query\":\"", "query\":")
                    .replace("\"},\"dest\"", "},\"dest\"")
                    .replaceAll("\"script\":\"", "\"script\":")
                    .replaceAll("\"}", "}")
                    .replaceAll("},\"script\"", "\"},\"script\"");
        }
    }

    public static class Builder extends GenericResultAbstractAction.Builder<GenericResultReindexActionHack, GenericResultReindexActionHack.Builder> {

        private Object source;
        private Object dest;
        private String conflicts;
        private Long size;
        private Object script;

        public Builder(Object source, Object dest) {
            this.source = source;
            this.dest = dest;
        }

        public GenericResultReindexActionHack.Builder conflicts(String conflicts) {
            this.conflicts = conflicts;
            return this;
        }

        public GenericResultReindexActionHack.Builder size(Long size) {
            this.size = size;
            return this;
        }

        public GenericResultReindexActionHack.Builder script(Object script) {
            this.script = script;
            return this;
        }

        public GenericResultReindexActionHack.Builder waitForCompletion(boolean waitForCompletion) {
            return setParameter("wait_for_completion", waitForCompletion);
        }

        public GenericResultReindexActionHack.Builder waitForActiveShards(int waitForActiveShards) {
            return setParameter("wait_for_active_shards", waitForActiveShards);
        }

        public GenericResultReindexActionHack.Builder timeout(long timeout) {
            return setParameter("timeout", timeout);
        }

        public GenericResultReindexActionHack.Builder requestsPerSecond(double requestsPerSecond) {
            return setParameter("requests_per_second", requestsPerSecond);
        }

        public GenericResultReindexActionHack build() {
            return new GenericResultReindexActionHack(this);
        }
    }

}
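The replacement chain in getData depends on exactly how Gson happens to format the payload. Since getData returns the payload unchanged when it is already a String, one possible alternative is to assemble the request body as a string up front and embed the query as raw JSON. The following is only a sketch under that assumption: the class name ReindexBodySketch and its methods are hypothetical, it uses nothing beyond the JDK, and it covers only the conflicts, source and dest fields.

```java
class ReindexBodySketch {

    /** Wraps a plain string in quotes, escaping what JSON requires inside a string value. */
    public static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /**
     * Assembles a _reindex request body. queryJson is embedded verbatim,
     * so the caller must pass valid JSON (assumption of this sketch).
     * conflicts may be null, in which case the field is omitted.
     */
    public static String build(String sourceIndex, String destIndex, String queryJson, String conflicts) {
        StringBuilder sb = new StringBuilder("{");
        if (conflicts != null) {
            sb.append("\"conflicts\":").append(quote(conflicts)).append(",");
        }
        sb.append("\"source\":{\"index\":").append(quote(sourceIndex))
          .append(",\"query\":").append(queryJson).append("},");
        sb.append("\"dest\":{\"index\":").append(quote(destIndex)).append("}}");
        return sb.toString();
    }

    public static void main(String[] args) {
        String query = "{\"terms\":{\"test.RowKey\":[\"abc\"]}}";
        System.out.println(build("my_source_idx", "my_dest_idx", query, "proceed"));
    }
}
```

Because the query is never serialized as a quoted string in this variant, there are no escaped quotes or '\n' sequences to strip afterwards. Wiring such a string into the action as its payload is left out here, since that depends on the Jest version in use.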

Step 2) To use this class with a query, pass the query in as part of the source, then remove the '\n' characters:

ImmutableMap<String, Object> sourceMap = ImmutableMap.of("index", sourceIndex, "query", qb.toString().replaceAll("\\\\n", ""));
ImmutableMap<String, Object> destMap = ImmutableMap.of("index", destIndex);

GenericResultReindexActionHack reindex = new GenericResultReindexActionHack.Builder(sourceMap, destMap)
        .waitForCompletion(false)
        .conflicts("proceed")
        .size(5000L)
        .script(reindexScript)
        .setParameter("slices", 10)
        .build();

JestResult result = handleResult(reindex);
String task = result.getJsonString();
return task;

Note that the reindexScript parameter is a type from org.elasticsearch.script (the class in Step 1 casts it to Script).
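With wait_for_completion=false, Elasticsearch does not wait for the reindex to finish; the response carries a task identifier of the form node_id:task_number, which can then be looked up through the Tasks API at _tasks/<task_id>. The snippet above returns the raw JSON string, so something still has to pull the identifier out. A minimal sketch of that step follows, using only the JDK. The class name ReindexTaskSketch, the regular expression (standing in for a proper JSON parser) and the sample identifier are illustrative assumptions.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ReindexTaskSketch {

    // Matches "task":"<node_id>:<task_number>", allowing whitespace around the colon.
    private static final Pattern TASK = Pattern.compile("\"task\"\\s*:\\s*\"([^\"]+)\"");

    /** Extracts the task identifier from the JSON returned by an asynchronous _reindex call. */
    public static Optional<String> taskId(String responseJson) {
        if (responseJson == null) {
            return Optional.empty();
        }
        Matcher m = TASK.matcher(responseJson);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Builds the relative path used to check on the task via the Tasks API. */
    public static String statusPath(String taskId) {
        return "_tasks/" + taskId;
    }

    public static void main(String[] args) {
        // Made-up identifier, for illustration only.
        String response = "{\"task\":\"someNodeId:12345\"}";
        taskId(response)
                .map(ReindexTaskSketch::statusPath)
                .ifPresent(System.out::println);
    }
}
```

Returning an Optional keeps the caller honest about responses that carry no task field, for example when the request was rejected.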

This is a messy, hack-y way of getting around the limitations of Jest, but it seems to work. I understand that doing it this way may limit what is acceptable in the input formatting...
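To make the formatting limitation concrete, the sketch below applies the same replacement chain as getData to two hand-written strings. They are assumptions meant to resemble Gson output, with the query and script serialized as quoted strings; they are not captured from a real run, and the class name ReplaceChainDemo is hypothetical. In the first string the script field directly follows dest and the chain yields well-formed JSON. In the second there is no script field, so nothing restores the quote that the `"}` replacement strips after the destination index name.

```java
class ReplaceChainDemo {

    // Same replacement chain as getData in Step 1, applied to an already-serialized string.
    public static String applyChain(String json) {
        return json.replaceAll("\\\\n", "")
                .replace("\\", "")
                .replace("query\":\"", "query\":")
                .replace("\"},\"dest\"", "},\"dest\"")
                .replaceAll("\"script\":\"", "\"script\":")
                .replaceAll("\"}", "}")
                .replaceAll("},\"script\"", "\"},\"script\"");
    }

    public static void main(String[] args) {
        // Shared prefix: source with a stringified query, followed by dest.
        String head = "{\"source\":{\"index\":\"src\",\"query\":\"{\\\"match_all\\\":{}}\"},"
                + "\"dest\":{\"index\":\"dst\"}";

        // Case 1: script follows dest, as the chain expects.
        String withScript = head
                + ",\"script\":\"{\\\"id\\\":\\\"s1\\\",\\\"params\\\":{\\\"n\\\":1}}\"}";
        System.out.println(applyChain(withScript));

        // Case 2: no script field; the closing quote after dst is lost.
        String withoutScript = head + "}";
        System.out.println(applyChain(withoutScript));
    }
}
```

The same loss would affect any string value that sits directly before a closing brace with no whitespace in between, which is presumably what the TODO about spaces in the JSON is hinting at.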
