ElasticSearch Nest: better code for terms aggregation and its iteration


Question

I'd like to fetch a list of unique numeric user IDs within a given period.

Let's say the ID field is userId and the time field is startTime. I successfully get results as below:

HashSet<int> hashUserIdList= new HashSet<int>(); // guarantees to store unique userIds.

// Step 1. get unique number of userIds
var total = client.Search<Log>(s => s
    .Query(q => q
        .DateRange(c => c
            .Field(p => p.startTime)
            .GreaterThan(FixedDate)))
    .Aggregations(a => a
        .Cardinality("userId_cardinality", c => c
            .Field("userId"))))
    .Aggs.Cardinality("userId_cardinality");

int totalCount = (int)total.Value;

// Step 2. get unique userId values by Terms aggregation.
var response = client.Search<Log>(s => s
    .Source(source => source.Includes(inc => inc.Field("userId")))
    .Query(q => q
        .DateRange(c => c
            .Field(p => p.startTime)
            .GreaterThan(FixedDate)))
    .Aggregations(a => a
        .Terms("userId_terms", c => c
            .Field("userId")
            .Size(totalCount))))
    .Aggs.Terms("userId_terms");

// Step 3. store unique userIds to HashSet.
foreach (var element in response.Buckets)
{
    hashUserIdList.Add(int.Parse(element.Key));
}

It works but seems inefficient because (1) it fetches totalCount first, and (2) it sets Size(totalCount), which could cause a 500 server error due to bucket overflow (what if the result has thousands of entries?).

It would be good to iterate in a foreach manner, but I failed to make the results iterable in batches of 100. I put From/Size or Skip/Take here and there, but the returned values were unreliable.

How can I code this correctly?

Solution

This approach may be OK for some sets, but a couple of observations:

  1. The Cardinality Aggregation uses the HyperLogLog++ algorithm to approximate cardinality; this approximation can be completely accurate for low-cardinality fields but is less so for high-cardinality fields.
  2. The Terms Aggregation may be computationally expensive for many terms, as each bucket needs to be built in memory and then serialized to the response.

You can probably skip the Cardinality Aggregation used to get the size, and simply pass int.MaxValue as the size for the Terms Aggregation. An alternative approach, less efficient in terms of speed, would be to scroll through all documents in the range, using source filtering to return only the field that you're interested in. I would expect the Scroll approach to put less pressure on the cluster, but I would recommend monitoring whichever approach you take.

Here's a comparison of the two approaches on the Stack Overflow data set (taken June 2016, IIRC), looking at unique question askers between 2 years ago today and a year ago today.

Terms Aggregation

void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var connectionSettings = new ConnectionSettings(pool)
        .MapDefaultTypeIndices(d => d
            .Add(typeof(Question), NDC.StackOverflowIndex)
        );


    var client = new ElasticClient(connectionSettings);

    var twoYearsAgo = DateTime.UtcNow.Date.AddYears(-2);
    var yearAgo = DateTime.UtcNow.Date.AddYears(-1);

    var searchResponse = client.Search<Question>(s => s
        .Size(0)
        .Query(q => q
            .DateRange(c => c.Field(p => p.CreationDate)
                .GreaterThan(twoYearsAgo)
                .LessThan(yearAgo)
            )
        )
        .Aggregations(a => a
            .Terms("unique_users", c => c
                .Field(f => f.OwnerUserId)
                .Size(int.MaxValue)
            )
        )
    );

    var uniqueOwnerUserIds = searchResponse.Aggs.Terms("unique_users").Buckets.Select(b => b.KeyAsString).ToList();

    // 3.83 seconds
    // unique question askers: 795352
    Console.WriteLine($"unique question askers: {uniqueOwnerUserIds.Count}");
}
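
The question also asks about iterating over the IDs in groups of 100. One option, offered here as an assumption rather than as part of the answer, is to fetch all bucket keys in one request as above and split them client-side. The sketch below uses only the .NET base class library and does not call NEST; `UserIdBatching`, `ParseKeys`, and `Batch` are hypothetical helper names. It does not reduce the work Elasticsearch does for the aggregation; it only controls how the results are consumed.

```csharp
using System;
using System.Collections.Generic;

public static class UserIdBatching
{
    // Parses terms-bucket keys (strings) into a set of unique ints,
    // skipping any key that is not a valid integer.
    public static HashSet<int> ParseKeys(IEnumerable<string> keys)
    {
        var ids = new HashSet<int>();
        foreach (var key in keys)
        {
            if (int.TryParse(key, out var id))
                ids.Add(id);
        }
        return ids;
    }

    // Splits a sequence into consecutive batches of at most batchSize items.
    // Validation runs when enumeration starts, because this is an iterator.
    public static IEnumerable<List<T>> Batch<T>(IEnumerable<T> source, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<T>(batchSize);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize);
            }
        }

        // Emit the final, possibly smaller, batch.
        if (batch.Count > 0)
            yield return batch;
    }
}
```

With the `uniqueOwnerUserIds` list from the example above, usage might look like `foreach (var batch in UserIdBatching.Batch(UserIdBatching.ParseKeys(uniqueOwnerUserIds), 100)) { ... }`.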

Scroll API

void Main()
{
    var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));

    var connectionSettings = new ConnectionSettings(pool)
        .MapDefaultTypeIndices(d => d
            .Add(typeof(Question), NDC.StackOverflowIndex)
        );

    var client = new ElasticClient(connectionSettings);
    var uniqueOwnerUserIds = new HashSet<int>();

    var twoYearsAgo = DateTime.UtcNow.Date.AddYears(-2);
    var yearAgo = DateTime.UtcNow.Date.AddYears(-1);

    var searchResponse = client.Search<Question>(s => s
        .Source(sf => sf
            .Include(ff => ff
                .Field(f => f.OwnerUserId)
            )
        )
        .Size(10000)
        .Scroll("1m")
        .Query(q => q
            .DateRange(c => c
                .Field(p => p.CreationDate)
                .GreaterThan(twoYearsAgo)
                .LessThan(yearAgo)
            )
        )
    );

    while (searchResponse.Documents.Any())
    {
        foreach (var document in searchResponse.Documents)
        {
            if (document.OwnerUserId.HasValue)
                uniqueOwnerUserIds.Add(document.OwnerUserId.Value);
        }

        searchResponse = client.Scroll<Question>("1m", searchResponse.ScrollId);
    }

    client.ClearScroll(c => c.ScrollId(searchResponse.ScrollId));

    // 91.8 seconds
    // unique question askers: 795352
    Console.WriteLine($"unique question askers: {uniqueOwnerUserIds.Count}");
}
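
The accumulation logic of the scroll loop can be exercised without a cluster. The sketch below isolates it behind a delegate: `PagedDedup`, `CollectUnique`, and `fetchNextPage` are hypothetical names, and `fetchNextPage` merely stands in for a scroll call, assuming it returns an empty page once the results are exhausted.

```csharp
using System;
using System.Collections.Generic;

public static class PagedDedup
{
    // fetchNextPage is a hypothetical stand-in for a scroll request: each call
    // returns the next page of nullable owner ids, or an empty list when done.
    public static HashSet<int> CollectUnique(Func<IReadOnlyList<int?>> fetchNextPage)
    {
        var unique = new HashSet<int>();
        var page = fetchNextPage();

        // Keep requesting pages until an empty one signals the end.
        while (page.Count > 0)
        {
            foreach (var id in page)
            {
                // Skip documents without an owner; the set ignores duplicates.
                if (id.HasValue)
                    unique.Add(id.Value);
            }
            page = fetchNextPage();
        }

        return unique;
    }
}
```

Feeding it a queue of in-memory pages is enough to check that null IDs are skipped and that IDs repeated across pages are counted once.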

In this test, the Terms Aggregation (3.83 seconds) is ~24 times faster than the Scroll API approach (91.8 seconds).
