Problems with setting up and accessing Distributed Cache


Problem description


For some reason I can't find any good sources online for getting Distributed Cache working with the new API. Hoping someone here can explain what I'm doing wrong. My current attempt is sort of a mish-mash of various things I've found online.

This program attempts to run the k-nearest neighbors algorithm. The input file is the test dataset, while the distributed cache holds the train dataset and train labels. The mapper should take one row of test data, compare it to every row in the distributed cache data, and return the label of the row it is most similar to.

import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KNNDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: %s [generic options] <input dir> <output dir>\n", getClass().getSimpleName());
            return -1;
        }

        Configuration conf = new Configuration();
        // conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "^");

        conf.setInt ("train_rows",1000);
        conf.setInt ("test_rows",1000);
        conf.setInt ("cols",612);
        DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv"),conf);
        DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv"),conf);

        Job job = new Job(conf);
        job.setJarByClass(KNNDriver.class); 
        job.setJobName("KNN");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(KNNMapper.class);
        job.setReducerClass(KNNReducer.class);
        // job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new KNNDriver(), args);
        System.exit(exitCode);
    }
}

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KNNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

  int[][] train_vals;
  int[] train_label_vals;
  int train_rows;
  int test_rows;
  int cols;

  @Override
  public void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();

      // Path[] cacheFiles = context.getLocalCacheFiles();

      int train_rows = conf.getInt("train_rows", 0);
      int test_rows = conf.getInt("test_rows", 0);
      int cols = conf.getInt("cols", 0);

      train_vals = new int[train_rows][cols];
      train_label_vals = new int[train_rows];

      // read train csv, parse, and store into 2d int array
      Scanner myScan;
        try {
            myScan = new Scanner(new File("train_sample.csv"));

            //Set the delimiter used in file
            myScan.useDelimiter("[,\r\n]+");

            //Get all tokens and store them in some data structure
            //I am just printing them

            System.out.println("myScan loaded for train_sample");

            for(int row = 0; row < train_rows; row++) {
                for(int col = 0; col < cols; col++) {
                    train_vals[row][col] = Integer.parseInt(myScan.next().toString());

                }
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.print("Error: Train file not found.");
        }

    // read train_labels csv, parse, and store into 2d int array
        try {
            myScan = new Scanner(new File("train_labels.csv"));

            //Set the delimiter used in file
            myScan.useDelimiter("[,\r\n]+");

            //Get all tokens and store them in some data structure
            //I am just printing them

            System.out.println("myScan loaded for train_sample");


            for(int row = 0; row < train_rows; row++) {
                    train_label_vals[row] = Integer.parseInt(myScan.next().toString());
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.print("Error: Train Labels file not found.");
        }
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

        // setup() gave us train_vals & train_label_vals.
        // Each line in map() represents a test observation.  We iterate 
        // through every train_val row to find nearest L2 match, then
        // return a key/value pair of <observation #, 

        // convert from Text to String
        String line = value.toString();
        long distance;
        double best_distance = Double.POSITIVE_INFINITY;
        int col_num;

        int best_digit = -1;
        IntWritable rowId = null;
        int i;
        IntWritable rowNum;
        String[] pixels;

        // comma delimited files, split on commas
        // first we find the # of rows
        for (i = 0; i < train_rows; i++) {
            distance = 0;
            col_num = 0;
            pixels = line.split(",");
            rowId = new IntWritable(Integer.parseInt(pixels[0]));

            for (int j = 1; j < cols; j++) {
                distance += (Integer.parseInt(pixels[j]) - train_vals[i][j-1])^2;
            }
            if (distance < best_distance) {
                best_distance = distance;
                best_digit = train_label_vals[i];
            }
        }
        context.write(rowId, new IntWritable(best_digit));
  }
}

I commented out the Path... statement because I don't understand what it does, or how it sends the file data to the mapper, but I noticed it listed on a couple of websites. Currently the program is not finding the Distributed Cache datasets even though they are uploaded to HDFS.
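For reference, the commented-out Path line is usually paired with code in setup() that looks up the local copies of the cache files and opens them by their full local paths. A minimal sketch of that pattern, assuming the files were added with DistributedCache.addCacheFile as in the driver above (it reuses the File, Scanner, and Path imports already present in KNNMapper):

  @Override
  public void setup(Context context) throws IOException, InterruptedException {
      // Local filesystem paths of the files added via DistributedCache.addCacheFile
      Path[] cacheFiles = context.getLocalCacheFiles();

      if (cacheFiles != null) {
          for (Path cacheFile : cacheFiles) {
              if (cacheFile.getName().equals("train_sample.csv")) {
                  // Open the localized copy by its absolute local path
                  Scanner scan = new Scanner(new File(cacheFile.toString()));
                  scan.useDelimiter("[,\r\n]+");
                  // ... parse into train_vals exactly as in the original setup() ...
                  scan.close();
              }
          }
      }
  }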

Solution

Try to use symlinking:

DistributedCache.createSymlink(conf);
DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv#train_sample.csv"),conf);
DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv#train_labels.csv"),conf);

This makes the files available in the mapper's local working directory under the names you are actually trying to open them with.
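As a side note, if the cluster runs Hadoop 2.x, the same effect can be obtained without the deprecated DistributedCache class by calling Job#addCacheFile in the driver; the "#name" fragments still produce symlinks in each task's working directory. A sketch of that driver-side variant, using the same relative cacheData paths as above:

Job job = Job.getInstance(conf);
// The "#train_sample.csv" fragment creates a symlink with that name in the
// task's working directory, so new File("train_sample.csv") resolves to it.
job.addCacheFile(new URI("cacheData/train_sample.csv#train_sample.csv"));
job.addCacheFile(new URI("cacheData/train_labels.csv#train_labels.csv"));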
