Improve multi-thread indexing with Lucene

Question

I am trying to build my indexes in Lucene with multiple threads, so I started coding and wrote the following. First I find the files, and for each file I create a thread to index it. After that I join the threads and optimize the indexes. It works, but I'm not sure... can I trust it at a large scale? Is there any way to improve it?

import java.io.File;
import java.io.FileFilter;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.analysis.StopAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class mIndexer extends Thread {

    private File ifile;
    private static IndexWriter writer;

    public mIndexer(File f) {
        ifile = f.getAbsoluteFile();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("here...");

        String indexDir;
        String dataDir;
        if (args.length != 2) {
            dataDir = "/home/omid/Ranking/docs/";
            indexDir = "/home/omid/Ranking/indexes/";
        } else {
            dataDir = args[0];
            indexDir = args[1];
        }

        long start = System.currentTimeMillis();

        Directory dir = FSDirectory.open(new File(indexDir));
        writer = new IndexWriter(dir,
                new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")),
                true,
                IndexWriter.MaxFieldLength.UNLIMITED);
        int numIndexed = 0;
        try {
            numIndexed = index(dataDir, new TextFilesFilter());
        } finally {
            long end = System.currentTimeMillis();
            System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds");
            writer.optimize();
            System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds");
            writer.close();
        }
        System.out.println("Enjoy your day/night");
    }

    public static int index(String dataDir, FileFilter filter) throws Exception {
        // Start one indexing thread per file, then wait for all of them to finish.
        List<Thread> threads = new ArrayList<Thread>();
        File[] dirs = new File(dataDir).listFiles();
        for (File d : dirs) {
            if (d.isDirectory()) {
                for (File f : d.listFiles()) {
                    if (!f.isDirectory() &&
                            !f.isHidden() &&
                            f.exists() &&
                            f.canRead() &&
                            (filter == null || filter.accept(f))) {
                        Thread t = new mIndexer(f);
                        t.start();
                        threads.add(t);
                    }
                }
            }
        }
        for (Thread t : threads) {
            t.join();
        }
        return writer.numDocs();
    }

    private static class TextFilesFilter implements FileFilter {
        public boolean accept(File path) {
            return path.getName().toLowerCase().endsWith(".txt");
        }
    }

    protected Document getDocument() throws Exception {
        Document doc = new Document();
        if (ifile.exists()) {
            doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES));
            doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
            // Derive a category from the parent directory name (currently unused).
            String cat = ifile.getAbsolutePath();
            cat = cat.substring(0, cat.length() - ifile.getName().length() - 1);
            cat = cat.substring(cat.lastIndexOf('/') + 1);
            //doc.add(new Field("category", cat, Field.Store.YES));
            //System.out.println(cat);
        }
        return doc;
    }

    public void run() {
        try {
            System.out.println("Indexing " + ifile.getAbsolutePath());
            Document doc = getDocument();
            writer.addDocument(doc);
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}

Any help is appreciated.

Answer

If you want to parallelize indexing, there are two things you can do:


  • parallelize calls to addDocument,

  • increase the maximum thread count of your merge scheduler.

You are on the right track in parallelizing calls to addDocument, but spawning one thread per document will not scale as the number of documents to index grows. You should rather use a fixed-size ThreadPoolExecutor. Since this task is mainly CPU-intensive (depending on your analyzer and on how you retrieve your data), setting the maximum number of threads to the number of CPUs on your machine is probably a good start.
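
As an illustration, the indexing loop could be reworked around a fixed-size pool along these lines (a minimal sketch, not tested against your setup; buildDocument and the one-hour timeout are assumptions standing in for the getDocument logic above):

import java.io.File;
import java.io.FileReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;

public class PoolIndexer {

    // Assumed helper: builds a Document the same way getDocument() above does.
    static Document buildDocument(File f) throws Exception {
        Document doc = new Document();
        doc.add(new Field("contents", new FileReader(f), Field.TermVector.YES));
        doc.add(new Field("path", f.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
        return doc;
    }

    public static void indexAll(final IndexWriter writer, File[] files) throws InterruptedException {
        // One task per file on a pool sized to the number of CPUs.
        int poolSize = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (final File f : files) {
            pool.submit(new Runnable() {
                public void run() {
                    try {
                        // IndexWriter is thread-safe: all tasks share one writer.
                        writer.addDocument(buildDocument(f));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        pool.shutdown();                          // stop accepting new tasks
        pool.awaitTermination(1, TimeUnit.HOURS); // arbitrary timeout; wait for pending tasks
    }
}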

Regarding the merge scheduler, you can increase the maximum number of merge threads with ConcurrentMergeScheduler's setMaxThreadCount method. Beware that disks are much better at sequential reads/writes than at random reads/writes; setting too high a maximum thread count for your merge scheduler is therefore more likely to slow indexing down than to speed it up.
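
With the Lucene 3.x API used in the question, this could look roughly like the following (a sketch; the value 3 is purely illustrative, and on later Lucene versions the scheduler would be configured through IndexWriterConfig instead):

import org.apache.lucene.index.ConcurrentMergeScheduler;

// Sketch (Lucene 3.x): cap the number of concurrent merge threads.
ConcurrentMergeScheduler scheduler = new ConcurrentMergeScheduler();
scheduler.setMaxThreadCount(3); // illustrative value; keep it low on spinning disks
writer.setMergeScheduler(scheduler);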

But before trying to parallelize your indexing process, you should probably try to find out where the bottleneck is. If your disk is too slow, the bottleneck is likely to be the flush and merge steps; in that case, parallelizing calls to addDocument (which essentially consists of analyzing a document and buffering the result of the analysis in memory) will not improve indexing speed at all.
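
One rough way to check is to time the two phases separately (a sketch; buildDocument is the same hypothetical helper as in the pool example above):

import java.io.File;
import org.apache.lucene.index.IndexWriter;

public class IndexProfiler {
    // Rough sketch: time the in-memory analysis/buffering phase (addDocument)
    // separately from the flush to disk (commit) to see which one dominates.
    public static void profile(IndexWriter writer, File[] files) throws Exception {
        long t0 = System.currentTimeMillis();
        for (File f : files) {
            writer.addDocument(PoolIndexer.buildDocument(f)); // analysis + buffering in memory
        }
        long t1 = System.currentTimeMillis();
        writer.commit(); // forces buffered documents to disk
        long t2 = System.currentTimeMillis();
        System.out.println("analyze/buffer: " + (t1 - t0) + " ms, flush: " + (t2 - t1) + " ms");
    }
}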

Some side notes:


  • There is some ongoing work in the development version of Lucene to improve indexing parallelism (the flushing part especially; this blog entry explains how it works).

  • Lucene has a nice wiki page on how to improve indexing speed, where you will find other ways to speed up indexing.
