java.lang.IllegalArgumentException: Wrong FS: , expected: hdfs://localhost:9000


Problem Description


I am trying to implement a reduce-side join and am using a MapFile reader to look up the distributed cache, but it is not finding the values. When I checked stderr it showed the error below. The lookup file is already present in HDFS and, judging from stdout, it seems to be loaded into the cache correctly.

java.lang.IllegalArgumentException: Wrong FS: file:/app/hadoop/tmp/mapred/local/taskTracker/distcache/-8118663285704962921_-1196516983_170706299/localhost/input/delivery_status/DeliveryStatusCodes/data, expected: hdfs://localhost:9000
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:390)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:554)
    at org.apache.hadoop.fs.FileSystem.getLength(FileSystem.java:816)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1479)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1474)
    at org.apache.hadoop.io.MapFile$Reader.createDataFileReader(MapFile.java:302)
    at org.apache.hadoop.io.MapFile$Reader.open(MapFile.java:284)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:273)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:260)
    at org.apache.hadoop.io.MapFile$Reader.<init>(MapFile.java:253)
    at mr_poc.reducerrsj.initializeDepartmentsMap(reducerrsj.java:59)
    at mr_poc.reducerrsj.setup(reducerrsj.java:42)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:174)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
java.lang.NullPointerException
    at mr_poc.reducerrsj.buildOutputValue(reducerrsj.java:83)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:127)
    at mr_poc.reducerrsj.reduce(reducerrsj.java:1)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.

This is my driver code:

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class driverrsj extends Configured implements Tool{

    @Override
    public int run(String[] arg) throws Exception {
        if (arg.length != 3) {
            System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
            return -1;
        }
        Job job = new Job(getConf());
        Configuration conf = job.getConfiguration();
        DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
        System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
        job.setJarByClass(driverrsj.class);
        conf.setInt("cust_info", 1);
        conf.setInt("status", 2);
        StringBuilder inputPaths = new StringBuilder();
        inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
        FileInputFormat.setInputPaths(job, inputPaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(arg[2]));
        job.setJarByClass(driverrsj.class);
        job.setMapperClass(mappperRSJ.class);
        job.setReducerClass(reducerrsj.class);
        job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
        job.setMapOutputValueClass(Text.class);
        //job.setPartitionerClass(partinonrsj.class);
        job.setSortComparatorClass(secondarysortcomp.class);
        job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;

    }

    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
        System.exit(exitCode);

    }


}

This is my reducer code:

package mr_poc;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
    StringBuilder reduceValueBuilder = new StringBuilder("");
    NullWritable nullWritableKey = NullWritable.get();
    Text reduceOutputValue = new Text("");
    String strSeparator = ",";
    private MapFile.Reader deptMapReader = null;
    Text txtMapFileLookupKey = new Text();
    Text txtMapFileLookupValue = new Text();
    //Path[] cacheFilesLocal;
    //Path[] eachPath;

    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());



        for (Path eachPath : cacheFiles) {

            System.out.println(eachPath.toString());
            System.out.println(eachPath.getName());
            if (eachPath.getName().toString().contains("delivery_status")) {

                URI uriUncompressedFile = new File(eachPath.toString() + "/DeliveryStatusCodes").toURI();
                initializeDepartmentsMap(uriUncompressedFile, context);

            }
        }
    }

    //@SuppressWarnings("deprecation")
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
            throws IOException {
        // {{
        // Initialize the reader of the map file (side data)
        Configuration conf = context.getConfiguration();
        conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
        FileSystem dfs = FileSystem.get(conf);
        try {

            deptMapReader = new MapFile.Reader(dfs, uriUncompressedFile.toString(), context.getConfiguration());
        } catch (Exception e) {
            e.printStackTrace();
        }
        // }}
    }
    private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
            StringBuilder reduceValueBuilder, Text value) {

        if (key.getsourceindex() == 2) {

            String arrSalAttributes[] = value.toString().split(",");
            txtMapFileLookupKey.set(arrSalAttributes[0].toString());
            System.out.println("key=" + txtMapFileLookupKey);

            try {
                deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
            } catch (Exception e) {
                txtMapFileLookupValue.set("");
                e.printStackTrace();
            } finally {
                txtMapFileLookupValue
                        .set((txtMapFileLookupValue.equals(null) || txtMapFileLookupValue
                                .equals("")) ? "NOT-FOUND"
                                : txtMapFileLookupValue.toString());
            }

            reduceValueBuilder.append(txtMapFileLookupValue.toString());

        } else if (key.getsourceindex() == 1) {

            String arrEmpAttributes[] = value.toString().split(",");
            reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(
                    strSeparator);
        }

        txtMapFileLookupKey.set("");
        txtMapFileLookupValue.set("");

        return reduceValueBuilder;
    }

    @Override
    public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            buildOutputValue(key, reduceValueBuilder, value);
        }

        // Drop last comma, set value, and emit output
        if (reduceValueBuilder.length() > 1) {

            //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
            // Emit output
            reduceOutputValue.set(reduceValueBuilder.toString());
            context.write(nullWritableKey, reduceOutputValue);
        } else {
            System.out.println("Key=" + key.getjoinkey() + "src="
                    + key.getsourceindex());
        }
        // Reset variables
        reduceValueBuilder.setLength(0);
        reduceOutputValue.set("");

    }
    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        if (deptMapReader != null) {
            deptMapReader.close();
        }
    }
}

This is my core-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>
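
If I read that description right, FileSystem.get(conf) resolves whatever scheme fs.default.name names, while the two-argument overload binds to the URI you pass in explicitly. A minimal sketch of that difference (assuming the localhost:9000 namenode above is running; the class name is only for illustration):

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsResolutionSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000");

        // Resolved from fs.default.name, so this is the DistributedFileSystem.
        FileSystem defaultFs = FileSystem.get(conf);
        System.out.println(defaultFs.getUri());   // hdfs://localhost:9000

        // Resolved from the explicit URI scheme, so this is the LocalFileSystem,
        // no matter what fs.default.name says.
        FileSystem localFs = FileSystem.get(new URI("file:///"), conf);
        System.out.println(localFs.getUri());     // file:///
    }
}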

Any help would be highly appreciated. Thanks in advance!!!

Solution

I had the same issue, and I resolved it by adding FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf) in the driver class.

URI has to be imported from java.net.URI.
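
In the driver it would look roughly like this; a minimal sketch assuming the same localhost:9000 namenode as in the core-site.xml above. Only the FileSystem.get(...) line is the actual fix; the class name and the stripped-down job setup are just illustrative:

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Job;

public class driverrsjSketch {
    public int run(String[] arg) throws Exception {
        Job job = new Job(new Configuration());
        Configuration conf = job.getConfiguration();

        // Resolve the FileSystem explicitly against the HDFS namenode from
        // core-site.xml instead of relying on whatever default gets picked up.
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
        System.out.println(fs.getUri());

        // ... the rest of the job setup stays exactly as in the question ...
        return job.waitForCompletion(true) ? 0 : 1;
    }
}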
