我可以从一个线程内循环调用一个线程吗? [英] Can I call a thread recurrently from within a thread?

查看:57
本文介绍了我可以从一个线程内循环调用一个线程吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 queue 将样本从线程 A(采集")传输到线程 B(P300"),但我无法读取线程 B 中的任何数据,尽管样本正在分配在线程 A 中.从我的输出来看,我认为我的线程 B 在线程 A 开始放入数据之前正在匆忙和测试.

I'm trying to transfer samples from thread A ("Acquisition") to thread B ("P300") using queue but I can't read any data in thread B, although samples are being allocated in thread A. Judging by my output, I think my thread B is rushing and testing things before my thread A starts to put data in.

在下面查看我的代码结构的近似值:

See an approximation of my code structure bellow:

import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")

class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
        threading.Thread.__init__(self)
        self.stopQ2 = stopQ2
        self.stopQ1 = stopQ1
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ, stopQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ
        self.stopQ = stopQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ, self.stopQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)

def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
    i = 0
    print('Starting...')
    while i<1250: #i is the number of samples
        sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
        ##Normalization, filtering##
        threadLock.acquire()
        dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
        threadLock.release()
        i += 1

def P300fun(dataInQ, featureQ, stopQ):
    p300sample = []
    p300timestamp = []
    print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
    print("Is dataInQ emtpy?", DataOutQ1.empty())
    while dataInQ.qsize(): #or while not dataqueue.empty():
        try:
            print("DataInQ has data")
            ss, ts = dataInQ.get(0) 
            print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
        except Empty:
            return
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")

和输出:

Looking for an EEG stream...
Connected!

Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished

>>>DONE<<<

在我的研究中,question 1 似乎存在类似的问题,但问题似乎出在图像处理部分(他们使用了 multiprocessing 包).问题 2似乎有并发问题,这可能是我的问题,但我不确定如何将其转换为我的问题,如果我错了,请告诉我).问题 3 只是参数顺序有问题,所以这里不适用,我想想.

In my research, question 1 appeared to present a similar problem, but it seems that the problem was in the image processing part (and they use the multiprocessing package). Question 2 seems to have a concurrency problem, which might be my problem, but I'm not sure how to translate it to my problemlet me know if I'm wrong, tho). Question 3 had just a problem with the order of arguments, so not applicable here, I think.

我该怎么办?我应该从线程 A 中反复调用线程 B 吗?我需要线程 B 上的循环或延迟吗?.join() 部分可能有问题吗?在不久的将来,我需要添加更多线程,因此最好先弄清楚如何仅使用两个线程...

How should I go about this? Should I recurrently call thread B from within thread A?? Do I need a loop or a delay on thread B? Is there some issue with the .join() part maybe? I'll need to add more threads in a near future, so it would be good to figure out how to work with only two first...

感谢所有帮助!

推荐答案

成为菜鸟可能很棘手...所以我将回答我自己的问题,以帮助其他可能也遇到此问题的初学者.

Being a noob can be tricky... So I'll answer my own question to help other beginners that may come across this issue too.

嗯,首先要注意的是:不,不可能在一个线程内循环调用一个线程,因为每个线程只能被调用一次.

Well, first things first: no, it's not possible to call a thread from within a thread recurrently, because each thread can only be called once.

但是有一种方法可以防止线程结束,使它们等待允许它们继续的触发器.经过更多研究,我遇到了这个问题,它向我展示了一种创建事件的方法对于线程.可以在此处找到该文档.它非常简单:事件对象的行为类似于标志,可以是 set()(表示 True)或 clear()(表示 False,这是原始值).要测试事件,可以使用 is_set() 方法解决布尔问题,或者使用 wait() 方法代替计时器.就我而言,它为我节省了一些我将要使用的队列:

But there is a way to prevent the thread from ending, making them wait for triggers that will allow them to continue. After some more research, I came across this question that showed me there is a way to create events for threads. The documentation can be found here. And it's quite straight forward: the event objects behave like flags and can be set() (indicating True) or clear() (indicating False, which is the original value). To test an event, one can use the is_set() method for boolean problems or use the wait() method instead of a timer. In my case, it saved me some queues I was going to use:

import threading
import queue
from queue import Empty
import numpy as np


class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, saveQ):
        threading.Thread.__init__(self)
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()

#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)

它允许我调用"线程 B循环",因为它在活动时保持我的第一个(由于事件 E)并且仅在事件 EP300 被设置时才进入处理部分.然后,在该过程完成后清除 EP300:

And it allows me to "call" thread B "recurrently", as it keeps my first while active (because of event E) and enters the processing part only when the event EP300 is set. Then, EP300 is cleared after the process is done:

def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
    i = 0
    print('Starting...')
    while i<1250:
        sample, timestamp = inlet.pull_sample()
        ##Normalization, filtering##
        if _condition_:
            threadLock.acquire()
            dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
            threadLock.release()
            EP300.set() #NEW:: allows the P300 function to collect data from queue
        i += 1
    E.set() #NEW:: flaggs end data collection

def P300fun(dataInQ, featureQ):
    p300sample = []
    p300timestamp = []
    while not E.is_set(): #NEW:: loop until collection is ended
        if EP300.is_set(): #NEW:: activated when Event is triggered
            while dataInQ.qsize():
                try:
                    print("DataInQ has data")
                    ss, ts = dataInQ.get(0) 
                    print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
                except Empty:
                    return
        if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
            EP300.clear()
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")

这篇关于我可以从一个线程内循环调用一个线程吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆