Time optimization of function for signal processing


Question

I have a program that performs a LOT of iterations (thousands to millions to hundreds of millions). It is starting to take quite a lot of time (from a few minutes to a few days), and despite all my efforts to optimize it, I'm still a bit stuck.

Profile, using cProfile via a console call:

ncalls     tottime  percall  cumtime  percall filename:lineno(function)
    500/1    0.018    0.000  119.860  119.860 {built-in method builtins.exec}
        1    0.006    0.006  119.860  119.860 Simulations_profiling.py:6(<module>)
      6/3    0.802    0.134  108.302   36.101 Simulations_profiling.py:702(optimization)
    38147    0.581    0.000  103.411    0.003 Simulations_profiling.py:270(overlap_duo_combination)
   107691   28.300    0.000  102.504    0.001 Simulations_profiling.py:225(duo_overlap)
 12615015   69.616    0.000   69.616    0.000 {built-in method builtins.round}

First question: what are the first 2 lines? I assumed they were the program itself being called.

I'm going to replace the calls to round() with a tolerance comparison in my if / else statements, which avoids that time cost. I would like to further optimize the 2 following functions, but I can't find a new approach.

import itertools
import numpy as np

class Signal:
    def __init__(self, fq, t0, tf, width=0.3):
        self.fq = fq                                    # frequency in Hz
        self.width = float(width)                       # cathodic phase width in ms
        self.t0 = t0                                    # Instant of the first pulse in ms
        self.tf = tf                                    # End point in ms

        # List of instant at which a stim pulse is triggered in ms
        self.timeline = np.round(np.arange(t0, self.tf, 1/fq*1000), 3)

    def find_closest_t(self, t):
        val = min(self.timeline, key=lambda x:abs(x-t))
        id = np.where(self.timeline==val)[0][0]

        if val <= t or id == 0:
            return val, id
        else:
            return self.timeline[id-1], id-1

def duo_overlap(S1, S2, perc):

    pulse_id_t1, pulse_id_t2 = [], []

    start = max(S1.t0, S2.t0) - max(S1.width, S2.width)
    if start <= 0:
        start = 0
        start_id_S1 = 0
        start_id_S2 = 0
    else:
        start_id_S1 = S1.find_closest_t(start)[1]
        start_id_S2 = S2.find_closest_t(start)[1]

    stop = min(S1.tf, S2.tf) + max(S1.width, S2.width)

    for i in range(start_id_S1, len(S1.timeline)):
        if S1.timeline[i] > stop:
            break

        for j in range(start_id_S2, len(S2.timeline)):
            if S2.timeline[j] > stop:
                break

            d = round(abs(S2.timeline[j] - S1.timeline[i]), 3)  # Computation of the distance of the 2 point

            if S1.timeline[i] <= S2.timeline[j] and d < S1.width:
                pulse_id_t1.append(i)
                pulse_id_t2.append(j)
                continue

            elif S1.timeline[i] >= S2.timeline[j] and d < S2.width:
                pulse_id_t1.append(i)
                pulse_id_t2.append(j)
                continue

            else:
                continue

    return pulse_id_t1, pulse_id_t2

def overlap_duo_combination(signals, perc=0):

    overlap = dict()
    for i in range(len(signals)):
        overlap[i] = list()

    for subset in itertools.combinations(signals, 2):
        p1, p2 = duo_overlap(subset[0], subset[1], perc = perc)
        overlap[signals.index(subset[0])] += p1
        overlap[signals.index(subset[1])] += p2

    return overlap

Explanation of the program:

I have square signals of width Signal.width and of frequency Signal.fq, starting at Signal.t0 and ending at Signal.tf. During the initialization of a Signal instance, the timeline is computed: it is a 1D array of floats in which each number corresponds to the instant at which a pulse is triggered.

Example:

IN: Signal(50, 0, 250).timeline
OUT: array([  0.,  20.,  40.,  60.,  80., 100., 120., 140., 160., 180., 200., 220., 240.])

A pulse is triggered at t = 0, t = 20, t = 40, ... Each pulse has a width of 0.3.

duo_overlap() takes 2 signals as input (and a percentage that we will keep fixed at 0 for this example). This function computes the IDs of the pulses of S1 and of S2 (indices in the timeline arrays) that overlap with one another.

Example:

If a pulse starts at t = 0 for S1 and a pulse starts at t = 0.2 for S2, then since 0.2 - 0 = 0.2 < 0.3 (S1.width), the 2 pulses overlap.
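The question mentions replacing round() with a tolerance comparison. A minimal sketch of the overlap test from this example written that way is given here. The function name pulses_overlap and the tolerance value TOL are assumptions made for illustration, and the result should match the rounded comparison only as long as the pulse times lie on the 0.001 ms grid produced by Signal.timeline and the widths are positive.

```python
TOL = 1e-6  # assumed tolerance in ms, well below the 0.001 ms grid of the timelines


def pulses_overlap(t1, w1, t2, w2, tol=TOL):
    """Return True if a pulse starting at t1 (width w1) overlaps one starting at t2 (width w2)."""
    d = abs(t2 - t1)
    # The pulse that starts first decides which width applies, as in duo_overlap().
    width = w1 if t1 <= t2 else w2
    # Strict "d < width", shifted by tol so that float noise right at the
    # boundary (e.g. 0.29999999999999993 instead of 0.3) is not counted as overlap.
    return d < width - tol


print(pulses_overlap(0.0, 0.3, 0.2, 0.6))  # the example above: distance 0.2, width 0.3
```

Because no rounding call is made per pair, this keeps the comparison to a subtraction and a single test inside the inner loop.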

I tried to optimize this function by looping only over the IDs at which the signals can possibly overlap (start_id, stop), computed ahead of time. But as you can see, this function is still really time-consuming because of the high number of calls.
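One place where the start-index computation itself might be tightened: find_closest_t() scans the whole timeline with min() and then again with np.where(). Since the timeline is sorted, a binary search should give the same index. The sketch below assumes a strictly increasing timeline; find_floor_index is a hypothetical helper name, not part of the original code.

```python
import numpy as np


def find_floor_index(timeline, t):
    """Index of the last pulse starting at or before t (0 if t precedes every pulse)."""
    # side="right" returns the insertion point after any entry equal to t,
    # so subtracting 1 gives the index of the last entry <= t.
    idx = np.searchsorted(timeline, t, side="right") - 1
    return max(int(idx), 0)


# Same construction as Signal(50, 0, 250).timeline
timeline = np.round(np.arange(0, 250, 1 / 50 * 1000), 3)
start_id = find_floor_index(timeline, 45.0)
print(start_id, timeline[start_id])
```

The binary search is O(log n) per lookup instead of O(n), which matters mostly for long timelines.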

The last function, overlap_duo_combination(), takes N signals as input in a list (or tuple / iterable) (2 <= N <= 6 in practice) and creates a dict() in which each key is the ID of a signal in the input list, and each value is a list of overlapping pulse IDs (obtained by comparing the signals of the input list 2 by 2).

Example:

Input: signals = (S1, S2, S3). Pulse n°2 of S1 overlaps with pulse n°3 of S2, and pulse n°3 of S2 overlaps with pulse n°5 of S3.

Output: dict[0] = [2] / dict[1] = [3, 3] / dict[2] = [5]
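The pairwise bookkeeping described above can also be written over index pairs, which avoids the signals.index() lookups. This is only a sketch: overlap_pairs and its pair_overlap argument are hypothetical names, and pair_overlap stands for any callable that returns the two lists of pulse IDs (for instance a wrapper around duo_overlap()). The stub used here simply reproduces the example.

```python
import itertools


def overlap_pairs(signals, pair_overlap):
    """Collect overlapping pulse IDs per signal index for every pair of signals."""
    overlap = {i: [] for i in range(len(signals))}
    # Iterating over index pairs gives the dict keys directly.
    for i, j in itertools.combinations(range(len(signals)), 2):
        p1, p2 = pair_overlap(signals[i], signals[j])
        overlap[i].extend(p1)
        overlap[j].extend(p2)
    return overlap


# Stub standing in for duo_overlap(), hard-coded to the example above.
example_pairs = {("S1", "S2"): ([2], [3]), ("S2", "S3"): ([3], [5])}


def fake_pair_overlap(a, b):
    return example_pairs.get((a, b), ([], []))


result = overlap_pairs(("S1", "S2", "S3"), fake_pair_overlap)
print(result)  # {0: [2], 1: [3, 3], 2: [5]}
```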

The 3 appears twice for S2 because it is added a first time when duo_overlap() is called on S1 and S2, and a second time when it is called on S2 and S3. I don't want to remove duplicates, since they carry information on how many different pulses are overlapping (in this case, 2 pulses overlap with pulse n°3 of S2).

Do you have any idea, suggestion, code, or anything else that could reduce the time complexity of this part of the code?

I am currently looking into a PyCUDA implementation since I have an Nvidia 1080 Ti at my disposal, but I don't know the C language. Would it be worth moving this inner function to the GPU, given that it doesn't take long to execute when called but is called thousands of times?

Thanks for reading such a long post, and thanks for the help!
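If the duplicates are kept for that reason, the count per pulse can be read off with collections.Counter. This is a small illustration on the example output above; the variable names are assumptions.

```python
from collections import Counter

# Output of the example: dict[0] = [2] / dict[1] = [3, 3] / dict[2] = [5]
overlap = {0: [2], 1: [3, 3], 2: [5]}

# For each signal, map pulse ID -> number of pulses from other signals overlapping it.
overlap_counts = {sig_id: Counter(pulse_ids) for sig_id, pulse_ids in overlap.items()}

print(overlap_counts[1][3])  # 2: pulse n°3 of S2 overlaps with 2 other pulses
```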

Recommended answer

If I understood your problem correctly, you could speed up your duo_overlap() function by relying on numpy instead of performing all the loops.

You would like to subtract all values of S2.timeline from S1.timeline and compare the differences to the width of the signal. The following function repeats S1.timeline (as columns) and subtracts S2.timeline from each row. Thus, the row indices correspond to S1.timeline and the column indices correspond to S2.timeline.

def duo_overlap_np(S1, S2):
    x = S1.timeline
    y = S2.timeline

    mat = np.repeat(x, y.shape[0]).reshape(x.shape[0],y.shape[0])
    mat = mat - y

    # End of S1 pulse overlaps with start of S2 pulse
    overlap_1 = np.where((0 >= mat) & (mat >= -S1.width))

    # End of S2 pulse overlaps with start of S1 pulse
    overlap_2 = np.where((0 <= mat) & (mat <= S2.width))

    # Flatten the overlap arrays. The first element returned by np.where
    # corresponds to S1, the second to S2
    S1_overlap = np.concatenate([overlap_1[0], overlap_2[0]])
    S2_overlap = np.concatenate([overlap_1[1], overlap_2[1]])

    return S1_overlap, S2_overlap
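Two details of the function above may be worth checking against the original loop: its bounds are inclusive, whereas duo_overlap() uses a strict d < width, and a pair of pulses with identical start times satisfies both conditions and so appears twice in the result. A variant that tries to follow the loop's test more closely is sketched below. It takes plain arrays and widths rather than Signal objects, duo_overlap_broadcast is a hypothetical name, and positive widths are assumed.

```python
import numpy as np


def duo_overlap_broadcast(t1, w1, t2, w2):
    """Indices (into t1, into t2) of overlapping pulses, using the strict test of duo_overlap()."""
    # diff[i, j] = t2[j] - t1[i]; broadcasting builds the matrix without np.repeat.
    diff = np.round(t2[np.newaxis, :] - t1[:, np.newaxis], 3)
    s1_first = (diff >= 0) & (diff < w1)   # S1 pulse starts first, or both start together
    s2_first = (diff < 0) & (-diff < w2)   # S2 pulse starts first
    # The two masks are mutually exclusive, so each pair is reported once,
    # in row-major order (same order as the nested loops).
    return np.nonzero(s1_first | s2_first)


t1 = np.array([0.0, 20.0, 40.0])
t2 = np.array([0.2, 19.9, 40.0, 60.0])
s1_ids, s2_ids = duo_overlap_broadcast(t1, 0.3, t2, 0.6)
print(s1_ids, s2_ids)
```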

A quick speed comparison on my machine gives:

S1 = Signal(50, 0, 1000, width=0.3)
S2 = Signal(25.5, 20.2, 1000, width=0.6)

%timeit duo_overlap(S1, S2, 0)
# 7 ms ± 284 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

%timeit duo_overlap_np(S1, S2)
# 38.2 µs ± 2.42 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
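For much longer timelines, the full len(S1.timeline) x len(S2.timeline) matrix may become the limiting factor. Because both timelines are sorted, the S2 pulses overlapping a given S1 pulse form a contiguous run that can be located with np.searchsorted. The sketch below is an alternative technique, not the answer's method; it has not been timed here. The name duo_overlap_sorted and the tolerance tol are assumptions, and it assumes positive widths and times on the 0.001 ms grid.

```python
import numpy as np


def duo_overlap_sorted(t1, w1, t2, w2, tol=1e-6):
    """Indices (into t1, into t2) of overlapping pulses, without building the full matrix."""
    # Pulse j of S2 overlaps pulse i of S1 when t1[i] - w2 < t2[j] < t1[i] + w1.
    # tol keeps float noise at the window edges from being counted as overlap.
    lo = np.searchsorted(t2, t1 - w2 + tol, side="right")
    hi = np.searchsorted(t2, t1 + w1 - tol, side="left")
    counts = np.maximum(hi - lo, 0)
    # Expand each run [lo[i], hi[i]) into explicit (i, j) pairs.
    i_idx = np.repeat(np.arange(len(t1)), counts)
    run_starts = np.cumsum(counts) - counts
    offsets = np.arange(counts.sum()) - np.repeat(run_starts, counts)
    j_idx = np.repeat(lo, counts) + offsets
    return i_idx, j_idx


t1 = np.array([0.0, 20.0, 40.0])
t2 = np.array([0.2, 19.9, 40.0, 60.0])
s1_ids, s2_ids = duo_overlap_sorted(t1, 0.3, t2, 0.6)
print(s1_ids, s2_ids)
```

Memory use then grows with the number of overlapping pairs rather than with the product of the two timeline lengths.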
