Reducer not giving correct output in K-Means Clustering on three variables using Apache Hadoop Map Reduce


Problem description

The output from the values iterator in the reduce method shows all points as zeros. Is there any flaw in the reduce method?
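
One way to narrow this down is to run the same averaging logic outside Hadoop on a few hand-made points; if it produces sensible numbers, the zeros are more likely coming from what the values iterator actually delivers than from the arithmetic in reduce. Below is a minimal standalone sketch (the class name and the sample values are made up, and it assumes the Point class shown at the end of this post):

public class ReduceLogicCheck {
	public static void main(String[] args) {
		// Hypothetical sample points standing in for what the values iterator delivers
		Point[] values = { new Point(1, 2, 3), new Point(3, 4, 5), new Point(5, 6, 7) };

		Point sum = new Point(0, 0, 0);
		int no_elements = 0;
		for (Point d : values) {
			sum.x = sum.x + d.x;
			sum.y = sum.y + d.y;
			sum.z = sum.z + d.z;
			++no_elements;
		}
		// Same averaging as in Reduce.reduce(); prints 3.0,4.0,5.0 for the points above
		Point newCenter = new Point(sum.x / no_elements, sum.y / no_elements,
				sum.z / no_elements);
		System.out.println(newCenter);
	}
}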

import java.io.IOException;
import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

@SuppressWarnings("deprecation")
public class KMEANS {
	public static String OUT = "OUT";
	public static String IN = "IN";
	public static String CENTROID_FILE_NAME = "/centroid.txt";
	public static String OUTPUT_FILE_NAME = "/part-00000";
	public static String DATA_FILE_NAME = "/data.txt";
	public static String JOB_NAME = "KMeans";
	public static String SPLITTER = "\t| ";
	public static List<Point> mCenters = new ArrayList<Point>();

	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, DoubleWritable, Point> {
		@Override
		public void configure(JobConf job) {
			// System.out.println("Second");
			try {
				// Fetch the file from Distributed Cache Read it and store the
				// centroid in the ArrayList
				Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
				if (cacheFiles != null && cacheFiles.length > 0) {
					String line;
					mCenters.clear();
					BufferedReader cacheReader = new BufferedReader(
							new FileReader(cacheFiles[0].toString()));
					try {
						while ((line = cacheReader.readLine()) != null) {
							String[] temp = line.split(SPLITTER);
							String[] temp2=temp[0].split(",");
							mCenters.add(new Point(
									Double.parseDouble(temp2[0]), Double
											.parseDouble(temp2[1]), Double
											.parseDouble(temp2[2])));
							// System.out.println(mCenters.get(0).toString());
						}
					} finally {
						cacheReader.close();
					}
				}
			} catch (IOException e) {
				System.err.println("Exception reading DistribtuedCache: " + e);
			}
		}

		public void map(LongWritable key, Text value,
				OutputCollector<DoubleWritable, Point> output,
				Reporter reporter) throws IOException {
			String line = value.toString();
			String temp[] = line.split(",");
			Point point = new Point(Double.parseDouble(temp[0]),
					Double.parseDouble(temp[1]), Double.parseDouble(temp[2]));
			// System.out.println(point.toString());
			double min1, min2 = Double.MAX_VALUE;
			Point nearest_center = mCenters
					.get(0);
			// Find the minimum center from a point
			for (Point c : mCenters) {
				min1 = c.z - point.z;
				if (Math.abs(min1) < Math.abs(min2)) {
					nearest_center = c;
					min2 = min1;
				}
			}
			// Emit the nearest center and the point
			output.collect(new DoubleWritable(nearest_center.z), new Point(
					point));
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<DoubleWritable, Point, Point, Text> {

		public void reduce(DoubleWritable key, Iterator<Point> values,
				OutputCollector<Point, Text> output, Reporter reporter)
				throws IOException {
			Point newCenter = new Point(0, 0, 0);
			Point sum = new Point(0, 0, 0);
			int no_elements = 0;
			String points = "";
			while (values.hasNext()) {
				Point d = values.next().get();
				points = points + " " + d.toString();
				sum.z = sum.z + d.z;
				sum.x = sum.x + d.x;
				sum.y = sum.y + d.y;
				++no_elements;
			}
			// Find new center
			newCenter.z = sum.z / no_elements;
			newCenter.x = sum.x / no_elements;
			newCenter.y = sum.y / no_elements;

			// Emit new center and point
			output.collect(new Point(newCenter), new Text(points));
		}
	}

	public static void main(String[] args) throws Exception {
		run();
	}

	public static void run() throws Exception {
		IN = "IN";
		OUT = "OUT";
		String input = IN;
		String output = OUT;
		String again_input = output;
		boolean isdone = false;
		int iteration = 0;
		// Reiterating till the convergence
		while (isdone == false) {
			JobConf conf = new JobConf(KMEANS.class);
			if (iteration == 0) {
				Path hdfsPath = new Path(input + CENTROID_FILE_NAME);
				// upload the file to hdfs. Overwrite any existing copy.
				DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
			} else {
				Path hdfsPath = new Path(again_input + OUTPUT_FILE_NAME);
				// upload the file to hdfs. Overwrite any existing copy.
				DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
			}
			conf.setJobName(JOB_NAME);
			conf.setMapOutputKeyClass(DoubleWritable.class);
			conf.setMapOutputValueClass(Point.class);
			conf.setOutputKeyClass(Point.class);
			conf.setOutputValueClass(Text.class);
			conf.setMapperClass(Map.class);
			conf.setReducerClass(Reduce.class);
			conf.setInputFormat(TextInputFormat.class);
			conf.setOutputFormat(TextOutputFormat.class);

			FileInputFormat.setInputPaths(conf,
					new Path(input + DATA_FILE_NAME));
			FileOutputFormat.setOutputPath(conf, new Path(output));

			JobClient.runJob(conf);
			// System.out.println("First");
			Path ofile = new Path(output + OUTPUT_FILE_NAME);
			FileSystem fs = FileSystem.get(new Configuration());
			BufferedReader br = new BufferedReader(new InputStreamReader(
					fs.open(ofile)));
			List<Point> centers_next = new ArrayList<Point>();
			String line = br.readLine();
			while (line != null) {
				String[] sp = line.split("\t| ");
				String[] temp = sp[0].split(",");
				Point c = new Point(Double.parseDouble(temp[0]),
						Double.parseDouble(temp[1]),
						Double.parseDouble(temp[2]));
					centers_next.add(c);

				line = br.readLine();
			}
			br.close();

			String prev;
			if (iteration == 0)
				prev = input + CENTROID_FILE_NAME;
			else
				prev = again_input + OUTPUT_FILE_NAME;
			Path prevfile = new Path(prev);
			FileSystem fs1 = FileSystem.get(new Configuration());
			BufferedReader br1 = new BufferedReader(new InputStreamReader(
					fs1.open(prevfile)));
			List<Point> centers_prev = new ArrayList<Point>();
			String l = br1.readLine();
			while (l != null) {
				String[] sp1 = l.split(SPLITTER);
				String temp[] = sp1[0].split(",");
				Point d = new Point(Double.parseDouble(temp[0]),
						Double.parseDouble(temp[1]),
						Double.parseDouble(temp[2]));
				centers_prev.add(d);
				l = br1.readLine();
			}
			br1.close();

			// Sort the old centroid and new centroid and check for convergence
			// condition
			List<Double> prev_z = new ArrayList<Double>();
			List<Double> next_z = new ArrayList<Double>();
			for (int i = 0; i < centers_next.size(); i++) {
				prev_z.add(centers_prev.get(i).z);
				next_z.add(centers_next.get(i).z);
			}

			Collections.sort(prev_z);
			Collections.sort(next_z);
			Iterator<Point> it = centers_prev.iterator();
			for (Point d : centers_next) {
				double temp = it.next().z;
				if (Math.abs(temp - d.z) <= 0.1) {
					isdone = true;
				} else {
					isdone = false;
					break;
				}
			}
			++iteration;
			again_input = output;
			output = OUT + System.nanoTime();
			// System.out.println("Third");
			// isdone = true;
		}
	}
}
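
For reference, this is roughly what the two files the driver parses are expected to look like, given the split on tab/space and then on commas in configure() and run(). The values are made up; <TAB> stands for the tab character that TextOutputFormat writes between key and value.

centroid.txt (one initial centroid per line):
2.0,3.0,4.0
7.0,8.0,9.0

part-00000 (new centroid, a tab, then the points that were assigned to it):
2.5,3.5,4.5<TAB> 2.0,3.0,4.0 3.0,4.0,5.0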





// This is the code for the Point class that represents a 3D point.

public class Point {
    public double x;
    public double y;
    public double z;

    public Point(double x, double y, double z) {
        this.x = x;
        this.y = y;
        this.z = z;
    }

    public Point(Point a) {
        this.x = a.x;
        this.y = a.y;
        this.z = a.z;
    }

    Point get() {
        return this;
    }
    public String toString() {
        return this.x + "," + this.y + "," + this.z;
    }

}
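
A detail worth checking with this setup: Point is registered as the map output value class and is also used as the reducer's output key, and Hadoop's old mapred API normally requires such classes to implement Writable (WritableComparable when used as a key) and to have a no-argument constructor so that instances can be serialized between the map and reduce phases. If readFields() never repopulates x, y and z, deserialized points come back with their default 0.0 fields. A minimal sketch of a Writable version of the same class, keeping the field layout and the get() helper the reducer calls:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Point implements WritableComparable<Point> {
    public double x;
    public double y;
    public double z;

    // Hadoop creates instances reflectively during deserialization,
    // so a no-argument constructor is required
    public Point() {
    }

    public Point(double x, double y, double z) {
        this.x = x;
        this.y = y;
        this.z = z;
    }

    public Point(Point a) {
        this(a.x, a.y, a.z);
    }

    Point get() {
        return this;
    }

    // Serialize the three coordinates
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(x);
        out.writeDouble(y);
        out.writeDouble(z);
    }

    // Restore the three coordinates; leaving this empty would yield all-zero points
    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readDouble();
        y = in.readDouble();
        z = in.readDouble();
    }

    // Order primarily by z, the coordinate the mapper uses to pick the nearest centre
    @Override
    public int compareTo(Point o) {
        int c = Double.compare(z, o.z);
        if (c != 0) {
            return c;
        }
        c = Double.compare(x, o.x);
        if (c != 0) {
            return c;
        }
        return Double.compare(y, o.y);
    }

    @Override
    public String toString() {
        return x + "," + y + "," + z;
    }
}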

Solution
