Rechunk a conduit into larger chunks using combinators

Problem description

I am trying to construct a Conduit that receives ByteStrings as input (around 1kb per chunk) and produces concatenated ByteStrings of 512kb chunks as output.

This seems like it should be simple to do, but I'm having a lot of trouble: most of the strategies I've tried have only succeeded in dividing the chunks into smaller chunks, and I haven't managed to concatenate them into larger ones.

I started out trying isolate, then takeExactlyE and eventually conduitVector, but to no avail. Eventually I settled on this:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL
import           Control.Monad              (when)
import           Data.Monoid                ((<>))

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where 
    loop buffer n = do
      mchunk <- C.await
      case mchunk of 
        Nothing -> 
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
          then C.yield buffer' >> loop BL.empty chunkSize
          else loop buffer' (n - l)

P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.

However, this seems very verbose given all the conduit functions that deal with chunking[1,2,3,4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!

P.P.S. Is it OK to use a lazy ByteString for the buffer as I've done? I'm a bit unclear about the internal representation of ByteString and whether this will help, especially since I'm using BL.length, which I guess might evaluate the thunk anyway?
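To make the P.P.S. concrete: a lazy ByteString is a linked list of strict chunks, so buffer <> BL.fromStrict chunk links the new chunk in without copying its bytes (it does rebuild the list spine of the left-hand buffer), and BL.length only walks that list summing the chunk lengths. The bytes are copied into one contiguous block only when you ask for it, for example with BL.toStrict. A small sketch using only standard bytestring functions; the name buffer is hypothetical, and it assumes GHC 8.4 or later, where <> is in the Prelude:

```haskell
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL

-- Build a lazy buffer by appending strict chunks one at a time,
-- the same way the question's loop grows its buffer.
buffer :: BL.ByteString
buffer = foldl (\acc c -> acc <> BL.fromStrict c) BL.empty
               (map BC.pack ["hello", " ", "world"])

main :: IO ()
main = do
  -- The original strict chunks are still separate inside the lazy value.
  print (BL.toChunks buffer)
  -- Length (an Int64) is the sum of the chunk lengths; no bytes are copied.
  print (BL.length buffer)
  -- toStrict copies all the bytes into one contiguous strict ByteString.
  print (BL.toStrict buffer)
```

So the lazy buffer itself is cheap to grow; the real cost appears only if the consumer later needs one contiguous strict chunk.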


Conclusion

Just to elaborate on Michael's answer and comments, I ended up with this conduit:

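import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL
import           Data.Conduit               ((=$=))
import           Control.Monad.Base         (MonadBase)
import           Control.Monad.Primitive    (PrimMonad)
import           Data.ByteVector            (fromByteVector)  -- assumed: mono-traversable's Data.ByteVector

-- | "Strict" rechunk of a chunked conduit: vectorBuilder buffers the bytes
-- into vectors of chunkSize elements, and fromByteVector turns each vector
-- back into a strict ByteString
chunksOfE' :: (MonadBase base m, PrimMonad base)
         => Int
         -> C.Conduit B.ByteString m B.ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector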

My understanding is that vectorBuilder will pay the cost for concatenating the smaller chunks early on, producing the aggregated chunks as strict bytestrings.
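For comparison, more recent conduit releases (1.3 and later, as far as I know; check the Haddocks for your version) include a combinator Data.Conduit.Combinators.chunksOfE that performs a strict rechunk directly: it concatenates small inputs and splits large ones into strict chunks of exactly the requested size, with a possibly shorter final chunk. It is a different function from the lazy chunksOfE defined in this conclusion. A sketch under that version assumption, where rechunkStrict and rechunked are hypothetical names:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import Data.Conduit (ConduitT, runConduitPure, (.|))
import qualified Data.Conduit.Combinators as C

-- | Hypothetical wrapper, assuming conduit >= 1.3: regroup strict input
-- chunks into strict chunks of exactly chunkSize bytes (the last may be
-- shorter), e.g. rechunkStrict (512 * 1024) for 512kb output chunks.
rechunkStrict :: Monad m => Int -> ConduitT ByteString ByteString m ()
rechunkStrict = C.chunksOfE

-- Run the conduit purely over a few small chunks and collect the output.
rechunked :: [ByteString]
rechunked =
  runConduitPure $
    C.yieldMany ["hello", "there", "world!"] .| rechunkStrict 4 .| C.sinkList

main :: IO ()
main = mapM_ print rechunked
```

Unlike the question's loop, this splits oversized input chunks, so every output chunk except the last has exactly the requested size.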

From what I can tell, an alternative implementation that produces lazy bytestring chunks (i.e. "chunked chunks") might be desirable when the aggregated chunks are very large and/or feed into a naturally streaming interface like a network socket. Here's my best attempt at the "lazy bytestring" version:

import qualified Data.Sequences.Lazy        as SL
import qualified Data.Sequences             as S
import qualified Data.Conduit.List          as CL
import           Data.Conduit               ((=$=))

-- | "Lazy" rechunk of a chunked conduit: repeatedly take chunkSize elements
-- of the strict input and collect them into one lazy value
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index strict
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence (C.takeE chunkSize =$= C.sinkLazy)

Solution

How about this?

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
    loop
  where
    loop = do
        lbs <- takeCE chunkSize =$= sinkLazy
        unless (null lbs) $ do
            yield lbs
            loop

main :: IO ()
main =
    yieldMany ["hello", "there", "world!"]
        $$ chunksOfAtLeast 3
        =$ mapM_C print
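The same answer written in the newer conduit style (ConduitT, .| and runConduit, as used in conduit 1.3 and later) might look like the sketch below; it is a port under that version assumption, and the demo list is a hypothetical addition so the result can be inspected purely. Note that despite the name, takeE counts bytes across input boundaries and splits chunks via leftovers, so every output except possibly the last has exactly chunkSize bytes.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (unless)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.Combinators as C

-- | The answer's loop, assuming conduit >= 1.3: take up to chunkSize bytes
-- of strict input, collect them as one lazy ByteString, and repeat until
-- an empty result shows the input is exhausted.
chunksOfAtLeast :: Monad m => Int -> ConduitT ByteString BL.ByteString m ()
chunksOfAtLeast chunkSize = loop
  where
    loop = do
      lbs <- C.takeE chunkSize .| C.sinkLazy
      unless (BL.null lbs) $ do
        yield lbs
        loop

-- Hypothetical demo: run the pipeline purely on the answer's sample input.
demo :: [BL.ByteString]
demo =
  runConduitPure $
    C.yieldMany ["hello", "there", "world!"] .| chunksOfAtLeast 3 .| C.sinkList

main :: IO ()
main = mapM_ print demo
```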

There are lots of other approaches that you could take depending on your goals. If you wanted to have a strict buffer, then using blaze-builder or vectorBuilder would make a lot of sense. But this keeps the same type signature you have already.
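As one illustration of the strict-buffer route, here is a sketch that keeps the question's semantics (whole input chunks, never split) but accumulates them in a Data.ByteString.Builder, the bytestring package's own builder, which largely supersedes blaze-builder, and materializes each group as a single strict ByteString. It assumes conduit 1.3 or later; chunksOfAtLeastStrict and demo are hypothetical names.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (when)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as BL
import Data.Conduit (ConduitT, await, runConduitPure, yield, (.|))
import qualified Data.Conduit.Combinators as C

-- | Hypothetical strict variant of the question's loop: buffer whole input
-- chunks in a Builder and emit one strict ByteString as soon as at least
-- chunkSize bytes are buffered. Input chunks are never split.
chunksOfAtLeastStrict :: Monad m => Int -> ConduitT ByteString ByteString m ()
chunksOfAtLeastStrict chunkSize = loop mempty 0
  where
    -- Turn the buffered Builder into one contiguous strict ByteString.
    flush = BL.toStrict . BB.toLazyByteString

    loop buf len = do
      mchunk <- await
      case mchunk of
        -- End of input: emit the partial buffer if it holds any bytes.
        Nothing -> when (len > 0) (yield (flush buf))
        Just chunk -> do
          let buf' = buf <> BB.byteString chunk
              len' = len + B.length chunk
          if len' >= chunkSize
            then yield (flush buf') >> loop mempty 0
            else loop buf' len'

demo :: [ByteString]
demo =
  runConduitPure $
    C.yieldMany ["abc", "de", "fgh", "ij"] .| chunksOfAtLeastStrict 6 .| C.sinkList

main :: IO ()
main = mapM_ print demo
```

Tracking the byte count separately avoids asking the Builder for its length, which it cannot report without being run.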
