Why does my modified (Real World Haskell) MapReduce implementation fail with "Too many open files"?

Question

I am implementing a Haskell program which compares each line of a file with every other line in the file. For simplicity, let's assume the data structure represented by one line is just an Int, and my algorithm is the squared distance. I would implement this as follows:

--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Int->Int-> [Int]
          infiniteList i j = (i + j) : infiniteList j (i+j)
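
For reference, the small-file version can be exercised end to end like this; the main driver and the "test.txt" path are hypothetical and not part of the original code:

--Hypothetical driver, only to show how the pieces above fit together;
--"test.txt" is an assumed scratch file.
main :: IO ()
main = do
    let path = "test.txt"
    createTestFile 2000 path
    result <- sumOfDistancesOnSmallFile path
    print result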

Unfortunately, the complete file will be kept in memory. To prevent possible out-of-memory exceptions on very big files, I would like to seek the file cursor back to the beginning of the file at each recursion of 'allDistances'.

In the book "Real World Haskell" an implementation of MapReduce is given, with a function to split a file into chunks (chapter 24, available here). I modified the chunking function so that, instead of dividing the complete file into chunks, it returns as many chunks as there are lines, with each chunk representing one element of

tails . lines . readFile
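
As a small illustration of what each chunk is meant to contain, here is an in-memory sketch (chunksOfSmallFile is a hypothetical helper for illustration only, not part of the program): each element of the result is one line of the file together with every line that follows it.

import Data.List (tails)

--Hypothetical illustration only: one element per line of the file, each
--holding that line plus all later lines (and a final empty list).
chunksOfSmallFile :: FilePath -> IO [[String]]
chunksOfSmallFile path = (tails . lines) `fmap` readFile path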

The full implementation is (plus the previous code region):

import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO

--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails (distancesUsingMapReduce) path

distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
                                rpar combineDistances
              where lexer :: Lazy.ByteString -> [Int]
                    lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)

distancesOneToMany :: Int -> [Int] -> Int
distancesOneToMany one many = combineDistances $ map (distance one) many

distancesFirstToTail :: [Int] -> Int
distancesFirstToTail s = 
              if not (null s)
              then distancesOneToMany (head s) (tail s)
              else 0
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
      -> (a -> b)   -- map function
      -> Strategy c -- evaluation strategy for reduction
      -> ([b] -> c) -- reduce function
      -> [a]        -- list to map over
      -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
      mapResult `pseq` reduceResult
      where mapResult    = parMap mapStrat mapFunc input
            reduceResult = reduceFunc mapResult `using` reduceStrat


--Working with (file)chunks:
data ChunkSpec = CS{
    chunkOffset :: !Int64
    , chunkLength :: !Int64
    } deriving (Eq,Show)

chunkedFileOperation ::   (NFData a)=>
            (FilePath-> IO [ChunkSpec])
       ->   ([Lazy.ByteString]-> a)
       ->   FilePath
       ->   IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedRead chunkCreator path
    let r = funcOnChunks chunks
    (rdeepseq r `seq` return r) `finally` mapM_ hClose handles

chunkedRead ::  (FilePath -> IO [ChunkSpec])
        ->  FilePath
        ->  IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
        return (chunk, h)

-- returns the set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO [ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h -> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
                let newOffset = offset + chunkSize
                hSeek h AbsoluteSeek (fromIntegral newOffset)
                let findNewline lineSeekOffset = do
                        eof <- hIsEOF h
                        if eof
                            then return [CS offset (totalSize - offset)]
                            else do
                                bytes <- Lazy.hGet h 4096
                                case Lazy.elemIndex '\n' bytes of
                                    Just n -> do
                                        nextChunks <- findChunks (lineSeekOffset + n + 1)
                                        return (CS offset (totalSize - offset) : nextChunks)
                                    Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
                findNewline newOffset
        findChunks 0

Unfortunately, on a bigger file (for example 2000 lines) the mapreduce version throws an exception:
* Exception: getCurrentDirectory: resource exhausted (Too many open files)

I'm a bit ashamed not to be able to debug the program myself, but I only know how to debug Java/C# code. And I also don't know how the file chunking and reading could be properly tested. I expect the problem not to be part of the mapReduce function itself, as a similar version without mapReduce also throws an exception. In that attempt I had chunkedFileOperation accept both the operation for one chunk and the 'reduce' function, which it applied directly.

Btw, I'm running
HaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (Snow Leopard)
with the following packages:
bytestring 0.9.1.10
parallel 3.1.0.1
and I qualify as a self-educated beginner/fresh Haskell programmer.

Answer

You're using lazy IO, so those files opened with readFile aren't being closed in a timely fashion. You'll need to think of a solution that explicitly closes the files regularly (e.g. via strict IO, or iteratee IO).
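
For example, here is a minimal sketch of the strict-IO direction (not from the answer itself; readChunkStrictly is a hypothetical name, and it assumes the same ChunkSpec type as in the question): each chunk is read in full with a strict hGet, so the handle can be closed before the next chunk is touched, instead of being kept open by lazy hGetContents.

import qualified Data.ByteString.Char8 as Strict
import Control.Exception (bracket)
import System.IO

--Hypothetical strict variant of the per-chunk read: the whole chunk is
--read into memory immediately, so hClose runs before the function returns.
readChunkStrictly :: FilePath -> ChunkSpec -> IO Strict.ByteString
readChunkStrictly path spec =
    bracket (openFile path ReadMode) hClose $ \h -> do
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        Strict.hGet h (fromIntegral (chunkLength spec))

With something like this, at most one handle is open at a time, at the cost of forcing each chunk into memory up front.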
