How to optimize this moving average calculation, in F#
I have a moving average that has the following particularities:
- Each entry has a timestamp, the values are not evenly distributed in time and the queue length can vary significantly.
- I do not have a fixed period, so the code must be flexible as several periods will be requested.
- The period used is a timestamp and only records above that timestamp are to be used.
This is the code:
module PriceMovingAverage =

    open System
    open System.Collections.Generic

    // queue duration
    let queueDuration = TimeSpan.FromHours(1.)

    // moving average queue
    let private timestampQueue = Queue<DateTime>()
    let private priceQueue = Queue<float>()

    // update moving average
    let updateMovingAverage (tradeData: TradeData) =
        // add the new price
        timestampQueue.Enqueue(tradeData.Timestamp)
        priceQueue.Enqueue(float tradeData.Price)

        // remove the items older than the price base period
        let rec dequeueLoop () =
            if timestampQueue.Peek() + queueDuration < tradeData.Timestamp then
                timestampQueue.Dequeue() |> ignore
                priceQueue.Dequeue() |> ignore
                dequeueLoop()
        dequeueLoop()

    // get the moving average
    let getPrice fromTimestamp =
        // count how many records to skip
        let recordsToSkip =
            timestampQueue
            |> Seq.takeWhile (fun t -> t < fromTimestamp)
            |> Seq.length

        // calculate the average of the prices within the time range
        try
            Some (
                priceQueue
                |> Seq.skip recordsToSkip
                |> Seq.average
                |> decimal
            )
        with _ ->
            None
The issue is the last part: I'm iterating through the timestamp queue to find how many records I need to skip. Then I'm going through the price records to calculate the average.
A lot of the CPU time is spent on the first part:
let recordsToSkip =
    timestampQueue
    |> Seq.takeWhile (fun t -> t < fromTimestamp)
    |> Seq.length
Going through the sequence and then calculating its length is slow.
Ideally, I'd just use an array with a circular buffer, but the problem is that the length of the queue can vary significantly based on the data as the index is really the timestamp and not the position in the queue.
I could turn this into a list instead of a sequence and maybe gain some speed, but that means copying the whole list each time. I assumed it would be faster to have two queues in order to do the average, but maybe this is not true.
Does anyone have an idea how to make this fast (it's called 5-10x / sec) while keeping the flexibility?
Edit:
Merging the two queues yields this:
let getPrice fromTimestamp =
    try
        Some (
            priceQueue
            |> Seq.toList
            |> List.skipWhile (fun t -> t.Timestamp < fromTimestamp)
            |> List.averageBy (fun t -> t.Price)
            |> decimal
        )
    with _ ->
        None
It's faster, but it's still super slow.
Edit:
- I made a Jupyter notebook here: https://pastebin.com/E3uS6j7T
- If you prefer, I also pasted test code directly here: https://pastebin.com/fK18Wyui
I am not sure how many queue entries you have if you call it 5-10 times per second, but I tested the code below with 1E6 entries and it was blazingly fast.
The code only addresses the "skip" part of the problem, which appears to be the main issue in the question. It uses a (hand-crafted) binary search on a sorted array, returning the index of a matching element or, if there is no match, the index of the first element above the target.
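module MovingAverage

let N = 1000000

// Binary search on a sorted array: returns the index of an element equal to
// target, or the index of the first element above it (Array.length a if none).
let inline findFirstIndexAbove target a =
    let upper = Array.length a
    let rec loop lower upper =
        let mid = lower + (upper - lower) / 2
        //printfn "lower = %d, mid = %d, upper = %d" lower mid upper
        if mid = lower
        then
            // one candidate left; >= (rather than =) also covers a target
            // below the first element, which should yield index 0
            if a.[mid] >= target
            then mid
            else upper
        else
            if a.[mid] < target
            then
                loop mid upper
            else if a.[mid] > target
            then loop lower mid
            else mid
    // guard against indexing into an empty array
    if upper = 0 then 0 else loop 0 upper

let test1 () =
    let ats = Array.init N (fun _ -> System.DateTime.Now)
    findFirstIndexAbove (ats.[10]) ats

let test2 () =
    let au64 = Array.init N (fun i -> 2UL * uint64 i)
    findFirstIndexAbove (au64.[10]+1UL) au64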
On my machine (Debian 64-bit, cheap AMD CPU, using dotnet fsi as the interactive shell, not fsharpi!), I get the following timings for test1() and test2(), respectively.
test1 ();;
Real: 00:00:00.223, CPU: 00:00:00.220, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 10
test2 ();;
Real: 00:00:00.005, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 11
The majority of the time spent in test1() is the initialization of the array with the time stamps.
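As a cross-check of the hand-written search, the same lookup could also be sketched with .NET's built-in `System.Array.BinarySearch`, which returns the index of a match or, when there is none, the bitwise complement of the insertion point. The function name `firstIndexAtOrAfter` is hypothetical, and the sketch assumes the timestamps are sorted in ascending order.

```fsharp
open System

/// Index of the first timestamp >= target in an ascending array,
/// or the array length when every timestamp is older than target.
/// (Hypothetical helper, not part of the original answer.)
let firstIndexAtOrAfter (target: DateTime) (timestamps: DateTime[]) =
    let i = System.Array.BinarySearch<DateTime>(timestamps, target)
    if i < 0 then
        // no exact match: the result is the complement of the insertion point
        ~~~i
    else
        // exact match: BinarySearch may land on any duplicate,
        // so step back to the first one
        let mutable first = i
        while first > 0 && timestamps.[first - 1] = target do
            first <- first - 1
        first
```

Stepping back over duplicates is linear in the number of equal timestamps, which should be small unless many trades share the exact same timestamp.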
With the above as a recipe, here is the rather counter-intuitive array approach in a scenario closer to the question:
[<Struct>]
type TradeData =
    {
        timeStamp : System.DateTime
        price : float
    }

// Same binary search as above, keyed on the timeStamp field.
let inline skipBeyondOldData target (a : TradeData[]) =
    let upper = Array.length a
    let rec loop lower upper =
        let mid = lower + (upper - lower) / 2
        //printfn "lower = %d, mid = %d, upper = %d" lower mid upper
        if mid = lower
        then
            // >= also covers a target older than the first entry (index 0)
            if a.[mid].timeStamp >= target
            then mid
            else upper
        else
            if a.[mid].timeStamp < target
            then
                loop mid upper
            else if a.[mid].timeStamp > target
            then loop lower mid
            else mid
    // guard against indexing into an empty array
    if upper = 0 then 0 else loop 0 upper

let oneHour = System.TimeSpan.FromHours(1.0)

let cyclicUpdate state (currentPrice : TradeData) =
    let tnow = System.DateTime.Now
    let tstart = tnow - oneHour
    // drop everything older than one hour, then append the new entry
    let workingSetStartIndex = skipBeyondOldData tstart state
    let state1 = Array.append (state.[workingSetStartIndex..]) [| currentPrice |]
    let avgPrice = Array.averageBy (fun td -> td.price) state1
    (avgPrice,state1)

let rng = System.Random()
let initialState = Array.init N (fun _ -> { timeStamp = System.DateTime.Now; price = rng.NextDouble() })
With the resulting timing:
cyclicUpdate initialState { timeStamp = System.DateTime.Now; price = rng.NextDouble() };;
Real: 00:00:00.016, CPU: 00:00:00.010, GC gen0: 0, gen1: 0, gen2: 0
val it : float * TradeData [] = (0.5001679869, ... )
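To connect this back to the `getPrice fromTimestamp` function in the question, here is a minimal sketch that keeps timestamp and price together in one growable buffer, binary-searches for the first entry at or after `fromTimestamp`, and averages from there without copying. It assumes entries are appended in timestamp order. `Trade`, `firstIndexAtOrAfter` and the `trades` parameter are hypothetical names, not part of the original code.

```fsharp
open System

// Hypothetical record for this sketch; the question's TradeData is not shown in full.
[<Struct>]
type Trade = { Timestamp: DateTime; Price: float }

/// Lower bound: index of the first trade with Timestamp >= fromTimestamp,
/// or trades.Count when every trade is older.
let firstIndexAtOrAfter (fromTimestamp: DateTime) (trades: ResizeArray<Trade>) =
    let mutable lo = 0
    let mutable hi = trades.Count
    while lo < hi do
        let mid = lo + (hi - lo) / 2
        if trades.[mid].Timestamp < fromTimestamp then lo <- mid + 1
        else hi <- mid
    lo

/// Average price of all trades at or after fromTimestamp; None if there are none.
let getPrice (fromTimestamp: DateTime) (trades: ResizeArray<Trade>) =
    let start = firstIndexAtOrAfter fromTimestamp trades
    let count = trades.Count - start
    if count = 0 then
        None
    else
        // sum in place instead of building an intermediate list
        let mutable sum = 0.0
        for i in start .. trades.Count - 1 do
            sum <- sum + trades.[i].Price
        Some (decimal (sum / float count))
```

The search is logarithmic, but the averaging loop is still linear in the number of entries inside the requested window. Expiring old entries from the front of a `ResizeArray` with `RemoveRange` shifts the remaining elements, so that part may still call for a circular buffer or periodic compaction.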