How to read huge CSV file in Mule

Problem Description

I am using Mule Studio 3.4.0 Community Edition. I have a big problem with how to parse a large CSV file arriving on a File Endpoint. The scenario is that I have 3 CSV files and I would like to put their content into a database. But when I try to load a huge file (about 144 MB) I get an "OutOfMemory" exception. The solution I thought of is to divide/split the large CSV into smaller CSVs (I don't know if this solution is the best), or to try to find a way to process the CSV without throwing an exception.

<file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>

<flow name="CsvToFile" doc:name="CsvToFile">
        <file:inbound-endpoint path="src/main/resources/inbox" moveToDirectory="src/main/resources/processed"  responseTimeout="10000" doc:name="CSV" connector-ref="File">
            <file:filename-wildcard-filter pattern="*.csv" caseSensitive="true"/>
        </file:inbound-endpoint>
        <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property"/>
        <choice doc:name="Choice">
            <when expression="INVOCATION:nome_file=azienda" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/companies-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Azienda"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertAziende" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Azienda">
                    <jdbc-ee:query key="InsertAziende" value="INSERT INTO aw006_azienda VALUES (#[map-payload:AW006_ID], #[map-payload:AW006_ID_CLIENTE], #[map-payload:AW006_RAGIONE_SOCIALE])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=servizi" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/services-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Servizi"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertServizi" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Servizi">
                    <jdbc-ee:query key="InsertServizi" value="INSERT INTO ctrl_aemd_unb_servizi VALUES (#[map-payload:CTRL_ID_TIPO_OPERAZIONE], #[map-payload:CTRL_DESCRIZIONE], #[map-payload:CTRL_COD_SERVIZIO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
            <when expression="INVOCATION:nome_file=richiesta" evaluator="header">
                <jdbc-ee:csv-to-maps-transformer delimiter="," mappingFile="src/main/resources/requests-csv-format.xml" ignoreFirstRecord="true" doc:name="CSV2Richiesta"/>
                <jdbc-ee:outbound-endpoint exchange-pattern="one-way" queryKey="InsertRichieste" queryTimeout="-1" connector-ref="jdbcConnector" doc:name="Database Richiesta">
                    <jdbc-ee:query key="InsertRichieste" value="INSERT INTO ctrl_aemd_unb_richiesta VALUES (#[map-payload:CTRL_ID_CONTROLLER], #[map-payload:CTRL_NUM_RICH_VENDITORE], #[map-payload:CTRL_VENDITORE], #[map-payload:CTRL_CANALE_VENDITORE], #[map-payload:CTRL_CODICE_SERVIZIO], #[map-payload:CTRL_STATO_AVANZ_SERVIZIO], #[map-payload:CTRL_DATA_INSERIMENTO])"/>
                </jdbc-ee:outbound-endpoint>
            </when>
        </choice>   
    </flow>

Please, I do not know how to fix this problem. Thanks in advance for any kind of help.

Recommended Answer

As SteveS said, the csv-to-maps-transformer might try to load the entire file into memory before processing it. What you can try to do is split the CSV file into smaller parts and send those parts to a VM endpoint to be processed individually. First, create a component to achieve this first step:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.mule.api.MuleEventContext;
import org.mule.api.client.MuleClient;
import org.mule.api.lifecycle.Callable;

public class CSVReader implements Callable {

    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        // With streaming enabled on the file connector, the payload arrives as an InputStream
        InputStream fileStream = (InputStream) eventContext.getMessage().getPayload();
        BufferedReader br = new BufferedReader(new InputStreamReader(fileStream));

        MuleClient muleClient = eventContext.getMuleContext().getClient();

        try {
            // Read the file line by line and dispatch each line to the VM queue,
            // so the whole file is never held in memory at once
            String line;
            while ((line = br.readLine()) != null) {
                muleClient.dispatch("vm://in", line, null);
            }
        } finally {
            br.close();
        }
        return null;
    }
}
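
Note that muleClient.dispatch(...) creates a brand-new message, so the nome_file invocation property set by AddMessageProperty in the first flow does not travel with the dispatched lines. If the routing in the second flow still needs it, one possible adjustment (a sketch, not part of the original answer) is to forward it as an outbound property on each dispatch and read it with the INBOUND header evaluator on the receiving side:

// Sketch only: forward the "nome_file" property with every dispatched line.
// Assumes java.util.HashMap and java.util.Map are imported in CSVReader.
Map<String, Object> props = new HashMap<String, Object>();
props.put("nome_file", eventContext.getMessage().getInvocationProperty("nome_file"));

String line;
while ((line = br.readLine()) != null) {
    // Outbound properties become inbound properties after the VM hop, so the
    // choice in the second flow would test INBOUND:nome_file instead of INVOCATION:nome_file
    muleClient.dispatch("vm://in", line, props);
}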

Then, split your main flow in two:

<file:connector name="File" 
    workDirectory="yourWorkDirPath" autoDelete="false" streaming="true"/>

<flow name="CsvToFile" doc:name="Split and dispatch">
    <file:inbound-endpoint path="inboxPath"
        moveToDirectory="processedPath" pollingFrequency="60000"
        doc:name="CSV" connector-ref="File">
        <file:filename-wildcard-filter pattern="*.csv"
            caseSensitive="true" />
    </file:inbound-endpoint>
    <component class="it.aizoon.grpBuyer.AddMessageProperty" doc:name="Add Message Property" />
    <component class="com.dgonza.CSVReader" doc:name="Split the file and dispatch every line to VM" />
</flow>

<flow name="storeInDatabase" doc:name="receive lines and store in database">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="in" doc:name="VM" />
    <choice>
        .
        .
        Your JDBC Stuff
        .
        .
    </choice>
</flow>
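
For a file in the 144 MB range, dispatching one VM message per line creates a very large number of messages. As an optional variation (an illustrative sketch, not from the original answer), the read loop in CSVReader could accumulate lines and dispatch them in chunks, with the receiving flow splitting each chunk (for example with a collection-splitter) before the per-row processing:

// Illustrative variant of the read loop in CSVReader: dispatch lines in batches
// to reduce per-message overhead (the batch size of 500 is an arbitrary choice).
// Assumes java.util.ArrayList and java.util.List are imported.
List<String> batch = new ArrayList<String>();
String line;
while ((line = br.readLine()) != null) {
    batch.add(line);
    if (batch.size() == 500) {
        muleClient.dispatch("vm://in", new ArrayList<String>(batch), null);
        batch.clear();
    }
}
if (!batch.isEmpty()) {
    // Flush any remaining lines that did not fill a complete batch
    muleClient.dispatch("vm://in", batch, null);
}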

Maintain your current file-connector configuration so that streaming stays enabled. With this solution the CSV data can be processed without the need to load the entire file into memory first. HTH
