当高吞吐量(3GB/s)文件系统可用时,如何在 Java 中使用多线程读取文件 [英] How to read a file using multiple threads in Java when a high throughput(3GB/s) file system is available

查看:17
本文介绍了当高吞吐量(3GB/s)文件系统可用时,如何在 Java 中使用多线程读取文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我知道对于普通的主轴驱动系统,使用多线程读取文件效率低下.

I understand that for a normal Spindle Drive system, reading files using multiple threads is inefficient.

这是一个不同的案例,我有一个高吞吐量的文件系统可供我使用,它提供高达 3GB/s 的读取速度,具有 196 个 CPU 内核和 2TB RAM

单线程 Java 程序以最大 85-100 MB/s 的速度读取文件,因此我有可能比单线程更好.我必须读取大小为 1TB 的文件,而且我有足够的 RAM 来加载它.

A single threaded Java program reads the file with maximum 85-100 MB/s, so I have potential to get better than single thread. I have to read files as big as 1TB in size and I have enough RAM to load it.

目前我使用以下或类似的东西,但需要用多线程编写一些东西以获得更好的吞吐量:

Currently I use the following or something similar, but need to write something with multi-threading to get better throughput:

Java 7 文件:50 MB/s

Java 7 Files: 50 MB/s

List<String> lines = Files.readAllLines(Paths.get(path), encoding);

Java commons-io:48 MB/s

Java commons-io: 48 MB/s

List<String> lines = FileUtils.readLines(new File("/path/to/file.txt"), "utf-8");

与番石榴相同:45 MB/s

The same with guava: 45 MB/s

List<String> lines = Files.readLines(new File("/path/to/file.txt"), Charset.forName("utf-8"));

Java 扫描器类:非常慢

Java Scanner Class: Very Slow

Scanner s = new Scanner(new File("filepath"));
ArrayList<String> list = new ArrayList<String>();
while (s.hasNext()){
    list.add(s.next());
}
s.close();

我希望能够以正确的排序顺序尽快加载文件并构建相同的 ArrayList.

另一个问题,内容类似,但实际上是不同的,因为:问题是讨论多线程文件 I/O 在物理上不可能高效的系统,但由于技术进步,我们现在拥有旨在支持高吞吐量 I/O 的系统,因此限制因素是 CPU/SW ,这可以通过多线程 I/O 来克服.

There is another question that reads similar, but it is actually different, because of : The question is discussing about systems where multi-threaded file I/O is physically impossible to be efficient, but due to technological advancements, we now have systems that are designed to support high-throughput I/O , and so the limiting factor is CPU/SW , which can be overcome by multi-threading the I/O.

另一个问题没有回答如何编写多线程 I/O 的代码.

The other question does not answer how to write code to multi-thread I/O.

推荐答案

这里是多线程读取单个文件的解决方案.

将文件分成N个chunk,在一个线程中读取每个chunk,然后按顺序合并.当心跨越块边界的线.这是用户建议的基本思想slaks

对单个 20 GB 文件的多线程实现进行基准测试:

Bench-marking below implementation of multiple-threads for a single 20 GB file:

1 个线程:50 秒:400 MB/s

1 Thread : 50 seconds : 400 MB/s

2 个线程:30 秒:666 MB/s

2 Threads: 30 seconds : 666 MB/s

4 个线程:20 秒:1GB/s

8 个线程:60 秒:333 MB/s

8 Threads: 60 seconds : 333 MB/s

等效的 Java7 readAllLines() : 400 秒 : 50 MB/s

Equivalent Java7 readAllLines() : 400 seconds : 50 MB/s

注意:这可能仅适用于旨在支持高吞吐量 I/O 的系统,而不适用于普通的个人计算机

Note: This may only work on systems that are designed to support high-throughput I/O , and not on usual personal computers

package filereadtests;

import java.io.*;
import static java.lang.Math.toIntExact;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FileRead implements Runnable
{

private FileChannel _channel;
private long _startLocation;
private int _size;
int _sequence_number;

public FileRead(long loc, int size, FileChannel chnl, int sequence)
{
    _startLocation = loc;
    _size = size;
    _channel = chnl;
    _sequence_number = sequence;
}

@Override
public void run()
{
    try
    {
        System.out.println("Reading the channel: " + _startLocation + ":" + _size);

        //allocate memory
        ByteBuffer buff = ByteBuffer.allocate(_size);

        //Read file chunk to RAM
        _channel.read(buff, _startLocation);

        //chunk to String
        String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));

        System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);

    } catch (Exception e)
    {
        e.printStackTrace();
    }
}

//args[0] is path to read file
//args[1] is the size of thread pool; Need to try different values to fing sweet spot
public static void main(String[] args) throws Exception
{
    FileInputStream fileInputStream = new FileInputStream(args[0]);
    FileChannel channel = fileInputStream.getChannel();
    long remaining_size = channel.size(); //get the total number of bytes in the file
    long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads

    //Max allocation size allowed is ~2GB
    if (chunk_size > (Integer.MAX_VALUE - 5))
    {
        chunk_size = (Integer.MAX_VALUE - 5);
    }

    //thread pool
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

    long start_loc = 0;//file pointer
    int i = 0; //loop counter
    while (remaining_size >= chunk_size)
    {
        //launches a new thread
        executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
        remaining_size = remaining_size - chunk_size;
        start_loc = start_loc + chunk_size;
        i++;
    }

    //load the last remaining piece
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

    //Tear Down
    executor.shutdown();

    //Wait for all threads to finish
    while (!executor.isTerminated())
    {
        //wait for infinity time
    }
    System.out.println("Finished all threads");
    fileInputStream.close();
}

}

这篇关于当高吞吐量(3GB/s)文件系统可用时,如何在 Java 中使用多线程读取文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆