Hadoop Mapreduce - 访问本地文件系统 [英] Hadoop Mapreduce - Access Local File System
问题描述
我有一个要求,在Map Reduce代码中应该读取每个节点中的本地文件系统。该程序将在HDFS上运行,我不能在xml文件中更改hadoop的FileSystem属性进行配置。
我尝试了以下解决方案,但都没有给出结果。
方法1
Configuration config = new Configuration();
FileSystem localFileSystem = FileSystem.get(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
方式2
Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
方法3
Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
方法4
Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.getRaw().open(new Path("/user/input/localFile"))));
这也没有用
[
错误堆栈
attempt_201406050021_0018_m_000000_2: java.io.FileNotFoundException: 文件 /home/cloudera/sftp/id_rsa 不存在
attempt_201406050021_0018_m_000000_2:at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:468)
attempt_201406050021_0018_m_000000_2:at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:380)
attempt_201406050021_0018_m_000000_2:at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:231)
attempt_201406050021_0018_m_000000_2:at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:183)
attempt_201406050021_0018_m_000000_2:at org.apache.hadoop.fs.LocalFileSystem.copyFromLoc alFile(LocalFileSystem.java:81)
attempt_201406050021_0018_m_000000_2:at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1934)
attempt_201406050021_0018_m_000000_2:at com.skanda.ecomm.sftp.FTPMapper。 configure(FTPMapper.java:91)
我希望在这里得到一个积极的解决方案。主要类(Driver类)
<$>
c $ c> / *
* @ SFTPClient.java @ 2014年5月20日
*
*
* /
package com.skanda.ecomm.sftp;
import java.net.URI;
导入org.apache.hadoop.conf.Configuration;
导入org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
导入org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/ **
*
*< p>
* SFTPClient类
*< / p>
*
* @author skanda
* @version 1.0
*
* /
公共类SFTPClient扩展Configured implements工具{
public int run(String [] args)throws Exception {
Configuration config = getConf();
String inputPath = config.get(ApplicationConstants.INPUT_PATH);
String outputPath = config.get(ApplicationConstants.OUTPUT_PATH);
String configPath = config.get(ApplicationConstants.CONFIG_PATH);
int redurs = Integer.parseInt(config.get(ApplicationConstants.REDUCERS));
$ b $ if(outputPath == null || inputPath == null || configPath == null){
throw new Exception(Usage:\\\
+-D configPath =< ; configPath> -D inputPath =< inputPath> -D redurs =< redurs+
-D outputPath =< path>);
}
JobConf conf = new JobConf(SFTPClient.class);
conf.setJobName(SFTP注入客户端);
DistributedCache.addCacheFile(new URI(configPath),conf);
conf.setMapperClass(FTPMapper.class);
conf.setReducerClass(FTPReducer.class);
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(IntWritable.class);
//配置应该包含对你的namenode的引用
FileSystem fs = FileSystem.get(new Configuration());
fs.delete(new Path(outputPath),true); // true代表递归,删除你给的文件夹
$ b $ conf.setStrings(ApplicationConstants.INPUT_PATH,inputPath);
conf.setStrings(ApplicationConstants.OUTPUT_PATH,outputPath);
FileInputFormat.setInputPaths(conf,new Path(inputPath));
FileOutputFormat.setOutputPath(conf,new Path(outputPath));
conf.setNumReduceTasks(reducer);
conf.setInt(ApplicationConstants.NUNBER_OF_REDUCERS,redurs);
JobClient.runJob(conf);
返回0;
public static void main(String [] args)throws Exception {
int exitCode = ToolRunner.run(new SFTPClient(),args);
System.exit(exitCode);
$ Map
$ b/ *
* @ FTPMapper.java @ 2014年5月20日
*
*
* /
包com.skanda.ecomm.sftp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
导入java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
导入org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
导入org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import com.ftp.mapreduce.CommonUtility;
import com.ftp.mapreduce.RetrieveFileNames;
import com.jcraft.jsch.hm.Channel;
/ **
*
*< p>
* FTP映射器类
*< / p>
*
* @author skanda
* @version 1.0
*
* /
@SuppressWarnings(unused)
public class FTPMapper扩展MapReduceBase实现Mapper< LongWritable,Text,IntWritable,Text> {
私人URI [] localFiles;
private String userName;
private String hostName;
private String folderPath;
private int redurs;
private byte [] pvtKey;
private String fileName;
private String startDate;
private String endDate;
private String sshKeyPath;
私人字符串密码;
$ b $ public void configure(JobConf job){
Properties properties = new Properties();
尝试{
localFiles = DistributedCache.getCacheFiles(job);
if(localFiles!= null& localFiles.length == 1){
Configuration conf = new Configuration();
FileSystem fileSystem = FileSystem.get(localFiles [0],conf);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(fileSystem.open(new Path(localFiles [0]))));
properties.load(bufferRedaer);
userName = properties.getProperty(ApplicationConstants.USER_NAME);
redurs = job.getInt(ApplicationConstants.NUNBER_OF_REDUCERS,30);
hostName = properties.getProperty(ApplicationConstants.SFTP_SERVER_HOST);
folderPath = properties.getProperty(ApplicationConstants.HOSTFILE_DIRECTORY_PATH);
fileName = properties.getProperty(ApplicationConstants.FILE_NAME_PATTERN);
startDate = properties.getProperty(ApplicationConstants.FILE_START_DATE);
endDate = properties.getProperty(ApplicationConstants.FILE_END_DATE);
sshKeyPath = properties.getProperty(ApplicationConstants.SSH_KEY_PATH);
password = properties.getProperty(ApplicationConstants.PASSWORD);
System.out.println(----------------------------------- ---------------);
/ * FileSystem fs = FileSystem.getLocal(conf);
//路径inputPath = fs.makeQualified(new Path(sshKeyPath));
String inputPath = new Path(file:/// home / cloudera /+ sshKeyPath).toUri()。getPath();
fs.copyFromLocalFile(new Path(inputPath),new Path(outputSFTP / idFile)); * /
try {
Configuration conf1 = new Configuration();
Path pt = new Path(file:///home/cloudera/.ssh/id_rsa);
FileSystem fs = FileSystem.get(new URI(file:///home/cloudera/.ssh/id_rsa),conf);
LocalFileSystem localFileSystem = fs.getLocal(conf1);
BufferedReader bufferRedaer1 = new BufferedReader(new InputStreamReader(localFileSystem.open(pt)));
String str = null; ((str = bufferRedaer1.readLine())!= null)
{
System.out.println(-----------);
System.out.println(str);
catch(Exception e){
System.out.println(failed again);
String computername = InetAddress.getLocalHost()。getHostName();
System.out.println(computername);
e.printStackTrace();
}
System.out.println(------------------------------ --------------------);
配置config = new Configuration();
config.set(fs.defaultFS,file:////);
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path(sshKeyPath))));
/ *配置config = new配置();
//config.set(\"fs.defaultFS,file:///home/cloudera/.ssh/id_rsa);
LocalFileSystem fileSystm = FileSystem.getLocal(config);
Path path = fileSystm.makeQualified(new Path(/ home / cloudera / .ssh / id_rsa)); * /
//FileInputFormat.setInputPaths(job,path);
// bufferRedaer = new BufferedReader(new InputStreamReader(fileSystem.open(path)));
String key =;
尝试{
String line =; ((line = bufferRedaer.readLine())!= null){
key + = line +\\\
;
while(
}
pvtKey = key.getBytes();
} catch(Exception e){
e.printStackTrace();
} finally {
//fileSystem.close();
//bufferRedaer.close();
$ b $ catch(IOException e){
e.printStackTrace();
$ b $公共无效映射(LongWritable键,文本值,OutputCollector< IntWritable,Text>输出,Reporter记者)
抛出IOException {
列表< String> filterFileNamesList = new ArrayList< String>();
频道频道= CommonUtility.connectSFTP(userName,hostName,pvtKey);
地图< String,String> fileNamesMap = CommonUtility.getFileNames(channel,folderPath);
列表< String> filterFileNameList_output = RetrieveFileNames.FILTER_BY_NAME.retrieveFileNames(fileNamesMap,filterFileNamesList,
fileName,startDate,endDate); (int i = 0; i< filterFileNameList_output.size(); i ++){
int keyGroup = i%reducer;
output.collect(new IntWritable(keyGroup),new Text(filterFileNameList_output.get(i)));
$ div class =h2_lin>解决方案
当程序运行在hdfs上,并且我的txt文件在这个位置时,这段代码正在为我工作:
$ b / home / Rishi / Documents / RishiFile /r.txt
public class HadoopRead {
public static void main(String [] args){
try {
Configuration conf = new Configuration();
Path pt = new Path(/ home / Rishi / Documents / RishiFile / r.txt);
FileSystem fs = FileSystem.get(new URI(/ home / Rishi / Documents / RishiFile),conf);
LocalFileSystem localFileSystem = fs.getLocal(conf);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(pt)));
String str = null; ((str = bufferRedaer.readLine())!= null)
{
System.out.println(-----------);
System.out.println(str);
}
} catch(Exception e){
e.printStackTrace();
$ p $ file on hdfs
我的主类
import org.apache。 hadoop.conf.Configuration;
导入org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
导入org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FileDriver extends Configured implements Tool {
public static void main(String [] args){
try {
ToolRunner .run(new Configuration(),new FileDriver(),args);
System.exit(0);
} catch(Exception e){
e.printStackTrace();
$ b public int run(String [] arg0)throws Exception {
Configuration conf = new Configuration();
Path pt = new Path(file:/// home / winoria / Documents / Ri / r);
工作职位=新职位(conf,新职位);
job.setJarByClass(FileDriver.class);
job.setMapperClass(FileMapper.class);
job.setReducerClass(FileReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job,pt);
FileSystem.get(job.getConfiguration())。delete(new Path(Output2),true);
FileOutputFormat.setOutputPath(job,new Path(Output2));
job.waitForCompletion(true);
返回0;
$ p 映射类:
public class FileMapper扩展了Mapper< LongWritable,Text,Text,Text> {
protected void map(LongWritable key,Text value,Context context)引发java.io.IOException,InterruptedException {
String str [] = value.toString()。分裂( ); (new Text(str [i]),new Text());
for(int i = 0; i< str.length; i ++){
context.write
}
};
Reducer类:
public class FileReducer extends Reducer< Text,Text,Text,Text> {
$ b protected void reduce(Text key,Iterable< Text> value,Context context)throws java.io.IOException,InterruptedException {
int count = 0;
for(Text text:value){
count ++;
}
context.write(key,new Text(count +));
};
}
I have a requirement where in the Map Reduce code should read the local file system in each node. The program will be running on HDFS and I cannot change the FileSystem property for hadoop in xml files for configuration.
I have tried the following solutions, but none gave me results.
Approach 1
Configuration config = new Configuration();
FileSystem localFileSystem = FileSystem.get(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
Approach 2
Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
Approach 3
Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
Approach 4
Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.getRaw().open(new Path("/user/input/localFile"))));
This did not work either
[Reading HDFS and local files in Java
Each of them gave the error: No such file exists
Error Stack
attempt_201406050021_0018_m_000000_2: java.io.FileNotFoundException: File /home/cloudera/sftp/id_rsa does not exist
attempt_201406050021_0018_m_000000_2: at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:468)
attempt_201406050021_0018_m_000000_2: at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:380)
attempt_201406050021_0018_m_000000_2: at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:231)
attempt_201406050021_0018_m_000000_2: at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:183)
attempt_201406050021_0018_m_000000_2: at org.apache.hadoop.fs.LocalFileSystem.copyFromLocalFile(LocalFileSystem.java:81)
attempt_201406050021_0018_m_000000_2: at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1934)
attempt_201406050021_0018_m_000000_2: at com.skanda.ecomm.sftp.FTPMapper.configure(FTPMapper.java:91)
I am hoping to get a positive solution here. Let me know where I am going wrong.
Main class (Driver class)
/*
* @SFTPClient.java @May 20, 2014
*
*
*/
package com.skanda.ecomm.sftp;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * <p>
 * SFTPClient Class
 * </p>
 *
 * Driver for the SFTP-injection MapReduce job. Reads the required paths and
 * reducer count from {@code -D} options, ships the SFTP properties file to
 * the tasks via the distributed cache, and submits the job.
 *
 * @author skanda
 * @version 1.0
 */
public class SFTPClient extends Configured implements Tool {

    /**
     * Configures and submits the job.
     *
     * @param args unused; all settings come from -D options via getConf()
     * @return 0 on successful submission
     * @throws Exception if a required -D option is missing or job submission fails
     */
    public int run(String[] args) throws Exception {
        Configuration config = getConf();
        String inputPath = config.get(ApplicationConstants.INPUT_PATH);
        String outputPath = config.get(ApplicationConstants.OUTPUT_PATH);
        String configPath = config.get(ApplicationConstants.CONFIG_PATH);
        String reducersValue = config.get(ApplicationConstants.REDUCERS);

        // Validate all required options BEFORE parsing. The original parsed
        // REDUCERS first, so a missing value surfaced as a
        // NumberFormatException instead of the usage message.
        if (outputPath == null || inputPath == null || configPath == null || reducersValue == null) {
            // Fixed usage text: the original was missing the closing '>' and the
            // space before "-D outputPath", producing "<reducers-D outputPath=<path>".
            throw new Exception("Usage: \n"
                    + "-D configPath=<configPath> -D inputPath=<inputPath> -D reducers=<reducers> "
                    + "-D outputPath=<path>");
        }
        int reducers = Integer.parseInt(reducersValue);

        JobConf conf = new JobConf(SFTPClient.class);
        conf.setJobName("SFTP Injection client");

        // Ship the properties file to every task node; FTPMapper.configure()
        // reads it back out of the distributed cache.
        DistributedCache.addCacheFile(new URI(configPath), conf);

        conf.setMapperClass(FTPMapper.class);
        conf.setReducerClass(FTPReducer.class);
        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(IntWritable.class);

        // configuration should contain reference to your namenode
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(new Path(outputPath), true); // true stands for recursively, deleting the folder you gave

        conf.setStrings(ApplicationConstants.INPUT_PATH, inputPath);
        conf.setStrings(ApplicationConstants.OUTPUT_PATH, outputPath);
        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));

        conf.setNumReduceTasks(reducers);
        conf.setInt(ApplicationConstants.NUNBER_OF_REDUCERS, reducers);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SFTPClient(), args);
        System.exit(exitCode);
    }
}
Mapper
/*
* @FTPMapper.java @May 20, 2014
*
*
*/
package com.skanda.ecomm.sftp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import com.ftp.mapreduce.CommonUtility;
import com.ftp.mapreduce.RetrieveFileNames;
import com.jcraft.jsch.hm.Channel;
/**
 * <p>
 * FTP Mapper Class
 * </p>
 *
 * Mapper that lists file names on a remote SFTP server and distributes them
 * round-robin across reducers. configure() loads connection settings from a
 * distributed-cache properties file and attempts (in several ways) to read an
 * SSH private key from the task node's LOCAL file system -- this local read is
 * the part that fails with FileNotFoundException in the question above.
 *
 * @author skanda
 * @version 1.0
 */
@SuppressWarnings("unused")
public class FTPMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
// Cached URIs of distributed-cache files (expected: exactly one properties file).
private URI[] localFiles;
private String userName;
private String hostName;
// Remote SFTP directory whose file names are listed in map().
private String folderPath;
private int reducers;
// Raw bytes of the SSH private key, read in configure(), used by connectSFTP.
private byte[] pvtKey;
private String fileName;
private String startDate;
private String endDate;
// LOCAL path of the SSH key on each task node (from the properties file).
private String sshKeyPath;
private String password;
/**
 * Loads SFTP settings from the distributed-cache properties file, then reads
 * the SSH private key from the node's local file system into pvtKey.
 * All failures are only printed, so a failed key read leaves pvtKey null
 * and surfaces later in map() -- NOTE(review): consider failing fast here.
 */
public void configure(JobConf job) {
Properties properties = new Properties();
try {
localFiles = DistributedCache.getCacheFiles(job);
if (localFiles != null && localFiles.length == 1) {
Configuration conf = new Configuration();
// Opens the cached properties file through the FS named by its URI.
FileSystem fileSystem = FileSystem.get(localFiles[0], conf);
BufferedReader bufferRedaer=new BufferedReader(new InputStreamReader(fileSystem.open(new Path(localFiles[0]))));
properties.load(bufferRedaer);
userName = properties.getProperty(ApplicationConstants.USER_NAME);
reducers = job.getInt(ApplicationConstants.NUNBER_OF_REDUCERS, 30);
hostName = properties.getProperty(ApplicationConstants.SFTP_SERVER_HOST);
folderPath = properties.getProperty(ApplicationConstants.HOSTFILE_DIRECTORY_PATH);
fileName = properties.getProperty(ApplicationConstants.FILE_NAME_PATTERN);
startDate = properties.getProperty(ApplicationConstants.FILE_START_DATE);
endDate = properties.getProperty(ApplicationConstants.FILE_END_DATE);
sshKeyPath = properties.getProperty(ApplicationConstants.SSH_KEY_PATH);
password = properties.getProperty(ApplicationConstants.PASSWORD);
System.out.println("--------------------------------------------------");
/*FileSystem fs = FileSystem.getLocal(conf);
//Path inputPath = fs.makeQualified(new Path(sshKeyPath));
String inputPath = new Path("file:///home/cloudera/"+sshKeyPath).toUri().getPath();
fs.copyFromLocalFile(new Path(inputPath), new Path("outputSFTP/idFile") );*/
// Debug attempt: read a hard-coded local key path via an explicit file:// URI.
// Failure here is swallowed and only logged ("failed again").
try{
Configuration conf1 = new Configuration();
Path pt = new Path("file:///home/cloudera/.ssh/id_rsa");
FileSystem fs = FileSystem.get( new URI("file:///home/cloudera/.ssh/id_rsa"), conf);
LocalFileSystem localFileSystem = fs.getLocal(conf1);
BufferedReader bufferRedaer1 = new BufferedReader(new InputStreamReader(localFileSystem.open(pt)));
String str = null;
while ((str = bufferRedaer1.readLine())!= null)
{
System.out.println("-----------");
System.out.println(str);
}
}catch(Exception e){
System.out.println("failed again");
// Print the task node's hostname to see WHICH node lacks the key file.
String computername=InetAddress.getLocalHost().getHostName();
System.out.println(computername);
e.printStackTrace();
}
System.out.println("--------------------------------------------------");
// Actual key read: force the local FS via fs.defaultFS=file://// and open
// sshKeyPath from the properties file. NOTE(review): "file:////" has an
// extra slash ("file:///" is the usual form) -- verify intended.
Configuration config = new Configuration();
config.set("fs.defaultFS", "file:////");
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(new Path(sshKeyPath))));
/*Configuration config = new Configuration();
//config.set("fs.defaultFS", "file:///home/cloudera/.ssh/id_rsa");
LocalFileSystem fileSystm = FileSystem.getLocal(config);
Path path = fileSystm.makeQualified(new Path("/home/cloudera/.ssh/id_rsa"));*/
//FileInputFormat.setInputPaths(job, path);
//bufferRedaer = new BufferedReader(new InputStreamReader(fileSystem.open(path)));
// Accumulate the key file line by line, restoring '\n' separators.
String key = "";
try {
String line = "";
while ((line = bufferRedaer.readLine()) != null) {
key += line + "\n";
}
pvtKey = key.getBytes();
} catch(Exception e){
e.printStackTrace();
} finally {
// NOTE(review): neither stream is closed -- resource leak left as-is.
//fileSystem.close();
//bufferRedaer.close();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
 * Lists remote file names via SFTP, filters them by name pattern and date
 * range, and emits each surviving name keyed by (index % reducers) so the
 * names are spread evenly across reducers.
 */
public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter)
throws IOException {
List<String> filterFileNamesList = new ArrayList<String>();
Channel channel = CommonUtility.connectSFTP(userName, hostName, pvtKey);
Map<String, String> fileNamesMap = CommonUtility.getFileNames(channel, folderPath);
List<String> filterFileNameList_output = RetrieveFileNames.FILTER_BY_NAME.retrieveFileNames(fileNamesMap, filterFileNamesList,
fileName, startDate, endDate);
for (int i = 0; i < filterFileNameList_output.size(); i++) {
int keyGroup = i % reducers;
output.collect(new IntWritable(keyGroup), new Text(filterFileNameList_output.get(i)));
}
}
}
解决方案 This code is working for me when program runs on hdfs and my txt file is in this location:
/home/Rishi/Documents/RishiFile/r.txt
/**
 * Standalone check that reads a file from the LOCAL file system even when
 * fs.defaultFS points at HDFS, by going through FileSystem.getLocal().
 */
public class HadoopRead {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Path pt = new Path("/home/Rishi/Documents/RishiFile/r.txt");
            // getLocal() always returns the local file system regardless of
            // fs.defaultFS, so the original FileSystem.get(new URI(...), conf)
            // detour (whose result was only used to call getLocal) is dropped.
            LocalFileSystem localFileSystem = FileSystem.getLocal(conf);
            // try-with-resources: the original never closed the reader/stream.
            try (BufferedReader bufferRedaer =
                    new BufferedReader(new InputStreamReader(localFileSystem.open(pt)))) {
                String str;
                while ((str = bufferRedaer.readLine()) != null) {
                    System.out.println("-----------");
                    System.out.println(str);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Word Count Example for reading local file on hdfs
my main class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Word-count-style driver that reads its input from the local file system
 * (file:/// path) while the job runs through the Hadoop framework.
 */
public class FileDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            // Propagate the tool's exit code; the original always exited 0,
            // even when the job failed or an exception was thrown.
            int exitCode = ToolRunner.run(new Configuration(), new FileDriver(), args);
            System.exit(exitCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param arg0 unused command-line arguments
     * @return 0 if the job completed successfully, 1 otherwise
     */
    public int run(String[] arg0) throws Exception {
        Configuration conf = new Configuration();
        // The explicit file:/// scheme forces the local file system for input.
        Path pt = new Path("file:///home/winoria/Documents/Ri/r");

        Job job = new Job(conf, "new Job");
        job.setJarByClass(FileDriver.class);
        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, pt);
        // Delete stale output so the job does not fail on an existing directory.
        FileSystem.get(job.getConfiguration()).delete(new Path("Output2"), true);
        FileOutputFormat.setOutputPath(job, new Path("Output2"));

        // Report job failure to the caller; the original ignored this result
        // and returned 0 unconditionally.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
mapper class :
/**
 * Splits each input line on single spaces and emits every token as a key with
 * an empty Text value; the reducer counts occurrences per token.
 */
public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String[] tokens = value.toString().split(" ");
        for (String token : tokens) {
            context.write(new Text(token), new Text());
        }
    }
}
Reducer Class:
/**
 * Counts how many values arrived for each key and emits that count, rendered
 * as text, alongside the key.
 */
public class FileReducer extends Reducer<Text, Text, Text, Text> {
    protected void reduce(Text key, Iterable<Text> value, Context context)
            throws java.io.IOException, InterruptedException {
        int occurrences = 0;
        for (Text ignored : value) {
            occurrences++;
        }
        context.write(key, new Text(String.valueOf(occurrences)));
    }
}
这篇关于Hadoop Mapreduce - 访问本地文件系统的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!