Azure Stream Analytics: Multiple Windows + JOINS
Problem description
My architecture:
- 1 EventHub with 8 partitions and 10 partitions; 2 TPUs
- 1 Stream Analytics job
- 6 windows over the same input (from 1 minute to 6 minutes)
Sample data:
{side: 'BUY', ticker: 'MSFT', qty: 1, price: 123, tradeTimestamp: 10000000000}
{side: 'SELL', ticker: 'MSFT', qty: 1, price: 124, tradeTimestamp:1000000000}
The EventHub PartitionKey is ticker.
I want to emit the following data every second:
(Total quantity bought / Total quantity sold) over the last minute, the last 2 minutes, the last 3 minutes, and so on.
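To pin down what the query should produce, here is a plain JavaScript reference computation, not Stream Analytics code. It is a sketch under stated assumptions: the helper name trailingRatios is hypothetical, tradeTimestamp is assumed to be in milliseconds (the sample data does not state the unit), each window is taken as the half-open interval ending at the evaluation time, and the ratio is reported as null when nothing was sold.

```javascript
// Reference computation, NOT Stream Analytics code: the buy/sell quantity
// ratio per ticker over several trailing windows that all end at `nowMs`.
// Assumption: tradeTimestamp is in milliseconds.
function trailingRatios(trades, nowMs, windowMinutes) {
  const result = {};
  for (const minutes of windowMinutes) {
    const startMs = nowMs - minutes * 60 * 1000;
    const sums = {}; // ticker -> { buy, sell } for this window only
    for (const t of trades) {
      // Half-open interval (startMs, nowMs]: the window ends at nowMs.
      if (t.tradeTimestamp <= startMs || t.tradeTimestamp > nowMs) continue;
      if (!sums[t.ticker]) sums[t.ticker] = { buy: 0, sell: 0 };
      if (t.side === 'BUY') sums[t.ticker].buy += t.qty;
      if (t.side === 'SELL') sums[t.ticker].sell += t.qty;
    }
    for (const ticker of Object.keys(sums)) {
      const s = sums[ticker];
      if (!result[ticker]) result[ticker] = {};
      // null when nothing was sold in the window: the ratio is undefined.
      result[ticker][minutes + 'min'] = s.sell === 0 ? null : s.buy / s.sell;
    }
  }
  return result;
}

// Example: three MSFT trades at 30 s, 100 s and 150 s, evaluated at 180 s.
const exampleTrades = [
  { side: 'BUY', ticker: 'MSFT', qty: 3, price: 123, tradeTimestamp: 30000 },
  { side: 'SELL', ticker: 'MSFT', qty: 2, price: 124, tradeTimestamp: 100000 },
  { side: 'BUY', ticker: 'MSFT', qty: 1, price: 125, tradeTimestamp: 150000 },
];
console.log(JSON.stringify(trailingRatios(exampleTrades, 180000, [1, 2, 3])));
// → {"MSFT":{"1min":null,"2min":0.5,"3min":2}}
```

Having an expected value per ticker and per window length like this makes it easier to check whether the streaming query's joined output is consistent.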
What I tried:
WITH TradesWindow AS (
SELECT
windowEnd = System.Timestamp,
ticker,
side,
totalQty = SUM(qty)
FROM [Trades-Stream] TIMESTAMP BY tradeTimestamp PARTITION BY PartitionId
GROUP BY ticker, side, PartitionId, HoppingWindow(second, 60, 1)
),
TradesRatio1MN AS (
SELECT
ticker = b.ticker,
buySellRatio = b.totalQty / s.totalQty
FROM TradesWindow b /* SHOULD I PARTITION HERE TOO ? */
JOIN TradesWindow s /* SHOULD I PARTITION HERE TOO ? */
ON s.ticker = b.ticker AND s.side = 'SELL'
AND DATEDIFF(second, b, s) BETWEEN 0 AND 1
WHERE b.side = 'BUY'
)
/* .... More windows.... */
/* FINAL OUTPUT: Joining all the windows */
SELECT
buySellRatio1MN = bs1.buySellRatio,
buySellRatio2MN = bs2.buySellRatio
/* more windows */
INTO [output]
FROM TradesRatio1MN bs1 /* SHOULD I PARTITION HERE TOO ? */
JOIN TradesRatio2MN bs2 /* SHOULD I PARTITION HERE TOO ? */
ON bs2.ticker = bs1.ticker
AND DATEDIFF(second, bs1, bs2) BETWEEN 0 AND 1
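Problems:
- This requires 6 EventHub consumer groups (each group can have only 5 readers). Why? I don't have 5x6 SELECT statements on the input, so why?
- The output seems inconsistent (I don't know whether my JOINs are correct).
- Sometimes the job produces no output at all (maybe a partitioning issue? See the comments about partitioning in the code).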
In short, is there a better way to achieve this? I couldn't find anything in the documentation or examples about defining multiple windows over a single input, joining them, and then joining the results of those joins.
Recommended answer
For the first question, this depends on the internal implementation of the scale-out logic. See details here.
As for the output of the join, I can't see the whole query, but if you join a query that uses a 1-minute window with a query that uses a 2-minute window, with a 1 s time "buffer", you will only get an output every 2 minutes. The UNION operator is better suited for this.
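The "every 2 minutes" effect can be checked with a small model. This sketch assumes tumbling windows (a 1-minute window closing every 60 s, a 2-minute window every 120 s); the helper joinMatchTimes is hypothetical and only counts when the end times of the two windows satisfy a 0 to 1 s tolerance like DATEDIFF(second, bs1, bs2) BETWEEN 0 AND 1.

```javascript
// Illustration, assuming TUMBLING windows of sizeA and sizeB seconds.
// Returns the end times of window A that have a window B ending
// 0..toleranceSec seconds later, i.e. when the temporal join can match.
function joinMatchTimes(horizonSec, sizeA, sizeB, toleranceSec) {
  const matches = [];
  for (let a = sizeA; a <= horizonSec; a += sizeA) {
    for (let b = sizeB; b <= horizonSec; b += sizeB) {
      const diff = b - a;
      if (diff >= 0 && diff <= toleranceSec) matches.push(a);
    }
  }
  return matches;
}

// 1-minute vs 2-minute windows over the first 10 minutes:
console.log(JSON.stringify(joinMatchTimes(600, 60, 120, 1)));
// → [120,240,360,480,600]
```

Only every other 1-minute result finds a partner, so the joined output appears every 120 s. With UNION, each window's rows reach the output independently (typically with an extra column saying which window length they belong to), so no row is dropped for lack of a match. If both windows instead hop by 1 second, as in the question's HoppingWindow(second, 60, 1), their end times coincide every second and this particular effect does not apply.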
From your sample and your goal, I think there is a much simpler way to write this query using a UDA (user-defined aggregate).
To do this, I first define a UDA function called "ratio":
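function main() {
    // Called when a window starts: reset both running totals.
    this.init = function () {
        this.sumSell = 0.0;
        this.sumBuy = 0.0;
    };
    // Called for each event in the window: add its qty to the matching side.
    this.accumulate = function (value, timestamp) {
        if (value.side === "BUY") { this.sumBuy += value.qty; }
        if (value.side === "SELL") { this.sumSell += value.qty; }
    };
    // Called when the window's result is needed: bought / sold, or 0 if nothing was sold.
    this.computeResult = function () {
        var result;
        if (this.sumSell === 0) {
            result = 0;
        } else {
            result = this.sumBuy / this.sumSell;
        }
        return result;
    };
}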
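For sliding and hopping windows, the Azure Stream Analytics JavaScript UDA documentation also describes optional deaccumulate and deaccumulateState methods, which let the runtime subtract events (or a partial state) that leave the window instead of recomputing from scratch. Below is a sketch of the same ratio UDA with those two methods added, plus a small Node driver for checking the arithmetic locally. The driver is a hypothetical harness, not how Stream Analytics actually invokes the UDA, and the method semantics should be verified against the current documentation.

```javascript
// UDA variant (sketch): same ratio logic, plus the optional deaccumulate and
// deaccumulateState methods for removing data that leaves the window.
function main() {
  this.init = function () {
    this.sumSell = 0.0;
    this.sumBuy = 0.0;
  };
  this.accumulate = function (value, timestamp) {
    if (value.side === 'BUY') this.sumBuy += value.qty;
    if (value.side === 'SELL') this.sumSell += value.qty;
  };
  // Undo the effect of one event that has left the window.
  this.deaccumulate = function (value, timestamp) {
    if (value.side === 'BUY') this.sumBuy -= value.qty;
    if (value.side === 'SELL') this.sumSell -= value.qty;
  };
  // Undo the effect of another aggregator state (e.g. an expired partial window).
  this.deaccumulateState = function (otherState) {
    this.sumBuy -= otherState.sumBuy;
    this.sumSell -= otherState.sumSell;
  };
  // Bought / sold, or 0 if nothing was sold (same convention as above).
  this.computeResult = function () {
    return this.sumSell === 0 ? 0 : this.sumBuy / this.sumSell;
  };
}

// Local driver for Node (NOT the Stream Analytics runtime).
const agg = new main();
agg.init();
agg.accumulate({ side: 'BUY', qty: 3 }, 0);
agg.accumulate({ side: 'SELL', qty: 2 }, 1000);
console.log(agg.computeResult()); // 1.5
agg.deaccumulate({ side: 'BUY', qty: 3 }, 0); // the BUY leaves the window
console.log(agg.computeResult()); // 0
```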
Then I can simply use this SQL query with a 60-second window:
SELECT
windowEnd = System.Timestamp,
ticker,
uda.ratio(iothub) as ratio
FROM iothub PARTITION BY PartitionId
GROUP BY ticker, PartitionId, SlidingWindow(second, 60)
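One thing to keep in mind with SlidingWindow: according to the Stream Analytics windowing documentation, a sliding window emits output only when an event enters or leaves the window, so with sparse trades the query does not necessarily produce a row every second. The sketch below models that rule locally for a single ticker with times in seconds; the helper slidingWindowOutputs is hypothetical, and it uses the UDA's convention of returning 0 when nothing was sold.

```javascript
// Local model of when a SlidingWindow(second, windowSec) grouped query emits rows:
// only at instants where an event enters or leaves the window. One ticker, times in seconds.
function slidingWindowOutputs(trades, windowSec) {
  const candidateEnds = new Set();
  for (const t of trades) {
    candidateEnds.add(t.time);             // the event enters the window
    candidateEnds.add(t.time + windowSec); // the event leaves the window
  }
  const rows = [];
  for (const end of [...candidateEnds].sort((a, b) => a - b)) {
    let buy = 0, sell = 0, count = 0;
    for (const t of trades) {
      // The window ending at `end` covers (end - windowSec, end].
      if (t.time > end - windowSec && t.time <= end) {
        count += 1;
        if (t.side === 'BUY') buy += t.qty;
        if (t.side === 'SELL') sell += t.qty;
      }
    }
    // An empty group produces no row.
    if (count > 0) rows.push({ end, ratio: sell === 0 ? 0 : buy / sell });
  }
  return rows;
}

const rows = slidingWindowOutputs([
  { side: 'BUY', qty: 3, time: 10 },
  { side: 'SELL', qty: 2, time: 20 },
], 60);
console.log(JSON.stringify(rows));
// → [{"end":10,"ratio":0},{"end":20,"ratio":1.5},{"end":70,"ratio":0}]
```

Two trades yield rows at only three instants, not one per second. If a row every second is required, the question's HoppingWindow(second, 60, 1) is closer to that requirement (it emits at each 1-second hop while the window contains events), and the 2- to 6-minute ratios can be produced by repeating the query per window size and combining the results with UNION, as suggested in the answer.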