多级组合 [英] Multilevel group by

查看:80
本文介绍了多级组合的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个热门的股票报价流,并希望进行多级分组,相当于以下sql

I have a hot stream of quotes for stocks and would like to do a multi-level grouping the equivalent of the following sql

选择符号,avg(px),总和(数量)

select symbol, avg(px), sum(volume)

来自报价

按交换分组,符号

任何帮助将不胜感激。以下是测试设置代码。

Any help will be appreciated. Below is test setup code.

void Main()

{

  Subject< Quote> quotes = new主题< Quote>();

void Main()
{
 Subject<Quote> quotes = new Subject<Quote>();

 从q个变种groupedQuotes =在引号

&NBSP;&NBSP;可以通过新的{q.InstrumentId,q.exchange}到GRP&NBSP基团Q;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP;&NBSP ;     

  选择grp;

  var groupedQuotes = from q in quotes
  group q by new { q.InstrumentId, q.exchange } into grp              
  select grp;

    var subscribe = from g in groupedQuotes .Window(TimeSpan.FromSeconds(1))选择克;

&NBSP;&NBSP;&NBSP; subscribe.Dump();

   var subscribe = from g in groupedQuotes.Window(TimeSpan.FromSeconds(1)) select g;
   subscribe.Dump();

&NBSP;&NBSP;&NBSP; &NBSP;&NBSP;

&NBSP;&NBSP; quotes.OnNext(新报价{InstrumentId = QUOT; A" ;, Px的=10.10米,&NBSP;交换= QUOT; O" ;,扇区= QUOT; b" ,体积= 100L});

&NBSP;&NBSP; quotes.OnNext(新报价{InstrumentId = QUOT; b" ;, Px的=50.10米,交换= QUOT; N" ;,扇区= QUOT; b" ;,体积= 100L});

   quotes.OnNext(new Quote {InstrumentId =" A",Px = 20.11m,exchange =" R",sector =" B",volume = 100L});

   quotes.OnNext(new Quote {InstrumentId =" A",Px = 26.11m,exchange =" R" ,sector =" B",volume = 100L});

  

   //预期结果

   // A,O,10.10,100
$
   // A,R,23.11,200
$
   // B, N,50.10,100
$
 }

     
  quotes.OnNext(new Quote{InstrumentId = "A", Px = 10.10m,  exchange="O", sector="B", volume=100L});
  quotes.OnNext(new Quote{InstrumentId = "B", Px = 50.10m, exchange="N", sector="B", volume=100L});
  quotes.OnNext(new Quote{InstrumentId = "A", Px = 20.11m, exchange="R", sector="B", volume=100L});
  quotes.OnNext(new Quote{InstrumentId = "A", Px = 26.11m, exchange="R", sector="B", volume=100L});
  
  //Expected result
  //A, O, 10.10, 100
  //A, R, 23.11, 200
  //B, N, 50.10, 100
 }

 公共类报价

  {

  公共字符串InstrumentId {get;组; }
   public decimal Px {get;组; }
   public string symbol {get;组; }
   public string sector {get;组; }
   public string exchange {get;组; }
   public long volume {get;组; }
  

 }

}

 public class Quote
 {
  public string InstrumentId { get; set; }
  public decimal Px { get; set; }
  public string symbol { get; set; }
  public string sector { get; set; }
  public string exchange { get; set; }
  public long volume { get; set; }
  
 }
}

推荐答案

这不是真的按问题划分的多级别小组。 通过包含聚合的投影查询,它只是一个组。 诀窍是为每个窗口投射一个新的匿名类型,并使用
Zip 将聚合投影到子匿名类型以保持被动。 但是,如果窗口为空(除以零),则
平均值运算符将失败,因此您必须确保始终至少有一个元素;我在下面用
Merge 运算符完成了这项操作,因此空窗口的平均值为0.

This isn't really a multi-level group by problem.  It's just a single group by query with a projection containing aggregates.  The trick is to project a new anonymous type for each window and to use Zip to project the aggregates into a child anonymous type to remain reactive.  However, the Average operator will fail if the window is empty (division by zero), so you must ensure there's always at least one element; I've done this below with the Merge operator so that the average value for an empty window is 0.

from q in quotes
group q by new { q.InstrumentId, q.Exchange } into g
from window in g.Window(TimeSpan.FromSeconds(1))
select new
{
	InstrumentId = g.Key.InstrumentId,
	Exchange = g.Key.Exchange,
	Aggregates = Observable.Zip(
		window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
		window.Select(q => q.Volume).Sum(),
		(avg, sum) => new { avg, sum })
};

编辑:这是完整的实验室,用于说明如何订阅此查询:

Here's the complete lab to illustrate how to subscribe to this query:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace Rxx.Labs.Reactive
{
	public sealed class GroupByMultiLevel : BaseConsoleLab
	{
		protected override void Main()
		{
			var quotes = new Subject<Quote>();

			var groupedQuotes =
				from q in quotes
				group q by new { q.InstrumentId, q.Exchange } into g
				from window in g.Window(TimeSpan.FromSeconds(1))
				select new
				{
					InstrumentId = g.Key.InstrumentId,
					Exchange = g.Key.Exchange,
					Aggregates = Observable.Zip(
						window.Select(q => q.Px).Merge(window.IsEmpty().Where(i => i).Select(_ => 0M)).Average(),
						window.Select(q => q.Volume).Sum(),
						(avg, sum) => new { avg, sum })
				};

			using (groupedQuotes
				.Subscribe(group => group.Aggregates
				.Subscribe(value => TraceLine(group.InstrumentId + ',' + group.Exchange + ',' + value.avg + ',' + value.sum))))
			{
				quotes.OnNext(new Quote { InstrumentId = "A", Px = 10.10m, Exchange = "O", Volume = 100L });
				quotes.OnNext(new Quote { InstrumentId = "B", Px = 50.10m, Exchange = "N", Volume = 100L });
				quotes.OnNext(new Quote { InstrumentId = "A", Px = 20.11m, Exchange = "R", Volume = 100L });
				quotes.OnNext(new Quote { InstrumentId = "A", Px = 26.11m, Exchange = "R", Volume = 100L });

				WaitForKey();
			}
		}

		class Quote
		{
			public string InstrumentId { get; set; }
			public decimal Px { get; set; }
			public string Exchange { get; set; }
			public long Volume { get; set; }
		}
	}
}

- 戴夫


这篇关于多级组合的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆