Running Hadoop with compressed files as input: data read by Hadoop is out of sequence, NumberFormatException


Problem description

I am giving tar.bz2, .gz, and tar.gz files as input after changing the properties in mapred-site.xml. None of them worked. My assumption is that the records Hadoop reads from the input go out of sequence: one column of the input is a string and the other is an integer, but when reading from the compressed file, because of some out-of-sequence data, Hadoop at some point reads the string part as an integer and throws a number format exception. I'm just a noob. I want to know whether there is a problem in the configuration or in my code.

The properties in core-site.xml are:

<property>
  <name>io.compression.codecs</name>
   <value>org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
   <description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>
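
As an aside for readers: Hadoop chooses a decompression codec for an input file from the file name suffix, using the codecs listed in io.compression.codecs. The sketch below is a plain-Java simulation of that lookup, not Hadoop's own code; the class and method names are hypothetical, and the suffix table assumes the default extensions of the four codecs configured above. It illustrates that a file named data.tar.bz2 matches only the .bz2 suffix, so what comes out of the codec is still a tar stream.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CodecBySuffix {
    // Assumed default file extensions of the codecs named in io.compression.codecs.
    private static final Map<String, String> SUFFIX_TO_CODEC = new LinkedHashMap<>();
    static {
        SUFFIX_TO_CODEC.put(".bz2", "BZip2Codec");
        SUFFIX_TO_CODEC.put(".deflate", "DefaultCodec");
        SUFFIX_TO_CODEC.put(".gz", "GzipCodec");
        SUFFIX_TO_CODEC.put(".snappy", "SnappyCodec");
    }

    /** Returns the simple name of the matching codec, or null if no suffix matches. */
    static String codecFor(String fileName) {
        for (Map.Entry<String, String> entry : SUFFIX_TO_CODEC.entrySet()) {
            if (fileName.endsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    /** Returns the file name with the codec suffix removed, i.e. what the codec's output represents. */
    static String payloadName(String fileName) {
        for (String suffix : SUFFIX_TO_CODEC.keySet()) {
            if (fileName.endsWith(suffix)) {
                return fileName.substring(0, fileName.length() - suffix.length());
            }
        }
        return fileName;
    }

    public static void main(String[] args) {
        String input = "data.tar.bz2";
        System.out.println(input + " -> codec " + codecFor(input)
                + ", payload " + payloadName(input));
    }
}
```

Under these assumptions, only the outer compression layer is removed; nothing in this lookup unpacks the tar archive inside.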

The properties in mapred-site.xml are:

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>

<property>
   <name>mapred.map.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
</property>

Here is my code:

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;        
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;

public class MySort{
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable Marks = new IntWritable();
        private Text name = new Text();
        String one,two;
        int num;
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                one=tokenizer.nextToken();
                name.set(one);
                if(tokenizer.hasMoreTokens())
                    two=tokenizer.nextToken();
                num=Integer.parseInt(two);
                Marks.set(num);
                context.write(name, Marks);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //  conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");

        //  conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
        //  conf.setBoolean("mapreduce.map.output.compress",true);
        conf.setBoolean("mapred.output.compress",true);
        //conf.setBoolean("mapreduce.output.fileoutputformat.compress",false);
        //conf.setBoolean("mapreduce.map.output.compress",true);
        conf.set("mapred.output.compression.type", "BLOCK");
        //conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        //      conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setClass("mapred.map.output.compression.codec", BZip2Codec.class, CompressionCodec.class);
        Job job = new Job(conf, "mysort");
        job.setJarByClass(org.myorg.MySort.class);
        job.setJobName("mysort");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        //  FileInputFormat.setCompressInput(job,true);
        FileOutputFormat.setCompressOutput(job, true);
        //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        //  conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString());

        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

}

makefile

run:    all
        -sudo ./a.out
        sudo chmod 777 -R Data
        -sudo rm data.tar.bz2
        sudo tar -cvjf data.tar.bz2 Data/data.txt
        sudo javac -classpath /home/hduser/12115_Select_Query/hadoop-core-1.1.2.jar -d mysort MySort.java
        sudo jar -cvf mysort.jar -C mysort/ .
        -hadoop fs -rmr MySort/output
        -hadoop fs -rmr MySort/input
        hadoop fs -mkdir MySort/input
        hadoop fs -put data.tar.bz2 MySort/input
        hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output
        -sudo rm /home/hduser/Out/sort.txt
        hadoop fs -copyToLocal MySort/output/part-r-00000 /home/hduser/Out/sort.txt
        sudo gedit /home/hduser/Out/sort.txt

all:    rdata.c
        -sudo rm a.out
        -gcc rdata.c -o a.out

exec:   run

.PHONY: exec run

Command:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output


Here is the output:

Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /usr/local/hadoop/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
14/06/25 11:20:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/25 11:20:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/06/25 11:20:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/25 11:20:29 INFO input.FileInputFormat: Total input paths to process : 1
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: number of splits:1
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1403675322820_0001
14/06/25 11:20:30 INFO impl.YarnClientImpl: Submitted application application_1403675322820_0001
14/06/25 11:20:30 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1403675322820_0001/
14/06/25 11:20:30 INFO mapreduce.Job: Running job: job_1403675322820_0001
14/06/25 11:20:52 INFO mapreduce.Job: Job job_1403675322820_0001 running in uber mode : false
14/06/25 11:20:52 INFO mapreduce.Job:  map 0% reduce 0%
14/06/25 11:21:10 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_0, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:29 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_1, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:49 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_2, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:22:10 INFO mapreduce.Job:  map 100% reduce 100%
14/06/25 11:22:10 INFO mapreduce.Job: Job job_1403675322820_0001 failed with state FAILED due to: Task failed task_1403675322820_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

14/06/25 11:22:10 INFO mapreduce.Job: Counters: 9
    Job Counters 
        Failed map tasks=4
        Launched map tasks=4
        Other local map tasks=3
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=69797
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=69797
        Total vcore-seconds taken by all map tasks=69797
        Total megabyte-seconds taken by all map tasks=71472128
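
A note on the failing token in the log above: "ustar" is the magic string of a tar header, which suggests the mapper was fed the tar archive's header block as text rather than reordered records. The sketch below is a plain-Java illustration under that assumption. It builds a simplified, hypothetical 512-byte header (only the name, type flag, and magic fields are filled; a real header has more) using the standard ustar field offsets, and shows that the type flag '0' followed by NUL padding and the magic reads as "0ustar" once the invisible NUL bytes are dropped.

```java
import java.nio.charset.StandardCharsets;

public class TarMagicCheck {
    static final int BLOCK_SIZE = 512;       // tar headers are 512-byte blocks
    static final int TYPEFLAG_OFFSET = 156;  // '0' marks a regular file
    static final int MAGIC_OFFSET = 257;     // position of the "ustar" magic

    /** Builds a simplified header block for illustration: name, type flag, and magic only. */
    static byte[] fakeHeader(String entryName) {
        byte[] block = new byte[BLOCK_SIZE]; // every other field stays NUL
        byte[] name = entryName.getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(name, 0, block, 0, name.length);
        block[TYPEFLAG_OFFSET] = (byte) '0';
        byte[] magic = "ustar".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(magic, 0, block, MAGIC_OFFSET, magic.length);
        return block;
    }

    /** Returns true if the block carries the "ustar" magic at the expected offset. */
    static boolean looksLikeTar(byte[] firstBlock) {
        if (firstBlock.length < MAGIC_OFFSET + 5) {
            return false;
        }
        String magic = new String(firstBlock, MAGIC_OFFSET, 5, StandardCharsets.US_ASCII);
        return "ustar".equals(magic);
    }

    /** Returns the text from the type flag through the magic with the NUL padding removed. */
    static String visibleRun(byte[] firstBlock) {
        int length = MAGIC_OFFSET + 5 - TYPEFLAG_OFFSET;
        String raw = new String(firstBlock, TYPEFLAG_OFFSET, length, StandardCharsets.US_ASCII);
        return raw.replace("\0", "");
    }

    public static void main(String[] args) {
        byte[] header = fakeHeader("Data/data.txt");
        System.out.println("looks like tar: " + looksLikeTar(header));
        System.out.println("visible text:   " + visibleRun(header));
        try {
            Integer.parseInt(visibleRun(header));
        } catch (NumberFormatException e) {
            System.out.println("parse fails:    " + e.getMessage());
        }
    }
}
```

If this reading is right, the bz2 layer was decompressed correctly and the exception comes from the tar wrapper, not from records being shuffled.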

I also tried this:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar   -Dmapred.output.compress=true   -Dmapred.compress.map.output=true   -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec   -Dmapred.reduce.tasks=0   -input MySort/input/data.txt   -output MySort/zip1

It successfully created the compressed files:

hadoop fs -ls MySort/zip1

Found 3 items
-rw-r--r--   1 hduser supergroup          0 2014-06-25 10:43 MySort/zip1/_SUCCESS
-rw-r--r--   1 hduser supergroup   42488018 2014-06-25 10:43 MySort/zip1/part-00000.bz2
-rw-r--r--   1 hduser supergroup   42504084 2014-06-25 10:43 MySort/zip1/part-00001.bz2

Then I ran:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/zip1

It works fine when I run it without the compressed bz2 file and pass it the text file Data/data.txt directly, i.e. by uploading it to MySort/input in HDFS (hadoop fs -put Data/data.txt MySort/input).
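
Since the plain text file works, one option worth trying is to compress that single file directly instead of wrapping it in a tar archive. The sketch below uses only the Java standard library, which offers gzip but not bzip2, so it produces a plain gzip stream; the class and method names are hypothetical, and it works on in-memory bytes to stay self-contained. Note that gzip input is not splittable in Hadoop, whereas bzip2 is.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class PlainGzip {
    /** Compresses raw bytes into a single gzip stream with no archive wrapper. */
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Decompresses a gzip stream back into the original bytes. */
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        // Sample rows in the "name marks" shape the mapper expects (made-up data).
        byte[] text = "alice 42\nbob 17\n".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = gunzip(gzip(text));
        // The decompressed bytes are the original lines, with no header in front.
        System.out.print(new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

A file produced this way and named with a .gz suffix decompresses straight to the record lines, so the first line the mapper sees is a real record.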

Any help is appreciated.

Recommended answer

I worked around this by using ToolRunner:

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;        
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ToolMapReduce extends Configured implements Tool 
{


    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> 
    {
        private final static IntWritable Marks = new IntWritable();
        private Text name = new Text();
        String one,two;
        int num;
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) 
            {
                one=tokenizer.nextToken();
                name.set(one);
                if(tokenizer.hasMoreTokens())
                    two=tokenizer.nextToken();
                num=Integer.parseInt(two);
                Marks.set(num);
                context.write(name, Marks);
            }
        }
    } 

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> 
    {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException 
        {
            int sum = 0;
            for (IntWritable val : values) 
            {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception  
    {
        int res = ToolRunner.run(new Configuration(), new ToolMapReduce(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception
    {   

        Configuration conf = this.getConf();
        //Configuration conf = new Configuration();
        //conf.setOutputFormat(SequenceFileOutputFormat.class); 
        //SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); 
        //SequenceFileOutputFormat.setCompressOutput(conf, true); 
        //conf.set("mapred.output.compress","true");
        //  conf.set("mapred.output.compression","org.apache.hadoop.io.compress.SnappyCodec");

        //conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
        //  conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");

        //  conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
        //  conf.setBoolean("mapreduce.map.output.compress",true);
        conf.setBoolean("mapred.output.compress",true);
        //conf.setBoolean("mapreduce.output.fileoutputformat.compress",false);
        //conf.setBoolean("mapreduce.map.output.compress",true);
        conf.set("mapred.output.compression.type", "BLOCK");     
        //conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        //      conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        Job job = new Job(conf, "mysort");
        job.setJarByClass(org.myorg.ToolMapReduce.class);
        //job.setJarByClass(org.myorg.MySort.class);
        job.setJobName("mysort");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        //  FileInputFormat.setCompressInput(job,true);
        FileOutputFormat.setCompressOutput(job, true);
        //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        //  conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString()); 

        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
        //job.waitForCompletion(true);
    }


}
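
Separately from the workaround, the map method in the listing above calls Integer.parseInt on whatever is in the field two, which still holds the previous value when a line has only one token, and it fails the whole task on the first non-numeric token. A minimal sketch of stricter line parsing follows. It is plain Java with hypothetical names (LineParser, NameMarks), and it assumes each input line holds exactly one name and one integer, which is narrower than the original loop that accepts several pairs per line.

```java
import java.util.Optional;
import java.util.StringTokenizer;

public class LineParser {
    /** One parsed input row: a name and its integer marks. */
    static final class NameMarks {
        final String name;
        final int marks;

        NameMarks(String name, int marks) {
            this.name = name;
            this.marks = marks;
        }
    }

    /** Parses "name marks"; returns empty for any line that does not fit that shape. */
    static Optional<NameMarks> parse(String line) {
        StringTokenizer tokenizer = new StringTokenizer(line);
        if (tokenizer.countTokens() != 2) {
            return Optional.empty(); // wrong number of fields, skip the line
        }
        String name = tokenizer.nextToken();
        String marksText = tokenizer.nextToken();
        try {
            return Optional.of(new NameMarks(name, Integer.parseInt(marksText)));
        } catch (NumberFormatException e) {
            return Optional.empty(); // second field is not an integer, skip the line
        }
    }

    public static void main(String[] args) {
        String[] samples = {"alice 42", "0ustar", "bob notanumber"};
        for (String sample : samples) {
            Optional<NameMarks> row = parse(sample);
            System.out.println(sample + " -> "
                    + (row.isPresent() ? row.get().name + "=" + row.get().marks : "skipped"));
        }
    }
}
```

In a mapper, the result of parse could decide whether to call context.write or to skip the record; skipping silently hides bad input, so counting skipped lines is a reasonable addition.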
