Conduit: Multiple Stream Consumers


Problem Description

I am writing a program that counts the frequencies of n-grams in a corpus. I already have a function that consumes a stream of tokens and produces the n-grams of a single order:

    ngram :: Monad m => Int -> Conduit t m [t]
    trigrams = ngram 3
    countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
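
(For illustration only: the question includes just the signatures. Functions with these types might look roughly like the following sketch, using a sliding window for ngram and a Data.Map fold for countFreq; this is not the asker's actual code.)

    import Data.Conduit (Conduit, Consumer, await, yield)
    import qualified Data.Conduit.List as CL
    import Data.Map (Map)
    import qualified Data.Map as Map

    -- Slide a window of length n over the incoming tokens and emit each window.
    ngram :: Monad m => Int -> Conduit t m [t]
    ngram n = go []
      where
        go window = do
            mx <- await
            case mx of
                Nothing -> return ()
                Just x  -> do
                    let window' = window ++ [x]
                    if length window' == n
                        then yield window' >> go (drop 1 window')  -- emit, then slide
                        else go window'                            -- still filling the window

    -- Tally how often each n-gram occurs.
    countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)
    countFreq = CL.fold (\m g -> Map.insertWith (+) g 1 m) Map.empty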

At the moment I can only connect one stream consumer to the stream source:

    tokens --- trigrams --- countFreq

How do I connect multiple stream consumers to the same stream source? I would like to have something like this:

               .--- unigrams --- countFreq
               |--- bigrams  --- countFreq
    tokens ----|--- trigrams --- countFreq
               '--- ...      --- countFreq

A plus would be to run each consumer in parallel.
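
(An aside for comparison, not part of the question or the answer below: if per-consumer threads are not required, conduit's ZipSink, available in newer conduit releases, can feed every element of one stream to several sinks in a single pass. A rough sketch, reusing the ngram and countFreq from above:)

    import Data.Conduit (Consumer, ZipSink (..), (=$))
    import Data.Map (Map)
    import Data.Traversable (traverse)

    -- Sketch only: ZipSink's Applicative instance broadcasts each token to
    -- every wrapped sink, so all n-gram orders are counted in one pass.
    countAllOrders :: (Ord t, Monad m) => [Int] -> Consumer t m [Map [t] Int]
    countAllOrders orders =
        getZipSink (traverse (\n -> ZipSink (ngram n =$ countFreq)) orders)

It would be connected like any other consumer, e.g. `tokens $$ countAllOrders [1, 2, 3]`, but it does not run the consumers in parallel threads.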




EDIT: Thanks to Petr I came up with this solution:

    spawnMultiple orders = do
        chan <- atomically newBroadcastTMChan

        results <- forM orders $ \_ -> newEmptyMVar
        threads <- forM (zip results orders) $
                            forkIO . uncurry (sink chan)

        forkIO . runResourceT $ sourceFile "test.txt"
                             $$ javascriptTokenizer
                             =$ sinkTMChan chan

        forM results readMVar

        where
            sink chan result n = do
                chan' <- atomically $ dupTMChan chan
                freqs <- runResourceT $ sourceTMChan chan'
                                     $$ ngram n
                                     =$ frequencies
                putMVar result freqs
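
(A hypothetical call, with illustrative orders and an illustrative use of Map.size, would spawn one consumer thread per n-gram order and read all three frequency maps from the same token stream:)

    -- Illustrative usage only: count unigrams, bigrams and trigrams in one run.
    main :: IO ()
    main = do
        [uni, bi, tri] <- spawnMultiple [1, 2, 3]
        print (Map.size tri)   -- number of distinct trigrams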


Solution

I'm assuming you want all your sinks to receive all values.

I'd suggest:
1. Use newBroadcastTMChan from Control.Concurrent.STM.TMChan (stm-chans) to create a new channel.
2. Use this channel to build a sink with sinkTBMChan from Data.Conduit.TMChan (stm-conduit) for your main producer.
3. For each client, use dupTMChan to create its own copy of the channel for reading, and start a new thread that reads this copy using sourceTBMChan.
4. Collect the results from your threads.
5. Be sure your clients can read the data as fast as it is produced, otherwise you can get a heap overflow.

(I haven't tried it, let us know how it works.)
Update: One way to collect the results is to create an MVar for each consumer thread. Each of them would putMVar its result after it has finished, and your main thread would takeMVar on all of these MVars, thus waiting for every thread to finish. For example, if vars is the list of your MVars, the main thread would issue mapM takeMVar vars to collect all the results.
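
(A minimal, self-contained sketch of that collection pattern; the names runWorkers and jobs are illustrative, not from the answer:)

    import Control.Concurrent (forkIO)
    import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
    import Control.Monad (forM)

    -- Run each job in its own thread and block until every result is in.
    runWorkers :: [IO r] -> IO [r]
    runWorkers jobs = do
        vars <- forM jobs $ \job -> do
            var <- newEmptyMVar
            _   <- forkIO (job >>= putMVar var)  -- each worker publishes its result
            return var
        mapM takeMVar vars                       -- wait for all workers to finish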

