Read and Write to files using Multi-threading (producer consumer model) in JAVA

Question

I am stuck here. Can someone explain why the consumer thread appears to run before the producer thread in the code below? How can the consumer thread run when the producer has not put any content yet? Is the program wrong?

Goal: run a producer/consumer thread pair for each file that is picked up from the given folder.

For instance, if the specified folder has 3 files, then 2 threads (producer/consumer) must be started per file, which makes the thread count 6.

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

class sharedInt {
    private int syncUponInt;
    private boolean available = false;
    private File processingFile;
    private static File[] listOfFile;

    sharedInt(File[] totalList) {
        listOfFile = totalList;
    }

    public int getTotalCount() {
        return listOfFile.length;
    }

    public static File[] getListOfFile() {
        return listOfFile;
    }

    public static void setListOfFile(File[] listOfFile) {
        sharedInt.listOfFile = listOfFile;
    }

    public File getProcessingFile() {
        return processingFile;
    }

    public void setProcessingFile(File processingFile) {
        this.processingFile = processingFile;
    }

    public synchronized int getContents() {
        while (available == false) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        available = false;
        notify();
        return syncUponInt;
    }

    public synchronized void setContents(int value) {
        while (available == true) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        syncUponInt = value;
        available = true;
        notify();
    }
}

class Producer1 extends Thread {
    private sharedInt cubbyhole;
    private int number;

    public Producer1(sharedInt c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    public void run() {
        // for (int i = 0; i < cubbyhole.getTotalCount(); i++) {
        cubbyhole.setContents(this.number);
        Vector vectorList = new Vector();
        System.out.println("Producer <current thread>" + this.currentThread() + "put: " + this.number
                + "processing file is" + cubbyhole.getProcessingFile());
        RandomAccessFile raf = null;

        try {
            raf = new RandomAccessFile(cubbyhole.getProcessingFile(), "r");
            StringBuffer sb = new StringBuffer();
            String line = null;
            while ((line = raf.readLine()) != null) {
                sb.append(line);
            }
            vectorList.add(sb.toString());
            System.out.println(sb.toString());
        } catch (FileNotFoundException e) {
        } catch (IOException e) {
        }

        // }
    }
}

class Consumer1 extends Thread {
    private sharedInt cubbyhole;

    public Consumer1(sharedInt c) {
        cubbyhole = c;
    }

    public void run() {
        int value = 0;
        // for (int i = 0; i < cubbyhole.getTotalCount(); i++) {

        System.out.println("Consumer <current thread>" + this.currentThread() + "got: " + cubbyhole.getContents()
                + "processing file is" + cubbyhole.getProcessingFile());
    }
}

public class FileManagementApp {
    public static void main(String[] args) throws InterruptedException {

        System.out.println("1. Please enter the path of the <Directory/Folder>...");
        // Scanner scn = new Scanner(System.in);
        // String folderPath = scn.nextLine();
        File folder = new File("C:\\file\\output");
        File[] fileList = folder.listFiles();
        int countOfFiles = fileList.length;

        sharedInt c = new sharedInt(fileList);
        Producer1 p1 = null;
        List<Producer1> pList = new ArrayList<Producer1>();
        Consumer1 c1 = null;
        List<Consumer1> cList = new ArrayList<Consumer1>();
        for (int i = 0; i < countOfFiles; i++) {
            c = new sharedInt(fileList);
            c.setProcessingFile(fileList[i]);

            p1 = new Producer1(c, i);
            p1.setName("Producer--" + i);
            pList.add(p1);
            c1 = new Consumer1(c);
            c1.setName("Consumer--" + i);
            cList.add(c1);
            pList.get(i).start();
            cList.get(i).start();
        }

    }
}

Output:

1. Please enter the path of the <Directory/Folder>...
Consumer <current thread>Thread[Consumer--0,5,main]got: 0processing file isC:\file\output\0.A.txt
Producer <current thread>Thread[Producer--0,5,main]put: 0processing file isC:\file\output\0.A.txt
Producer <current thread>Thread[Producer--1,5,main]put: 1processing file isC:\file\output\1.A.txt
Producer <current thread>Thread[Producer--2,5,main]put: 2processing file isC:\file\output\2.A.txt
Consumer <current thread>Thread[Consumer--1,5,main]got: 1processing file isC:\file\output\1.A.txt
fg
abc
Consumer <current thread>Thread[Consumer--2,5,main]got: 2processing file isC:\file\output\2.A.txt
de

Edit:

I modified the code to something like this and was able to read and write files concurrently with multiple threads using the producer consumer model.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

class SharedInteger {
    private boolean available = false;
    public File processingFile;
    public long totalNoOfSplits;
    public Vector<Byte> vectorBytes;
    private File[] listOfFiles;

    SharedInteger(File[] totalList) {
        listOfFiles = totalList;
    }

    public synchronized Vector<Byte> get() {
        while (available == false) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        available = false;
        notify();
        return vectorBytes;
    }

    public synchronized void put(Vector<Byte> value) {
        while (available == true) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        vectorBytes = value;
        available = true;
        notify();
    }
}

class Producer extends Thread {
    private SharedInteger sharedInteger;

    public Producer(SharedInteger c) {
        sharedInteger = c;
    }

    public void run() {
        FileInputStream fis = null;
        Vector<Byte> vectorBytes = new Vector<Byte>();
        try {
            fis = new FileInputStream(sharedInteger.processingFile);
            while (fis.available() != 0) {
                vectorBytes.add((byte) fis.read());
            }
            sharedInteger.put(vectorBytes);
        } catch (Exception e) {

        }
    }
}

class Consumer extends Thread {
    private SharedInteger sharedInteger;
    private FileOutputStream fos;

    public Consumer(SharedInteger c) {
        sharedInteger = c;
    }

    public void run() {
        File newFile = sharedInteger.processingFile;
        try {
            fos = new FileOutputStream(newFile.getParentFile()+"1\\"+newFile.getName());
        } catch (FileNotFoundException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        Vector<Byte> v = sharedInteger.get();
        try {
            if (null != v) {
                writeToAFile(v);
            }
        } catch (IOException e) { // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void writeToAFile(Vector<Byte> contents) throws IOException {
        for (int i = 0; i < contents.size(); i++) {
            System.out.println(Thread.currentThread());
            fos.write(contents.get(i));
            fos.flush();
        }

    }
}

public class ProducerConsumerTest {
    public static void main(String[] args) throws FileNotFoundException {
        File folder = new File("C:\\file\\output");
        File[] fileList = folder.listFiles();
        int countOfFiles = fileList.length;
        SharedInteger c = new SharedInteger(fileList);
        List<Producer> pList = new ArrayList<Producer>();
        List<Consumer> cList = new ArrayList<Consumer>();
        Producer p1 = null;
        Consumer c1 = null;

        for (int i = 0; i < countOfFiles; i++) {
            c = new SharedInteger(fileList);
            c.processingFile = fileList[i];

            p1 = new Producer(c);
            p1.setName("Producer--" + i);
            pList.add(p1);
            pList.get(i).start();

            c1 = new Consumer(c);
            c1.setName("Consumer--" + i);
            cList.add(c1);
            cList.get(i).start();
        }

    }
}


Recommended answer

Well, there are a few things which are quite fishy. However, have a look at your run() methods:

// Producer1
public void run() {
    cubbyhole.setContents(this.number);
    Vector vectorList = new Vector();
    System.out.println("Producer <current thread>" + this.currentThread()
            + "put: " + this.number
            + "processing file is" + cubbyhole.getProcessingFile());
    RandomAccessFile raf = null;

    try {
        // ... (file reading elided)
    } catch (IOException e) {
        // ... (elided)
    }
}

// Consumer1
public void run() {
    int value = 0;

    System.out.println("Consumer <current thread>" + this.currentThread()
            + "got: " + cubbyhole.getContents()
            + "processing file is" + cubbyhole.getProcessingFile());
}

As soon as your producer invokes setContents(int) (and, therefore, notify() as well), your consumer is free to continue. The producer only prints after setContents(int) has returned, so the consumer may well be scheduled and print first. Seeing the console output from your consumer first therefore doesn't mean anything. Printing is done concurrently, outside the synchronized methods, so you can't rely on the order of the output.
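To make that concrete, here is a minimal sketch (not the code from the question) in which the event is recorded inside the synchronized methods, while the lock is still held. LoggingCubbyhole, log() and demo() are hypothetical names introduced only for this illustration. Because get() cannot leave its wait loop before put() has run, the recorded order is always "put" then "got", even though the consumer thread is started first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical variant of the question's cubbyhole: events are recorded
// while the monitor is held, so the log reflects the real hand-off order.
class LoggingCubbyhole {
    private int contents;
    private boolean available = false;
    private final List<String> log = new ArrayList<>();

    public synchronized int get() throws InterruptedException {
        while (!available) {
            wait(); // releases the monitor until a producer has put a value
        }
        available = false;
        log.add("got: " + contents); // recorded under the lock
        notifyAll();
        return contents;
    }

    public synchronized void put(int value) throws InterruptedException {
        while (available) {
            wait(); // releases the monitor until the previous value was taken
        }
        contents = value;
        available = true;
        log.add("put: " + value); // recorded under the lock
        notifyAll();
    }

    public synchronized List<String> log() {
        return new ArrayList<>(log);
    }

    // Starts the consumer first on purpose and returns the recorded events.
    static List<String> demo() throws InterruptedException {
        LoggingCubbyhole cubbyhole = new LoggingCubbyhole();
        Thread consumer = new Thread(() -> {
            try {
                cubbyhole.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread producer = new Thread(() -> {
            try {
                cubbyhole.put(0);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        producer.start();
        consumer.join();
        producer.join();
        return cubbyhole.log();
    }
}
```

The println calls in the question sit outside setContents(int) and getContents(), which is why their order on the console can differ from the order of the hand-off itself.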

Edit: here is a version according to your requirements, using Vector, wait(), notify(), and two threads per file, but please bear in mind that there are far superior ways to implement this (see comments):

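import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;

public class FileMerger {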

    private volatile int currentWriterId = 0;

    public static void main(String[] args) throws Exception {
        // 1st argument: target directory.
        File directory = new File(args[0]);
        // 2nd argument: merge files suffix.
        FilenameFilter filter = (dir, name) -> name.endsWith("." + args[1]);

        new FileMerger().merge(directory, filter);
    }

    public void merge(File directory, FilenameFilter filter) throws IOException, InterruptedException {
        File[] files = directory.listFiles(filter);
        int numberOfFiles = files.length;
        Path mergeFilePath = Paths.get(directory.getPath() + FileSystems.getDefault().getSeparator() + "merge.txt");
        Vector<String> fileContents = new Vector<>(Collections.nCopies(numberOfFiles, null));

        Files.createFile(mergeFilePath);

        for (int i = 0; i < numberOfFiles; i++) {
            final int writerId = i;
            File file = files[i];
            CountDownLatch readWriteLatch = new CountDownLatch(1);

            // Reader.
            new Thread(() -> {
                try {
                    List<String> lines = Files.readAllLines(Paths.get(file.getPath()));
                    fileContents.set(writerId, String.join("\n", lines));
                } catch (IOException e) {
                    // The slot stays null; the writer skips this file.
                    e.printStackTrace();
                } finally {
                    // Always release the writer, even if reading failed.
                    readWriteLatch.countDown();
                }
            }).start();

            // Writer.
            new Thread(() -> {
                try {
                    // Wait for the corresponding reader to finish.
                    readWriteLatch.await();

                    // Wait for this writer's turn so files are appended in order.
                    synchronized (this) {
                        while (currentWriterId != writerId) {
                            wait();
                        }
                        try {
                            String content = fileContents.get(writerId);
                            if (content != null) {
                                Files.write(mergeFilePath, (content + "\n").getBytes(), StandardOpenOption.APPEND);
                            }
                        } finally {
                            // Always pass the turn on, even if the write failed.
                            currentWriterId++;
                            notifyAll();
                        }
                    }
                } catch (InterruptedException | IOException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

}
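As for the "far superior ways": one possible alternative, sketched here under my own assumptions rather than taken from the comments, is to let an ExecutorService do the reading and use Future objects as the hand-off, so no hand-written wait()/notify() is needed. ExecutorFileMerger, its merge(List<Path>, Path) signature and the pool size of 4 are hypothetical choices for this illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical name; a sketch, not the code from the original answer.
class ExecutorFileMerger {

    // Reads every input on a pool thread, then appends the contents to
    // target in the same order as the inputs list.
    static void merge(List<Path> inputs, Path target)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // arbitrary size
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (Path input : inputs) {
                // The reads run concurrently; each Future is the hand-off
                // from one reader task to the writing loop below.
                Callable<String> read =
                        () -> new String(Files.readAllBytes(input), StandardCharsets.UTF_8);
                pending.add(pool.submit(read));
            }

            Files.write(target, new byte[0]); // create or truncate the target

            for (Future<String> result : pending) {
                // get() blocks until that file has been read, which keeps
                // the output in input order without explicit locking.
                byte[] bytes = (result.get() + "\n").getBytes(StandardCharsets.UTF_8);
                Files.write(target, bytes, StandardOpenOption.APPEND);
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike the version above, this does not start two threads per file: the reads share a fixed pool and the calling thread does all the writing. A failed read surfaces as an ExecutionException from get() instead of leaving a writer waiting.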
