Conduit: Multiple Stream Consumers
Problem description
I am writing a program that counts the frequencies of n-grams in a corpus. I already have a function that consumes a stream of tokens and produces n-grams of a single order:
    ngram :: Monad m => Int -> Conduit t m [t]
    trigrams = ngram 3
    countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
At the moment I can only connect one stream consumer to a stream source:
    tokens --- trigrams --- countFreq
How do I connect multiple stream consumers to the same stream source? I would like to have something like this:
               .--- unigrams --- countFreq
               |--- bigrams  --- countFreq
    tokens ----|--- trigrams --- countFreq
               '---   ...    --- countFreq
A plus would be to run each consumer in parallel.
EDIT: Thanks to Petr I came up with this solution:
    spawnMultiple orders = do
        chan <- atomically newBroadcastTMChan
        -- one result slot per n-gram order
        results <- forM orders $ \_ -> newEmptyMVar
        threads <- forM (zip results orders) $
                       forkIO . uncurry (sink chan)
        -- producer: tokenize the file and broadcast every token
        forkIO . runResourceT $ sourceFile "test.txt"
                             $$ javascriptTokenizer
                             =$ sinkTMChan chan
        forM results readMVar
      where
        sink chan result n = do
            -- each consumer reads from its own duplicate of the channel;
            -- note: the dup runs inside the forked thread, so tokens written
            -- before it runs are not seen by this consumer
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs

I'm assuming you want all your sinks to receive all values.
I'd suggest:
- Use newBroadcastTMChan from Control.Concurrent.STM.TMChan (stm-chans) to create a new channel.
- Use this channel to build a sink for your main producer using sinkTMChan from Data.Conduit.TMChan (stm-conduit).
- For each client, use dupTMChan to create its own copy for reading. Start a new thread that will read this copy using sourceTMChan.
- Collect results from your threads.
- Be sure your clients can read the data as fast as it is produced, otherwise you can get heap overflow.
(I haven't tried it, let us know how it works.)
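The steps suggested above can be sketched without any conduit machinery. This is only an illustration under stated assumptions: it swaps in a plain broadcast `TChan` from the `stm` package (with `Nothing` as an end-of-stream marker) in place of `TMChan` and the stm-conduit sources and sinks, it feeds tokens from an in-memory list rather than a file, and the names `countNGrams` and `countAllOrders` are hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
  (TChan, atomically, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Monad (forM, forM_)
import qualified Data.Map.Strict as Map

-- Consume one private copy of the channel until the Nothing marker,
-- counting every window of n consecutive tokens (hypothetical helper).
countNGrams :: Ord t => Int -> TChan (Maybe t) -> IO (Map.Map [t] Int)
countNGrams n ch = go [] Map.empty
  where
    go window freqs = do
      next <- atomically (readTChan ch)
      case next of
        Nothing  -> return freqs
        Just tok ->
          let -- keep only the last n tokens seen so far
              window' = drop (length window + 1 - n) (window ++ [tok])
              freqs'
                | length window' == n = Map.insertWith (+) window' 1 freqs
                | otherwise           = freqs
          in go window' $! freqs'

-- Broadcast the tokens to one consumer thread per n-gram order
-- and return the frequency maps in the same order as `orders`.
countAllOrders :: Ord t => [Int] -> [t] -> IO [Map.Map [t] Int]
countAllOrders orders tokens = do
  chan <- newBroadcastTChanIO
  results <- forM orders $ \n -> do
    -- duplicate in the main thread, before anything is written:
    -- a copy only receives values written after it was created
    copy <- atomically (dupTChan chan)
    var  <- newEmptyMVar
    _    <- forkIO (countNGrams n copy >>= putMVar var)
    return var
  forM_ tokens $ \t -> atomically (writeTChan chan (Just t))
  atomically (writeTChan chan Nothing)  -- end-of-stream marker
  mapM takeMVar results                 -- wait for every consumer

main :: IO ()
main = do
  freqs <- countAllOrders [1, 2] (words "a b a b")
  mapM_ print freqs
```

Creating every duplicate before the producer starts writing means each consumer sees the full token stream. The channel in this sketch is unbounded, so a slow consumer lets its copy grow in memory, which is the heap concern mentioned in the list above.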
Update: One way to collect the results is to create an MVar for each consumer thread. Each thread would putMVar its result once it has finished, and your main thread would takeMVar on all these MVars, thus waiting for every thread to finish. For example, if vars is a list of your MVars, the main thread would issue mapM takeMVar vars to collect all the results.
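The MVar-per-thread idea from the update can be isolated in a small helper. This is a minimal sketch using only `base`; `forkWithResult` and `runAll` are hypothetical names, and the worker actions here are trivial sums standing in for the real consumers.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)

-- Run an action in its own thread; the returned MVar is filled
-- with the action's result once the action has finished.
forkWithResult :: IO a -> IO (MVar a)
forkWithResult action = do
  var <- newEmptyMVar
  _   <- forkIO (action >>= putMVar var)
  return var

-- Start all actions, then block on each MVar in turn.
-- The results come back in the same order as the actions.
runAll :: [IO a] -> IO [a]
runAll actions = do
  vars <- mapM forkWithResult actions
  mapM takeMVar vars

main :: IO ()
main = do
  totals <- runAll [return (sum [1 .. n]) | n <- [10, 100, 1000 :: Int]]
  print totals
```

One caveat with this sketch: if a worker throws an exception, its MVar is never filled and the main thread cannot obtain that result. Handling this properly needs exception-aware code, for which the `async` package is a commonly used alternative.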