Hadoop MapReduce - Access Local File System


Question

I have a requirement where the MapReduce code should read the local file system on each node. The program will be running on HDFS and I cannot change the FileSystem property for Hadoop in the configuration XML files.

I have tried the following solutions, but none of them worked.

Approach 1

Configuration config = new Configuration(); 
FileSystem localFileSystem = FileSystem.get(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));

Approach 2

Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));

Approach 3

Configuration config = new Configuration(); 
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));

Approach 4

Configuration config = new Configuration(); 
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.getRaw().open(new Path("/user/input/localFile"))));

This did not work either: [Reading HDFS and local files in Java]

Each of them gave the error: No such file exists
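As an aside, in Approaches 1 and 3 the set("fs.defaultFS", ...) call is made on the FileSystem object, but that setter belongs to Configuration and has to be applied before the FileSystem is obtained. A compilable variant of the same idea, as a minimal standalone sketch reusing the illustrative path above (the class name LocalReadSketch is made up for illustration), would be:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalReadSketch {
    public static void main(String[] args) throws Exception {
        // Point the default filesystem at the local disk *before* obtaining the FileSystem.
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "file:///");
        FileSystem localFileSystem = FileSystem.get(config);

        // Illustrative path, matching the approaches above.
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
        System.out.println(reader.readLine());
        reader.close();
    }
}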

Error Stack

attempt_201406050021_0018_m_000000_2: java.io.FileNotFoundException: File /home/cloudera/sftp/id_rsa does not exist
attempt_201406050021_0018_m_000000_2:   at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:468)
attempt_201406050021_0018_m_000000_2:   at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:380)
attempt_201406050021_0018_m_000000_2:   at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:231)
attempt_201406050021_0018_m_000000_2:   at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:183)
attempt_201406050021_0018_m_000000_2:   at org.apache.hadoop.fs.LocalFileSystem.copyFromLocalFile(LocalFileSystem.java:81)
attempt_201406050021_0018_m_000000_2:   at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1934)
attempt_201406050021_0018_m_000000_2:   at com.skanda.ecomm.sftp.FTPMapper.configure(FTPMapper.java:91)
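The stack shows the failing lookup happening in RawLocalFileSystem.getFileStatus during a copyFromLocalFile call made from FTPMapper.configure, i.e. on whichever node executed that task attempt. A quick standalone check of whether the file is actually present and readable on a given node, as a hypothetical diagnostic (the class name is made up; the path is taken from the stack trace), could be:

import java.io.File;
import java.net.InetAddress;

public class LocalFileProbe {
    public static void main(String[] args) throws Exception {
        // Print which host this runs on and whether the expected local file is there.
        String host = InetAddress.getLocalHost().getHostName();
        File file = new File("/home/cloudera/sftp/id_rsa");
        System.out.println(host + ": exists=" + file.exists() + ", readable=" + file.canRead());
    }
}

Running this on each worker node would confirm whether the file is missing on the node that ran the attempt.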

I am hoping to get a positive solution here. Let me know where I am going wrong.

Main class (Driver class)

/*
 * @SFTPClient.java @May 20, 2014
 *
 * 
 */
package com.skanda.ecomm.sftp;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 
 * <p>
 * SFTPClient Class
 * </p>
 *
 * @author skanda
 * @version 1.0
 *
 */
public class SFTPClient extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        Configuration config = getConf();

        String inputPath = config.get(ApplicationConstants.INPUT_PATH);
        String outputPath = config.get(ApplicationConstants.OUTPUT_PATH);
        String configPath = config.get(ApplicationConstants.CONFIG_PATH);
        int reducers = Integer.parseInt(config.get(ApplicationConstants.REDUCERS));

        if(outputPath == null || inputPath == null || configPath == null) {
            throw new Exception("Usage: \n" + "-D configPath=<configPath> -D inputPath=<inputPath> -D reducers=<reducers" +  
                    "-D outputPath=<path>");
        }

        JobConf conf = new JobConf(SFTPClient.class);
        conf.setJobName("SFTP Injection client");

        DistributedCache.addCacheFile(new URI(configPath),conf); 

        conf.setMapperClass(FTPMapper.class);
        conf.setReducerClass(FTPReducer.class);

        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(IntWritable.class);

        // configuration should contain reference to your namenode
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(new Path(outputPath), true); // true stands for recursively, deleting the folder you gave

        conf.setStrings(ApplicationConstants.INPUT_PATH, inputPath);
        conf.setStrings(ApplicationConstants.OUTPUT_PATH, outputPath);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));       
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));     

        conf.setNumReduceTasks(reducers);
        conf.setInt(ApplicationConstants.NUNBER_OF_REDUCERS, reducers);

        JobClient.runJob(conf);

        return 0;
    }        

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SFTPClient(), args);
        System.exit(exitCode);
    }
}

Mapper

/*
 * @FTPMapper.java  @May 20, 2014
 *
 * 
 */
package com.skanda.ecomm.sftp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.ftp.mapreduce.CommonUtility;
import com.ftp.mapreduce.RetrieveFileNames;
import com.jcraft.jsch.hm.Channel;

/**
 * 
 * <p>
 * FTP Mapper Class
 * </p>
 *
 * @author skanda
 * @version 1.0
 *
 */

@SuppressWarnings("unused")
public class FTPMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {

    private URI[] localFiles;       
    private String userName;
    private String hostName;
    private String folderPath;
    private int reducers;
    private byte[] pvtKey;
    private String fileName;
    private String startDate;
    private String endDate;
    private String sshKeyPath;
    private String password;

    public void configure(JobConf job) {
        Properties properties = new Properties();

        try {
            localFiles = DistributedCache.getCacheFiles(job);

            if (localFiles != null && localFiles.length == 1) {
                Configuration conf = new Configuration();



                FileSystem fileSystem = FileSystem.get(localFiles[0], conf);
                BufferedReader bufferRedaer=new BufferedReader(new InputStreamReader(fileSystem.open(new Path(localFiles[0]))));
                properties.load(bufferRedaer);

                userName = properties.getProperty(ApplicationConstants.USER_NAME);
                reducers = job.getInt(ApplicationConstants.NUNBER_OF_REDUCERS, 30);
                hostName = properties.getProperty(ApplicationConstants.SFTP_SERVER_HOST);
                folderPath = properties.getProperty(ApplicationConstants.HOSTFILE_DIRECTORY_PATH);
                fileName = properties.getProperty(ApplicationConstants.FILE_NAME_PATTERN);
                startDate = properties.getProperty(ApplicationConstants.FILE_START_DATE);
                endDate = properties.getProperty(ApplicationConstants.FILE_END_DATE);
                sshKeyPath = properties.getProperty(ApplicationConstants.SSH_KEY_PATH);
                password = properties.getProperty(ApplicationConstants.PASSWORD);

                System.out.println("--------------------------------------------------");
                /*FileSystem fs = FileSystem.getLocal(conf);
                //Path inputPath = fs.makeQualified(new Path(sshKeyPath));
                String inputPath = new Path("file:///home/cloudera/"+sshKeyPath).toUri().getPath();
                fs.copyFromLocalFile(new Path(inputPath), new Path("outputSFTP/idFile") );*/
                try{
                    Configuration conf1 = new Configuration();
                    Path pt = new Path("file:///home/cloudera/.ssh/id_rsa");
                    FileSystem fs = FileSystem.get( new URI("file:///home/cloudera/.ssh/id_rsa"), conf);
                    LocalFileSystem localFileSystem = fs.getLocal(conf1);
                    BufferedReader bufferRedaer1 = new BufferedReader(new InputStreamReader(localFileSystem.open(pt)));

                    String str = null;
                    while ((str = bufferRedaer1.readLine())!= null)
                    {
                        System.out.println("-----------");
                        System.out.println(str);
                    }
                }catch(Exception e){
                    System.out.println("failed again");
                    String computername=InetAddress.getLocalHost().getHostName();
                    System.out.println(computername);
                    e.printStackTrace();
                }

                System.out.println("--------------------------------------------------");



                Configuration config = new Configuration(); 
                config.set("fs.defaultFS", "file:////");
                LocalFileSystem localFileSystem = FileSystem.getLocal(config);
                bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path(sshKeyPath))));

                /*Configuration config = new Configuration();
                //config.set("fs.defaultFS", "file:///home/cloudera/.ssh/id_rsa");
                LocalFileSystem fileSystm = FileSystem.getLocal(config);
                Path path = fileSystm.makeQualified(new Path("/home/cloudera/.ssh/id_rsa"));*/

                //FileInputFormat.setInputPaths(job, path);

                //bufferRedaer = new BufferedReader(new InputStreamReader(fileSystem.open(path)));

                String key = "";

                try {
                      String line = "";
                      while ((line = bufferRedaer.readLine()) != null) {
                          key += line + "\n";   
                        }
                      pvtKey = key.getBytes();
                } catch(Exception e){
                    e.printStackTrace();
                } finally {
                    //fileSystem.close();
                    //bufferRedaer.close();
                }       
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter)
        throws IOException {

        List<String> filterFileNamesList = new ArrayList<String>();

        Channel channel = CommonUtility.connectSFTP(userName, hostName, pvtKey);

        Map<String, String> fileNamesMap = CommonUtility.getFileNames(channel, folderPath);

        List<String> filterFileNameList_output = RetrieveFileNames.FILTER_BY_NAME.retrieveFileNames(fileNamesMap, filterFileNamesList, 
                                                            fileName, startDate, endDate);

        for (int i = 0; i < filterFileNameList_output.size(); i++) {
            int keyGroup = i % reducers;
            output.collect(new IntWritable(keyGroup), new Text(filterFileNameList_output.get(i)));
        }
    }
}

Solution

This code works for me when the program runs on HDFS and my txt file is in this location:

/home/Rishi/Documents/RishiFile/r.txt

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopRead {

    public static void main(String[] args) {
        try{
            Configuration conf = new Configuration();
            Path pt = new Path("/home/Rishi/Documents/RishiFile/r.txt");
            FileSystem fs = FileSystem.get( new URI("/home/Rishi/Documents/RishiFile"), conf);
            LocalFileSystem localFileSystem = fs.getLocal(conf);
            BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(pt)));

            String str = null;
            while ((str = bufferRedaer.readLine())!= null)
            {
                System.out.println("-----------");
                System.out.println(str);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}
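The same read pattern can also be moved into a mapper's setup(), which is closer to the original requirement of reading a node-local file from within the job. A minimal sketch along those lines, assuming the new mapreduce API used below and reusing the path from the snippet above (class name and output types are illustrative), might be:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LocalFileMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String localContent = "";

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // getLocal() resolves to the local disk of the node running this task,
        // so the file must exist at this path on every node that can run the task.
        LocalFileSystem localFs = FileSystem.getLocal(conf);
        Path pt = new Path("/home/Rishi/Documents/RishiFile/r.txt");
        BufferedReader reader = new BufferedReader(new InputStreamReader(localFs.open(pt)));
        StringBuilder sb = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            sb.append(line).append("\n");
        }
        reader.close();
        localContent = sb.toString();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit each input line together with the locally read content (illustration only).
        context.write(value, new Text(localContent));
    }
}

As with the snippet above, this only works if the file is present at the same path on every node where the task may run.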

Word count example for reading a local file from a job running on HDFS

My main class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class FileDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try{
            ToolRunner.run(new Configuration(), new FileDriver(), args);
            System.exit(0);
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public int run(String[] arg0) throws Exception {

        Configuration conf = new Configuration();
        Path pt = new Path("file:///home/winoria/Documents/Ri/r");

        Job job = new Job(conf, "new Job");
        job.setJarByClass(FileDriver.class);

        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, pt);
        FileSystem.get(job.getConfiguration()).delete(new Path("Output2"), true);
        FileOutputFormat.setOutputPath(job, new Path("Output2"));
        job.waitForCompletion(true);
        return 0;
    }
}

Mapper class:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

    protected void map(LongWritable key, Text value,Context context) throws java.io.IOException ,InterruptedException {

        String str[] = value.toString().split(" ");
        for(int i =0; i<str.length;i++){
            context.write(new Text(str[i]), new Text());
        }
    };
}

Reducer class:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    protected void reduce(Text key,Iterable<Text> value,Context context) throws java.io.IOException ,InterruptedException {

        int count=0;
        for (Text text : value) {
            count++;
        }
        context.write(key, new Text(count+""));
    };
}
