Convert Sequence file and get key, value pairs via map and reduce tasks in hadoop

Problem description

I want to get all key/value pairs from a sequence file via a Hadoop MapReduce application. I followed this post, http://lintool.github.com/Cloud9/docs/content/staging-records.html , for reading a sequence file in the main class, but that didn't work. I want to print all key/value pairs to a normal text file in HDFS; how can I achieve that? I wrote my code as below.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount
{
    // SequenceFileAsBinaryInputFormat delivers both key and value as BytesWritable.
    public static class Map extends Mapper<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
    {
        public void map(BytesWritable key, BytesWritable value, Context context) throws IOException, InterruptedException
        {
            // BytesWritable.toString() prints the bytes as space-separated hex.
            System.out.println(key.toString());
            System.out.println(value.toString());
            context.write(key, value);
        }
    }

    // Identity reducer: passes every key/value pair through unchanged, so the
    // types match the BytesWritable pairs emitted by the mapper.
    public static class Reduce extends Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>
    {
        public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException
        {
            for (BytesWritable val : values)
            {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        // Note: FileUtil.fullyDelete(File) deletes a *local* directory,
        // not a path on HDFS.
        FileUtil.fullyDelete(new File(args[1]));

        Configuration conf = new Configuration();

        Job job = new Job(conf, "wordcount");

        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(SequenceFileAsBinaryInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(WordCount.class);

        job.waitForCompletion(true);
    }
}

Solution

Please find the program below. It may be useful in getting an idea of how to convert a BytesWritable value to text.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;

public class SequenceFileRead {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, Reader.file(path));
            // These Writable instances must match the classes the
            // sequence file was written with, or next() will throw.
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key);
                // getBytes() returns the backing buffer, which may be padded
                // beyond the record; only the first getLength() bytes are data.
                System.out.println(new String(value.getBytes(), 0, value.getLength()));
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
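A practical note on the reader above: reader.next(key, value) fails if the Writable instances you pass in don't match the classes the file was written with, and those classes are recorded in the file header. A minimal sketch along these lines (the class name SequenceFileTypes is ours, but getKeyClassName()/getValueClassName() are standard SequenceFile.Reader methods) tells you which types to instantiate before writing the read loop:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;

public class SequenceFileTypes {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, Reader.file(new Path(args[0])));
            // The key and value class names are stored in the file header.
            System.out.println("key class:   " + reader.getKeyClassName());
            System.out.println("value class: " + reader.getValueClassName());
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}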

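Since the original goal was a plain text file in HDFS rather than console output, a map-only job may be the simplest route: SequenceFileInputFormat deserializes each record, an identity mapper passes it through, and TextOutputFormat writes one "key<TAB>value" line per record. The sketch below is illustrative rather than the original poster's code; it assumes the file holds Text keys and Text values, so the generic types would need to be changed to whatever the type check above reports.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SequenceFileDump {

    // Identity mapper: each record is written straight to the output,
    // where TextOutputFormat renders it as "key<TAB>value".
    public static class DumpMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "sequence file dump");
        job.setJarByClass(SequenceFileDump.class);

        job.setMapperClass(DumpMapper.class);
        job.setNumReduceTasks(0); // map-only: no shuffle, output written directly

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

With zero reduce tasks the pairs land in part-m-* files under the output path. For a quick look without writing any code, `hadoop fs -text <path>` can also decode a sequence file to stdout.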