多级组合 [英] Multilevel group by
问题描述
我有一个热门的股票报价流,并希望进行多级分组,相当于以下sql
I have a hot stream of quotes for stocks and would like to do a multi-level grouping the equivalent of the following sql
选择符号,avg(px),总和(数量)
select symbol, avg(px), sum(volume)
来自报价
按交换分组,符号
任何帮助将不胜感激。以下是测试设置代码。
Any help will be appreciated. Below is test setup code.
void Main()
{
Subject< Quote> quotes = new主题< Quote>();
void Main()
{
Subject<Quote> quotes = new Subject<Quote>();
从q个变种groupedQuotes =在引号
&NBSP;&NBSP;可以通过新的{q.InstrumentId,q.exchange}到GRP&NBSP基团Q;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP ;
选择grp;
var groupedQuotes = from q in quotes
group q by new { q.InstrumentId, q.exchange } into grp
select grp;
var subscribe = from g in groupedQuotes .Window(TimeSpan.FromSeconds(1))选择克;
&NBSP;&NBSP;&NBSP; subscribe.Dump();
var subscribe = from g in groupedQuotes.Window(TimeSpan.FromSeconds(1)) select g;
subscribe.Dump();
&NBSP;&NBSP;&NBSP; &NBSP;&NBSP;
&NBSP;&NBSP; quotes.OnNext(新报价{InstrumentId = QUOT; A" ;, Px的=10.10米,&NBSP;交换= QUOT; O" ;,扇区= QUOT; b" ,体积= 100L});
&NBSP;&NBSP; quotes.OnNext(新报价{InstrumentId = QUOT; b" ;, Px的=50.10米,交换= QUOT; N" ;,扇区= QUOT; b" ;,体积= 100L});
quotes.OnNext(new Quote {InstrumentId =" A",Px = 20.11m,exchange =" R",sector =" B",volume = 100L});
quotes.OnNext(new Quote {InstrumentId =" A",Px = 26.11m,exchange =" R" ,sector =" B",volume = 100L});
//预期结果
// A,O,10.10,100
$
// A,R,23.11,200
$
// B, N,50.10,100
$
}
quotes.OnNext(new Quote{InstrumentId = "A", Px = 10.10m, exchange="O", sector="B", volume=100L});
quotes.OnNext(new Quote{InstrumentId = "B", Px = 50.10m, exchange="N", sector="B", volume=100L});
quotes.OnNext(new Quote{InstrumentId = "A", Px = 20.11m, exchange="R", sector="B", volume=100L});
quotes.OnNext(new Quote{InstrumentId = "A", Px = 26.11m, exchange="R", sector="B", volume=100L});
//Expected result
//A, O, 10.10, 100
//A, R, 23.11, 200
//B, N, 50.10, 100
}
公共类报价
{
公共字符串InstrumentId {get;组; }
public decimal Px {get;组; }
public string symbol {get;组; }
public string sector {get;组; }
public string exchange {get;组; }
public long volume {get;组; }
}
}
public class Quote
{
public string InstrumentId { get; set; }
public decimal Px { get; set; }
public string symbol { get; set; }
public string sector { get; set; }
public string exchange { get; set; }
public long volume { get; set; }
}
}
推荐答案
这不是真的按问题划分的多级别小组。 通过包含聚合的投影查询,它只是一个组。 诀窍是为每个窗口投射一个新的匿名类型,并使用
Zip 将聚合投影到子匿名类型以保持被动。 但是,如果窗口为空(除以零),则
平均值运算符将失败,因此您必须确保始终至少有一个元素;我在下面用
Merge 运算符完成了这项操作,因此空窗口的平均值为0.
This isn't really a multi-level group by problem. It's just a single group by query with a projection containing aggregates. The trick is to project a new anonymous type for each window and to use Zip to project the aggregates into a child anonymous type to remain reactive. However, the Average operator will fail if the window is empty (division by zero), so you must ensure there's always at least one element; I've done this below with the Merge operator so that the average value for an empty window is 0.
from q in quotes
group q by new { q.InstrumentId, q.Exchange } into g
from window in g.Window(TimeSpan.FromSeconds(1))
select new
{
InstrumentId = g.Key.InstrumentId,
Exchange = g.Key.Exchange,
Aggregates = Observable.Zip(
window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
window.Select(q => q.Volume).Sum(),
(avg, sum) => new { avg, sum })
};
编辑:这是完整的实验室,用于说明如何订阅此查询:
Here's the complete lab to illustrate how to subscribe to this query:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace Rxx.Labs.Reactive
{
public sealed class GroupByMultiLevel : BaseConsoleLab
{
protected override void Main()
{
var quotes = new Subject<Quote>();
var groupedQuotes =
from q in quotes
group q by new { q.InstrumentId, q.Exchange } into g
from window in g.Window(TimeSpan.FromSeconds(1))
select new
{
InstrumentId = g.Key.InstrumentId,
Exchange = g.Key.Exchange,
Aggregates = Observable.Zip(
window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
window.Select(q => q.Volume).Sum(),
(avg, sum) => new { avg, sum })
};
using (groupedQuotes
.Subscribe(group => group.Aggregates
.Subscribe(value => TraceLine(group.InstrumentId + ',' + group.Exchange + ',' + value.avg + ',' + value.sum))))
{
quotes.OnNext(new Quote { InstrumentId = "A", Px = 10.10m, Exchange = "O", Volume = 100L });
quotes.OnNext(new Quote { InstrumentId = "B", Px = 50.10m, Exchange = "N", Volume = 100L });
quotes.OnNext(new Quote { InstrumentId = "A", Px = 20.11m, Exchange = "R", Volume = 100L });
quotes.OnNext(new Quote { InstrumentId = "A", Px = 26.11m, Exchange = "R", Volume = 100L });
WaitForKey();
}
}
class Quote
{
public string InstrumentId { get; set; }
public decimal Px { get; set; }
public string Exchange { get; set; }
public long Volume { get; set; }
}
}
}
- 戴夫
这篇关于多级组合的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!