How to optimize this moving average calculation, in F#


Problem description


I have a moving average that has the following particularities:

  • Each entry has a timestamp, the values are not evenly distributed in time and the queue length can vary significantly.
  • I do not have a fixed period, so the code must be flexible as several periods will be requested.
  • A period is given as a start timestamp, and only records at or after that timestamp are to be used.

This is the code:

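module PriceMovingAverage =

    // TradeData is defined elsewhere; it exposes a Timestamp and a Price.
    open System
    open System.Collections.Generic
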
    // queue duration
    let queueDuration = TimeSpan.FromHours(1.)

    // moving average queue
    let private timestampQueue = Queue<DateTime>()
    let private priceQueue     = Queue<float>()

    // update moving average
    let updateMovingAverage (tradeData: TradeData) =

        // add the new price
        timestampQueue.Enqueue(tradeData.Timestamp)
        priceQueue.Enqueue(float tradeData.Price)

        // remove the items older than the price base period
        let rec dequeueLoop () =
            if timestampQueue.Peek() + queueDuration < tradeData.Timestamp then
                timestampQueue.Dequeue() |> ignore
                priceQueue.Dequeue() |> ignore
                dequeueLoop()

        dequeueLoop()


    // get the moving average
    let getPrice fromTimestamp =

        // count how many records to skip
        let recordsToSkip =
            timestampQueue
            |> Seq.takeWhile (fun t -> t < fromTimestamp)
            |> Seq.length

        // calculate the average of the prices within the time range
        try
            Some (
                priceQueue
                |> Seq.skip recordsToSkip
                |> Seq.average
                |> decimal
            )
        with _ ->
            None

The issue is the last part: I'm iterating through the timestamp queue to find how many records I need to skip. Then I'm going through the price records to calculate the average.

A lot of the CPU time is spent on the first part:

let recordsToSkip =
        timestampQueue
        |> Seq.takeWhile (fun t -> t < fromTimestamp)
        |> Seq.length

Going through the sequence and then calculating its length is slow.

Ideally, I'd just use an array with a circular buffer, but the problem is that the length of the queue can vary significantly based on the data as the index is really the timestamp and not the position in the queue.

I could turn this into a list instead of a sequence and maybe gain some speed, but that means copying the whole list each time. I assumed it would be faster to have two queues in order to do the average, but maybe this is not true.

Does anyone have an idea how to make this fast (it is called 5-10 times per second) while keeping the flexibility?


Edit:

Merging the two queues yields this:

    let getPrice fromTimestamp =
        try
            Some (
                priceQueue
                |> Seq.toList
                |> List.skipWhile (fun t -> t.Timestamp < fromTimestamp)
                |> List.averageBy (fun t -> t.Price)
                |> decimal
            )
        with _ ->
            None

It's faster, but it's still super slow.



Solution

I am not sure how many queue entries you have if you call it 5-10 times per second, but I tested the code below with 1E6 entries and it was blazingly fast.

The code addresses only the "skip" part of the problem, which appears to be the main issue in the question. It uses a hand-crafted binary search on a sorted array and returns the matching index, or the index of the next larger element if there is no match.

module MovingAverage

let N = 1000000

let inline findFirstIndexAbove target a =
    let n = Array.length a
    // Lower-bound binary search over an array sorted in ascending order:
    // returns the first index whose element is >= target, or n if every
    // element is smaller. An empty array yields 0.
    let rec loop lower upper =
        if lower >= upper then
            lower
        else
            let mid = lower + (upper - lower) / 2
            if a.[mid] < target then loop (mid + 1) upper
            else loop lower mid
    loop 0 n

let test1 () =
    let ats = Array.init N (fun _ -> System.DateTime.Now)
    findFirstIndexAbove (ats.[10]) ats

let test2 () =
    let au64 = Array.init N (fun i -> 2UL * uint64 i)
    findFirstIndexAbove (au64.[10]+1UL) au64

On my machine (Debian 64 bit, cheap AMD CPU, using dotnet fsi as the interactive shell, not fsharpi!), I get the following timings for test1() and test2() respectively.

test1 ();;
Real: 00:00:00.223, CPU: 00:00:00.220, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 10

test2 ();;
Real: 00:00:00.005, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : int = 11

The majority of the time spent in test1() is the initialization of the array with the time stamps.
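As a possible alternative to the hand-written search, .NET ships `System.Array.BinarySearch`, which returns either the index of a match or the bitwise complement of the insertion point. The sketch below is not part of the original answer; the helper name `firstIndexAtOrAfter` is made up for illustration, and it assumes the timestamps are sorted in ascending order. With duplicate timestamps the built-in search may land on any of the equal entries, not necessarily the first.

```fsharp
open System

// Hypothetical helper (not from the original answer).
// Assumes `timestamps` is sorted in ascending order.
let firstIndexAtOrAfter (target: DateTime) (timestamps: DateTime[]) =
    let i = System.Array.BinarySearch<DateTime>(timestamps, target)
    // A negative result is the bitwise complement of the index of the
    // next larger element (or of the array length if there is none).
    if i >= 0 then i else ~~~i

// Usage with three sample timestamps at minutes 0, 1 and 3.
let t0 = DateTime(2024, 1, 1)
let sample = [| t0; t0.AddMinutes(1.0); t0.AddMinutes(3.0) |]

let exact   = firstIndexAtOrAfter (t0.AddMinutes(1.0)) sample  // 1
let between = firstIndexAtOrAfter (t0.AddMinutes(2.0)) sample  // 2
let past    = firstIndexAtOrAfter (t0.AddMinutes(5.0)) sample  // 3
```

A result equal to the array length means every record is older than the requested timestamp.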

Using the binary search above as a recipe, here is the rather counter-intuitive array approach in a scenario closer to the question:

[<Struct>]
type TradeData =
    {
        timeStamp : System.DateTime
        price : float 
    }

let inline skipBeyondOldData target (a: TradeData[]) =
    let n = Array.length a
    // Lower-bound binary search on the time stamps (array sorted by time):
    // returns the first index whose timeStamp is >= target, or n if every
    // entry is older. An empty array yields 0.
    let rec loop lower upper =
        if lower >= upper then
            lower
        else
            let mid = lower + (upper - lower) / 2
            if a.[mid].timeStamp < target then loop (mid + 1) upper
            else loop lower mid
    loop 0 n

let oneHour = System.TimeSpan.FromHours(1.0)

let cyclicUpdate state (currentPrice : TradeData) =
    let tnow = System.DateTime.Now
    let tstart = tnow - oneHour
    let workingSetStartIndex = skipBeyondOldData tstart state
    let state1 = Array.append (state.[workingSetStartIndex..]) [| currentPrice |]
    let avgPrice = Array.averageBy (fun td -> td.price) state1
    (avgPrice,state1)

let rng = System.Random()

let initialState = Array.init N (fun _ -> { timeStamp = System.DateTime.Now; price = rng.NextDouble(); })

With the resulting timing:

cyclicUpdate initialState { timeStamp = System.DateTime.Now; price = rng.NextDouble() };;
Real: 00:00:00.016, CPU: 00:00:00.010, GC gen0: 0, gen1: 0, gen2: 0
val it : float * TradeData [] = (0.5001679869, ... )
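The question also asks for averages over several periods, each starting at a different timestamp. One way this could be handled, sketched here as an assumption rather than as part of the original answer, is to pair the binary search with an array of cumulative sums, so that each query costs one search plus one subtraction. All names below (`lowerBound`, `buildPrefixSums`, `averageFrom`) are hypothetical, and the sketch ignores eviction of old entries.

```fsharp
open System

// Hypothetical helpers (not from the original answer).
// First index whose timestamp is >= target, or the array length if none is.
let lowerBound (target: DateTime) (timestamps: DateTime[]) =
    let rec loop lower upper =
        if lower >= upper then lower
        else
            let mid = lower + (upper - lower) / 2
            if timestamps.[mid] < target then loop (mid + 1) upper
            else loop lower mid
    loop 0 timestamps.Length

// prefix.[i] holds the sum of prices.[0 .. i-1], so prefix.[0] = 0.0
// and the result has one more element than `prices`.
let buildPrefixSums (prices: float[]) =
    Array.scan (+) 0.0 prices

// Average of all prices whose timestamp is >= fromTimestamp,
// or None when no record falls in that range.
let averageFrom (timestamps: DateTime[]) (prefix: float[]) (fromTimestamp: DateTime) =
    let n = timestamps.Length
    let start = lowerBound fromTimestamp timestamps
    if start >= n then None
    else Some ((prefix.[n] - prefix.[start]) / float (n - start))

// Usage: five trades, one minute apart, with prices 1.0 .. 5.0.
let t0 = DateTime(2024, 1, 1)
let timestamps = [| for m in 0 .. 4 -> t0.AddMinutes(float m) |]
let prices = [| 1.0; 2.0; 3.0; 4.0; 5.0 |]
let prefix = buildPrefixSums prices

let avgLast3 = averageFrom timestamps prefix (t0.AddMinutes(2.0))   // Some 4.0
let avgNone  = averageFrom timestamps prefix (t0.AddMinutes(10.0))  // None
```

Cumulative floating-point sums can drift over long runs, so rebuilding the prefix array whenever old entries are dropped may be worth considering.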
