Streaming xml-conduit parse results
Problem description
I want to use xml-conduit, specifically Text.XML.Stream.Parse, in order to lazily extract a list of objects from a large XML file.
As a test case, I use the recently re-released StackOverflow data dumps. To keep it simple, I intend to extract all usernames from stackoverflow.com-Users.7z. Even though the file is named .7z, file says it is just bzip2-compressed data (there might be some 7zip stuff at the end of the file, but right now I don't care).
A simplified version of the XML would be:

    <users>
        <row id="1" DisplayName="StackOverflow"/>
        ...
        <row id="2597135" DisplayName="Uli Köhler"/>
        ...
    </users>
Based on this previous Q&A and the example on Hackage, stream-reading the example XML in bz2-compressed form works perfectly for me.
However, when using runghc to run the following program, it runs without printing any output:

    {-# LANGUAGE OverloadedStrings #-}
    import Data.Conduit (runResourceT, ($$), ($=))
    import qualified Data.Conduit.Binary as CB
    import Data.Conduit.BZlib
    import Data.Conduit
    import Data.Text (Text)
    import System.IO
    import Text.XML.Stream.Parse
    import Control.Applicative ((<*))

    data User = User {name :: Text} deriving (Show)

    parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
        return $ User displayName

    parseUsers = tagNoAttr "users" $ many parseUserRow

    main = do
        users <- runResourceT $ CB.sourceFile "stackoverflow.com-Users.7z" $= bunzip2 $= parseBytes def $$ force "users required" parseUsers
        putStrLn $ unlines $ map show users
I assume this issue occurs because Haskell tries to deeply evaluate the users list before starting to print it. This theory is supported by the program's memory usage, which grows continually by about 2 percent per second (source: htop).
How can I "live-stream" the results to stdout? I assume this is possible by adding another conduit statement like $$ CB.sinkFile "output.txt" at the end. This specific version, however, expects a Conduit output of ByteString. Could you point me in the right direction from here?
Any help will be appreciated!
Let me start by saying that the streaming helper API in xml-conduit has not been worked on in years, and could probably benefit from a reimagining given the changes that have happened to conduit in the interim. I think there are likely much better ways to accomplish things.
That said, let me explain the problem you're seeing. The many function creates a list of results, and will not produce any values until it has finished processing. In your case, there are so many values that this appears to never happen. Ultimately, when the entire file has been read, the entire list of users will be displayed at once. But that's clearly not the behavior you're looking for.
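To make that delay concrete, here is a small library-free sketch of an accumulating loop in the style of many. It is only an analogy, not xml-conduit's actual implementation: nextItem and manyIO are hypothetical names made up for this illustration, and the two user names are placeholder data.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Hypothetical stand-in for a single parser step: pops the next item from a
-- queue and returns Nothing once the input is exhausted.
nextItem :: IORef [a] -> IO (Maybe a)
nextItem ref = atomicModifyIORef' ref step
  where
    step []       = ([], Nothing)
    step (x : xs) = (xs, Just x)

-- Accumulating loop in the style of `many` (an assumption-laden analogy):
-- the list is handed back only after the step has reported Nothing, so the
-- caller cannot see a single element before the input has ended.
manyIO :: IO (Maybe a) -> IO [a]
manyIO step = go []
  where
    go acc = do
        mx <- step
        case mx of
            Nothing -> return (reverse acc)
            Just x  -> go (x : acc)

main :: IO ()
main = do
    ref <- newIORef ["alice", "bob"]   -- placeholder input
    names <- manyIO (nextItem ref)     -- returns only when the queue is empty
    mapM_ putStrLn names
```

With an input that never ends, or one that is merely very large, manyIO keeps growing its accumulator and the printing line is reached late or not at all, which matches the steadily rising memory usage described in the question.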
Instead, what you want is a stream of User values which are produced as soon as they're ready. Basically, you replace the many function call with a new function which will yield a result each time one is parsed. A simple implementation of this could be:

    yieldWhileJust :: Monad m
                   => ConduitM a b m (Maybe b)
                   -> Conduit a m b
    yieldWhileJust consumer =
        loop
      where
        loop = do
            mx <- consumer
            case mx of
                Nothing -> return ()
                Just x -> yield x >> loop
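To try the helper without any XML involved, the following sketch runs it over a short list of numbers. It assumes a newer conduit release (1.3 or later) and therefore uses ConduitT, runConduit and .| instead of the older type synonyms and operators shown in this answer. doubleNext is a hypothetical consumer invented for the demonstration.

```haskell
import Data.Conduit (ConduitT, await, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL

-- Same loop as above, with the signature written against ConduitT
-- (assumption: conduit >= 1.3).
yieldWhileJust :: Monad m => ConduitT a b m (Maybe b) -> ConduitT a b m ()
yieldWhileJust consumer = loop
  where
    loop = do
        mx <- consumer
        case mx of
            Nothing -> return ()         -- consumer found nothing more: stop
            Just x  -> yield x >> loop   -- pass the value downstream, then repeat

-- Hypothetical consumer: takes one Int from upstream and doubles it,
-- or returns Nothing when upstream is exhausted.
doubleNext :: Monad m => ConduitT Int o m (Maybe Int)
doubleNext = fmap (fmap (* 2)) await

main :: IO ()
main = runConduit $
    CL.sourceList [1, 2, 3]
        .| yieldWhileJust doubleNext
        .| CL.mapM_ print
```

Each doubled value is handed to the printing stage as soon as doubleNext returns it, so this run prints 2, 4 and 6 on separate lines without first building a list.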
Also, instead of using putStrLn $ unlines $ map show, you want to attach the entire pipeline to a consumer which will print each individually yielded User value. This can be implemented easily with Data.Conduit.List.mapM_, e.g. CL.mapM_ (liftIO . print).
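If a ByteString sink is preferred, as the question suggests with CB.sinkFile, each User can be rendered to bytes before it reaches the sink. The sketch below is one possible way to do that, not something taken from the original code: renderUser is a hypothetical helper, the source is a fixed two-element list rather than the XML parser, and it assumes conduit 1.3 or later together with conduit-extra for Data.Conduit.Binary.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString (ByteString)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Text (Text)
import Data.Text.Encoding (encodeUtf8)
import System.IO (stdout)

data User = User {name :: Text} deriving (Show)

-- Hypothetical helper: one user per line, encoded as UTF-8.
renderUser :: User -> ByteString
renderUser u = encodeUtf8 (name u) <> "\n"

main :: IO ()
main = runConduit $
    CL.sourceList [User "StackOverflow", User "Uli Köhler"]  -- stand-in for the parser
        .| CL.map renderUser        -- User -> ByteString, one element at a time
        .| CB.sinkHandle stdout     -- write each chunk as it arrives
```

A file sink such as CB.sinkFile should fit in the same position, although it needs a monad with resource handling (for example via runResourceT), which this sketch leaves out.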
I've put together a full example based on your code. The input is an artificially generated infinite XML file, just to prove the point that it really is yielding output immediately.

    {-# LANGUAGE OverloadedStrings #-}
    {-# LANGUAGE RankNTypes #-}
    import Control.Applicative ((<*))
    import Control.Concurrent (threadDelay)
    import Control.Monad (forever, void)
    import Control.Monad.IO.Class (MonadIO (liftIO))
    import Data.ByteString (ByteString)
    import Data.Conduit
    import qualified Data.Conduit.List as CL
    import Data.Text (Text)
    import Data.Text.Encoding (encodeUtf8)
    import Data.XML.Types (Event)
    import Text.XML.Stream.Parse

    -- instead of actually including a large input data file, just for testing purposes
    infiniteInput :: MonadIO m => Source m ByteString
    infiniteInput = do
        yield "<users>"
        forever $ do
            yield $ encodeUtf8
                "<row id=\"1\" DisplayName=\"StackOverflow\"/><row id=\"2597135\" DisplayName=\"Uli Köhler\"/>"
            liftIO $ threadDelay 1000000
        --yield "</users>" -- will never be reached

    data User = User {name :: Text} deriving (Show)

    parseUserRow :: MonadThrow m => Consumer Event m (Maybe User)
    parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do
        return $ User displayName

    parseUsers :: MonadThrow m => Conduit Event m User
    parseUsers = void $ tagNoAttr "users" $ yieldWhileJust parseUserRow

    yieldWhileJust :: Monad m
                   => ConduitM a b m (Maybe b)
                   -> Conduit a m b
    yieldWhileJust consumer =
        loop
      where
        loop = do
            mx <- consumer
            case mx of
                Nothing -> return ()
                Just x -> yield x >> loop

    main :: IO ()
    main = infiniteInput
        $$ parseBytes def
        =$ parseUsers
        =$ CL.mapM_ print