How do I Combine or Merge Small ORC files into a Larger ORC file?


Question

Most questions/answers on SO and the web discuss using Hive to combine a bunch of small ORC files into a larger one; however, my ORC files are log files separated by day, and I need to keep them separate. I only want to "roll up" the ORC files per day (which are directories in HDFS).

I will most likely need to write the solution in Java, and I have come across OrcFileMergeOperator, which may be what I need to use, but it is still too early to tell.

What is the best approach to solving this issue?

Answer

There are good answers here, but none of them allow me to run a cron job so that I can do daily roll-ups. We have journald log files written daily to HDFS, and I do not want to run a query in Hive every day when I come in.

What I ended up doing seemed more straightforward to me. I wrote a Java program that uses the ORC libraries to scan all files in a directory and build a List of those files. It then opens a new Writer for the "combined" file (whose name starts with a "." so that it is hidden from Hive; otherwise Hive will fail). The program then opens each file in the List, reads the contents, and writes them out to the combined file. When all the files have been read, it deletes them. I also added the capability to run one directory at a time in case that is needed.

NOTE: You will need a schema file. Journald logs can be output as JSON with "journalctl -o json", and then you can use the Apache ORC tools to generate a schema file, or you can write one by hand. The schema auto-generated by ORC is good, but a manual one is always better.
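For reference, the schema file holds a type string in ORC's TypeDescription format, which TypeDescription.fromString() parses in the program below. A minimal sketch of what journald.schema might contain (the field names are illustrative journald JSON fields, not a canonical schema):

struct<__REALTIME_TIMESTAMP:bigint,_HOSTNAME:string,PRIORITY:string,MESSAGE:string>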

NOTE: To use this code as-is, you will need a valid keytab and to pass -Dkeytab= as a JVM option (the program reads it via System.getProperty("keytab"), so it belongs on the java command line rather than in the classpath).
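For example, an invocation might look like the following; the keytab path and jar name here are illustrative:

java -Dkeytab=/etc/security/keytabs/svc.keytab -cp orc-rollup.jar:$(hadoop classpath) OrcFileRollUp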

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

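// Shaded Joda-Time bundled with the Cloudera distribution; plain org.joda.time.LocalDate
// offers the same now()/getDayOfMonth()/getMonthOfYear()/getYear() API.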
import com.cloudera.org.joda.time.LocalDate;

public class OrcFileRollUp {

  private final static String SCHEMA = "journald.schema";
  private final static String UTF_8 = "UTF-8";
  private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
  private static final String keytabLocation = System.getProperty("keytab");
  private static final String kerberosUser = "<userName>";
  private static Writer writer;

  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "Kerberos");

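    // Build the Kerberos principal as user/host and log in from the keytab
    // so that all HDFS calls below are authenticated.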
    InetAddress myHost = InetAddress.getLocalHost();
    String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

    int currentDay = LocalDate.now().getDayOfMonth();
    int currentMonth = LocalDate.now().getMonthOfYear();
    int currentYear = LocalDate.now().getYear();

    Path path = new Path(HDFS_BASE_LOGS_DIR);

    FileSystem fileSystem = path.getFileSystem(conf);
    System.out.println("The URI is: " + fileSystem.getUri());


    //Get Hosts:
    List<String> allHostsPath = getHosts(path, fileSystem);

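    // Load the ORC schema from the bundled journald.schema resource; strip
    // newlines so fromString() sees the type string on a single line.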
    TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
        .replaceAll("\n", ""));

    //Open each file for reading and write contents
    for(int i = 0; i < allHostsPath.size(); i++) {

      String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";            //filename:  .2018_04_24.orc.working

      //Create list of files from directory and today's date OR pass a directory in via the command line in format 
      //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
      String directory = "";
      Path outFilePath;
      Path argsPath;
      List<String> orcFiles;

      if(args.length == 0) {
        directory = currentYear + "/" + currentMonth + "/" + currentDay;
        outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
        try {
          orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
        } catch (Exception e) {
          continue;
        }
      } else {
        outFilePath = new Path(args[0] + "/" + outFile);
        argsPath = new Path(args[0]);
        try {
          orcFiles = getAllFilePath(argsPath, fileSystem);
        } catch (Exception e) {
          continue;
        }
      }

      //Get a FileSystem handle for the output path (the list of input files was built above)
      FileSystem fs = outFilePath.getFileSystem(conf);

      //Writer MUST be created below ^^ or the combined file will be deleted as well.
      if(fs.exists(outFilePath)) {
        System.out.println(outFilePath + " exists, delete before continuing.");
        continue; //skip this directory; otherwise a null or stale writer from a previous pass would be used
      }
      writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
          .setSchema(schema));

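      //Append every row batch from each source file to the combined writer, then delete the source file.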
      for(int j = 0; j < orcFiles.size(); j++ ) { 
        Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));

        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        RecordReader rows = reader.rows();

        while (rows.nextBatch(batch)) {
          writer.addRowBatch(batch);
        }
        rows.close();
        fs.delete(new Path(orcFiles.get(j)), false);
      }
      //Close File
      writer.close();

      //Remove leading "." from ORC file to make visible to Hive
      outFile = fileSystem.getFileStatus(outFilePath)
                                      .getPath()
                                      .getName();

      if (outFile.startsWith(".")) {
        outFile = outFile.substring(1);

        int lastIndexOf = outFile.lastIndexOf(".working");
        outFile = outFile.substring(0, lastIndexOf);
      }

      Path parent = outFilePath.getParent();

      fileSystem.rename(outFilePath, new Path(parent, outFile));

      if(args.length != 0)
        break;
    }
  }

  private static String getSchema(String resource) throws IOException {
    try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
      return IOUtils.toString(input, UTF_8);
    }
  }

  public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> hostsList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      hostsList.add(fileStat.getPath().toString());
    }
    return hostsList;
  }

  private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      if (fileStat.isDirectory()) {
        fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
      } else {
        fileList.add(fileStat.getPath()
                             .toString());
      }
    }
    //Keep only ORC files; removeIf avoids the bug of removing from a list while
    //iterating forward, which would skip the element after each removal.
    fileList.removeIf(f -> !f.endsWith(".orc"));

    return fileList;
  }

}
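One design note: this approach decodes and re-encodes every row through addRowBatch(), which is simple and tolerant of mixed writer versions but pays the full CPU cost of a rewrite. Newer ORC releases also provide OrcFile.mergeFiles(), which concatenates compatible files at the stripe level; whether it applies here depends on the ORC version you have, so treat it as an alternative to verify rather than a drop-in.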
