Rechunk a conduit into larger chunks using combinators
Problem description
I am trying to construct a Conduit that receives ByteStrings as input (around 1 KB per chunk) and produces concatenated ByteStrings in 512 KB chunks as output.
This seems like it should be simple to do, but I'm having a lot of trouble. Most of the strategies I've tried have only succeeded in dividing the chunks into smaller chunks; I haven't succeeded in concatenating them into larger ones.
I started out trying isolate, then takeExactlyE and eventually conduitVector, but to no avail. Eventually I settled on this:
import Control.Monad (when)
import Data.Monoid ((<>))
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where
    -- n is the number of bytes still needed to fill the current buffer
    loop buffer n = do
      mchunk <- C.await
      case mchunk of
        Nothing ->
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l = B.length chunk
          if n <= l
            then C.yield buffer' >> loop BL.empty chunkSize
            else loop buffer' (n - l)
P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.
However, this seems very verbose given all the conduit functions that deal with chunking[1,2,3,4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!
P.P.S. Is it OK to use a lazy bytestring for the buffer as I've done? I'm a bit unclear about the internal representation of bytestrings and whether this will help, especially since I'm using BL.length, which I guess might evaluate the thunk anyway?
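To make the representation question concrete, here is a minimal sketch using only the bytestring package. It illustrates that a lazy ByteString is essentially a list of strict chunks, so appending links chunks rather than copying bytes, and BL.length only walks that list. The names smallChunks, buffer, bufferChunks, bufferLength and flattened are made up for this illustration and do not come from the question.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL

-- Two small strict chunks, standing in for the ~1 KB inputs.
smallChunks :: [B.ByteString]
smallChunks = [BC.pack "hello", BC.pack "world"]

-- Appending with (<>) links the chunks together; the bytes themselves are
-- not copied into one contiguous array.
buffer :: BL.ByteString
buffer = BL.fromStrict (smallChunks !! 0) <> BL.fromStrict (smallChunks !! 1)

-- The original strict chunks are still visible inside the lazy value.
bufferChunks :: [B.ByteString]
bufferChunks = BL.toChunks buffer

-- BL.length walks the chunk list and sums the chunk lengths.
bufferLength :: Int
bufferLength = fromIntegral (BL.length buffer)

-- BL.toStrict is the step that copies everything into one contiguous buffer.
flattened :: B.ByteString
flattened = BL.toStrict buffer
```

Here bufferChunks equals smallChunks and bufferLength is 10. One caveat, as far as I can tell: each buffer <> chunk rebuilds the spine of the chunk list on the left, so building a buffer from many tiny chunks this way does work proportional to the number of chunks already buffered on every append.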
Conclusion
Just to elaborate on Michael's answer and comments, I ended up with this conduit:
import Control.Monad.Base (MonadBase)
import Control.Monad.Primitive (PrimMonad)
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

-- | "Strict" rechunk of a chunked conduit
-- (fromByteVector is assumed to be in scope; none of the imports above provide it)
chunksOfE' :: (MonadBase base m, PrimMonad base)
           => Int
           -> C.Conduit B.ByteString m B.ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E C.=$= C.map fromByteVector
My understanding is that vectorBuilder will pay the cost of concatenating the smaller chunks early on, producing the aggregated chunks as strict bytestrings.
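As a rough illustration of what "paying the concatenation cost early" means, here is a pure, list-based sketch that uses only the bytestring package. It is not how vectorBuilder works internally; rechunkStrict is a hypothetical helper that models the same input/output behaviour (strict chunks in, strict chunks of a fixed size out) and assumes a positive chunk size.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

-- | Hypothetical helper: regroup strict chunks into strict chunks of exactly
-- n bytes (the last one may be shorter). Assumes n > 0.
rechunkStrict :: Int -> [B.ByteString] -> [B.ByteString]
rechunkStrict n = go . BL.fromChunks
  where
    go lbs
      | BL.null lbs = []
      | otherwise =
          let (front, rest) = BL.splitAt (fromIntegral n) lbs
          -- BL.toStrict copies the pieces of front into one contiguous
          -- buffer: this is the concatenation cost, paid per output chunk.
          in BL.toStrict front : go rest
```

For example, rechunking the byte groups [1,2], [3,4,5], [6], [7,8,9,10] with n = 4 gives [1,2,3,4], [5,6,7,8], [9,10]. Every output chunk is a single contiguous buffer, which is convenient for consumers that want flat memory but means each byte is copied once more.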
From what I can tell, an alternative implementation that produces lazy bytestring chunks (i.e. "chunked chunks") might be desirable when the aggregated chunks are very large and/or feed into a naturally streaming interface like a network socket. Here's my best attempt at the "lazy bytestring" version:
import qualified Data.Sequences.Lazy as SL
import qualified Data.Sequences as S
import qualified Data.Conduit.List as CL

-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index lazy
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy C.=$= C.takeE chunkSize
How about this?
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
import ClassyPrelude.Conduit

chunksOfAtLeast :: Monad m => Int -> Conduit ByteString m LByteString
chunksOfAtLeast chunkSize =
    loop
  where
    loop = do
        lbs <- takeCE chunkSize =$= sinkLazy
        unless (null lbs) $ do
            yield lbs
            loop

main :: IO ()
main =
    yieldMany ["hello", "there", "world!"]
        $$ chunksOfAtLeast 3
        =$ mapM_C print
There are lots of other approaches that you could take depending on your goals. If you wanted to have a strict buffer, then using blaze-builder or vectorBuilder would make a lot of sense. But this keeps the same type signature you have already.
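To see what the lazy output of such a loop looks like, here is a pure sketch that needs only the bytestring package. It is a model, not the conduit code itself: rechunkLazy and demo are hypothetical names, and it assumes that takeCE passes exactly chunkSize bytes downstream before sinkLazy collects them, so each piece is cut at exactly n bytes rather than at "at least" n.

```haskell
import Data.Int (Int64)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL

-- | Hypothetical pure model of the loop above: cut the input stream into
-- lazy pieces of exactly n bytes (the last may be shorter). Assumes n > 0.
rechunkLazy :: Int64 -> [B.ByteString] -> [BL.ByteString]
rechunkLazy n = go . BL.fromChunks
  where
    go lbs
      | BL.null lbs = []
      | otherwise =
          -- BL.splitAt slices the existing chunks; nothing is concatenated.
          let (front, rest) = BL.splitAt n lbs
          in front : go rest

-- The same input as in main above.
demo :: [BL.ByteString]
demo = rechunkLazy 3 (map BC.pack ["hello", "there", "world!"])

-- map BL.toChunks demo
--   == [["hel"], ["lo","t"], ["her"], ["e","wo"], ["rld"], ["!"]]
```

Each output is a lazy bytestring whose internal chunks are slices of the original inputs, so a piece that straddles an input boundary (such as "lot") is made of two strict chunks. That is the "chunked chunks" trade-off: no copying up front, at the price of non-contiguous output.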