Why does my modified (Real World Haskell) MapReduce implementation fail with "Too many open files"?


Problem description


I am implementing a Haskell program which compares each line of a file with every other line in the file. For simplicity, let's assume the data structure represented by one line is just an Int, and my algorithm is the squared distance. I would implement this as follows:

--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Int->Int-> [Int]
          infiniteList i j = (i + j) : infiniteList j (i+j)
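
For example, these two helpers can be exercised together like this (a small sketch; the file name is just a placeholder I made up):

--Hypothetical usage of the helpers above (the file name is a placeholder)
main :: IO ()
main = do
    createTestFile 100 "testfile.txt"
    result <- sumOfDistancesOnSmallFile "testfile.txt"
    print result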




Unfortunately, the complete file will be kept in memory. To prevent possible out-of-memory exceptions on very big files, I would like to seek the file cursor back to the beginning of the file at each recursion of 'allDistances'.


In the book "Real World Haskell" an implementation is given of mapreduce, with a function to split a file in chunks (chapter 24, available here). I modified the chunking function to, instead of dividing the complete file in chunks, return as many chunks as lines with each chunk representing one element of

tails . lines . readFile
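
To illustrate what each chunk should correspond to, here is a small in-memory sketch using Data.List.tails (my own illustration on a made-up input, not part of the chunked implementation):

import Data.List (tails)

--Each element of (tails . lines) pairs one line with all lines after it;
--note that tails also yields a trailing empty list.
exampleTails :: [[String]]
exampleTails = tails (lines "1\n2\n3")
-- == [["1","2","3"],["2","3"],["3"],[]]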


The full implementation (in addition to the previous code region) is:

import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO

--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails (distancesUsingMapReduce) path

distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
                                rpar combineDistances
              where lexer :: Lazy.ByteString -> [Int]
                    lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)

distancesOneToMany :: Int -> [Int] -> Int
distancesOneToMany one many = combineDistances $ map (distance one) many

distancesFirstToTail :: [Int] -> Int
distancesFirstToTail s = 
              if not (null s)
              then distancesOneToMany (head s) (tail s)
              else 0
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
      -> (a -> b)   -- map function
      -> Strategy c -- evaluation strategy for reduction
      -> ([b] -> c) -- reduce function
      -> [a]        -- list to map over
      -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
      mapResult `pseq` reduceResult
      where mapResult    = parMap mapStrat mapFunc input
            reduceResult = reduceFunc mapResult `using` reduceStrat


--Working with (file)chunks:
data ChunkSpec = CS{
    chunkOffset :: !Int64
    , chunkLength :: !Int64
    } deriving (Eq,Show)

chunkedFileOperation :: (NFData a) =>
        (FilePath -> IO [ChunkSpec])
     -> ([Lazy.ByteString] -> a)
     -> FilePath
     -> IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedRead chunkCreator path
    let r = funcOnChunks chunks
    (rdeepseq r `seq` return r) `finally` mapM_ hClose handles

chunkedRead :: (FilePath -> IO [ChunkSpec])
            -> FilePath
            -> IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
        return (chunk, h)

-- returns the set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO [ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h -> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
                let newOffset = offset + chunkSize
                hSeek h AbsoluteSeek (fromIntegral newOffset)
                let findNewline lineSeekOffset = do
                        eof <- hIsEOF h
                        if eof
                            then return [CS offset (totalSize - offset)]
                            else do
                                bytes <- Lazy.hGet h 4096
                                case Lazy.elemIndex '\n' bytes of
                                    Just n -> do
                                        nextChunks <- findChunks (lineSeekOffset + n + 1)
                                        return (CS offset (totalSize - offset) : nextChunks)
                                    Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
                findNewline newOffset
        findChunks 0

Unfortunately, on a bigger file (for example 2000 lines) the MapReduce version throws an exception:
* Exception: getCurrentDirectory: resource exhausted (Too many open files)


I'm a bit ashamed that I cannot debug the program myself, but I only know how to debug Java/C# code. I also don't know how the file chunking and reading could be properly tested. I don't expect the problem to be in the mapReduce function itself, as a similar version without mapReduce also throws the exception. In that attempt I had chunkedFileOperation accept both the operation for one chunk and the 'reduce' function, which it applied directly.
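
One possible sanity check (a hypothetical helper built only from the definitions above, not something in the program) would be to print the ChunkSpecs produced for a small generated file and compare the offsets and lengths against the actual line boundaries by eye:

--Hypothetical sanity check: generate a small test file and print the
--ChunkSpecs that chunkByLinesTails produces for it.
inspectChunks :: FilePath -> IO ()
inspectChunks path = do
    createTestFile 5 path
    specs <- chunkByLinesTails path
    mapM_ print specs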


Btw, I'm running
HaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (Snow Leopard)
with the following packages:
bytestring 0.9.1.10
parallel 3.1.0.1
and I qualify as a self-educated beginner/fresh Haskell programmer.

Recommended answer


You're using lazy IO, so those files opened with readFile aren't being closed in a timely fashion. You'll need to think of a solution that explicitly closes the files regularly (e.g. via strict IO, or iteratee IO).
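
For example, one direction (a minimal sketch assuming the ChunkSpec type above, using strict bytestring IO rather than iteratees) is to read each chunk strictly and close its handle right away, instead of keeping a lazy hGetContents stream and its handle alive until the result is finally demanded:

import qualified Data.ByteString.Char8 as Strict
import Control.Exception (bracket)
import System.IO

--Sketch: read one chunk strictly, so the handle can be closed immediately
--instead of staying open until a lazy ByteString is fully consumed.
readChunkStrictly :: FilePath -> ChunkSpec -> IO Strict.ByteString
readChunkStrictly path spec =
    bracket (openFile path ReadMode) hClose $ \h -> do
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        Strict.hGet h (fromIntegral (chunkLength spec))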
