Hadoop Map Reduce For Google web graph


Question


We have been given as an assignment the task of creating map reduce functions that will output, for each node n in the Google web graph list, the nodes that you can reach from node n in 3 hops. (The actual data can be found here: http://snap.stanford.edu/data/web-Google.html) Here's an example of how the items in the list look:

1 2 
1 3 
2 4 
3 4 
3 5 
4 1 
4 5 
4 6 
5 6 

From the above, an example graph would be this:

In the above simplified example, the paths from node 1 are [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] and [1 -> 3 -> 5 -> 6], and thus the map reduce will output the vertices 1, 5, 6 for node 1 ((a) each vertex must be counted only once, and (b) we include the current vertex only when there is a circular path of length 3, as in [1 -> 2 -> 4 -> 1] and [1 -> 3 -> 4 -> 1]).

I am very lost, because I believe this requires knowledge of graph theory and algorithms, and we haven't been taught anything related to that.

I would greatly appreciate it if somebody could point me in the right direction to start. (I've looked into shortest-path theory and such, but I'm not sure whether it will be useful for this particular exercise.)

Thanks in advance, and have a good holiday season.

EDIT

I tried to create the adjacency list, but while I expect the output to be "vertexID" "node1 node2 node3 node4...", I see that in the output file my reducer splits the list for each vertex ID into groups of three.

For example, if I have a vertex A that connects to Z,X,C,V,B,N,M,G,H,J,K,L, it outputs this as:

A Z,X,C

A V,B,N

A M,G,H

A J,K,L

Below are my driver, mapper and reducer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AdjacentsListDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {



        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("Test driver");
        job.setJarByClass(AdjacentsListDriver.class);

        String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arg0.length != 2) {
            System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
            System.exit(1);
        }

        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(ListMapper.class);
        job.setReducerClass(ListReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);    
        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
        System.exit(res);

    }



}





import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author George
 * Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop....
 *
 */
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text vertexID = new Text();
    //private LongWritable vertice= new LongWritable(0);
    private Text vertice=new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line,"\n");
        StringTokenizer itrInside;

        //vertice=new LongWritable(Long.valueOf(value.toString()).longValue());


        while (itr.hasMoreTokens()) {
            if(itr.countTokens() > 2){

            }//ignore first line ??
            else{
                itrInside=new StringTokenizer(itr.toString());
                vertexID.set(itr.nextToken());

                while(itrInside.hasMoreTokens()){               
                    vertice.set(itrInside.nextToken());
                    context.write(vertexID, value);
                }           
            }
        }

    }

}

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ListReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        String vertices="";

        for (Text value : values) {
            if(vertices=="")
                vertices+=value.toString();
            else
                vertices=vertices+","+value.toString();         
        }

        Text value=new Text();
        value.set(vertices);
        context.write(key, value);

    }

}

Solution

Since it is your (homework) assignment, I will not include a Java/Hadoop solution, but I will attempt to make the concept of graph computation with MapReduce a bit clearer for you so you can implement it yourself.

You want, for each vertex, the vertices that are exactly n hops away. You were on the right path looking at shortest-path algorithms, but this can be solved more easily with a simple breadth-first search.

When crunching graphs with MapReduce, however, you will need to dive a bit deeper into message passing between vertices. Graph algorithms are usually expressed as multiple jobs, where the map and reduce stages have the following assignments:

  • Mapper: Emit a message to another vertex (usually one message for every neighbour of a vertex)
  • Reducer: Group the incoming messages, join them with the core graph, and reduce them, sometimes sending more messages.

Each job always operates on the output of the previous job, until you have either reached your result or given up.

Data Preparation

Before you actually want to run your graph algorithm, make sure that your data is in the form of an adjacency list. It will make the following iteration algorithms much easier to implement.

So instead of your adjacency tuples, you will need to group them by the vertex id. Here is some pseudo code:

map input tuple (X, Y): 
   emit (X, Y)

reduce input (X, Y[]) :
  emit (X, Y[])

Basically you are just grouping by the vertex id, so your input data is a key (vertex id) to its neighbours (list of vertex ids that can be reached from that particular key vertex id). If you want to save resources, you can use the reducer as a combiner.
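As an illustration, the grouping pseudo code above can be sketched in plain Java, with the shuffle phase replaced by an in-memory map (the class and method names here are my own, not part of any Hadoop API):

```java
import java.util.*;

// Hypothetical in-memory stand-in for the map/reduce grouping described above:
// every (X, Y) edge tuple is grouped under its source vertex X, which is
// exactly what Hadoop's shuffle phase does between the mapper and the reducer.
public class AdjacencyGrouping {

    public static Map<String, List<String>> group(List<String[]> edges) {
        Map<String, List<String>> adjacency = new LinkedHashMap<>();
        for (String[] edge : edges) {
            // edge[0] plays the role of the map output key, edge[1] the value.
            adjacency.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
        }
        return adjacency;
    }
}
```

Feeding it the edge list from the question yields 1=[2, 3], 2=[4], 3=[4, 5], 4=[1, 5, 6], 5=[6].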

Algorithm

As I already mentioned, you only need a breadth-first search algorithm. You execute it for every vertex in your graph, and whenever you hit a neighbour you increment a hop counter that tells how far we are from our start vertex (this is the simplest case of a shortest-path algorithm, namely when every edge weight is 1).

Let me show you a simple picture describing it with a simplistic graph. Orange means visited, blue unvisited and green is our result. In the parenthesis is the hop counter.

You see, at every iteration we are setting a hopcounter for every vertex. If we hit a new vertex, we will just increment it by one. If we hit the n-th vertex, we will somehow mark it for later lookup.
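The hop expansion can be sketched in plain Java as a hypothetical helper (not part of the assignment's MapReduce code). Note that we deliberately keep no visited set, because cyclic walks such as 1 -> 2 -> 4 -> 1 must count:

```java
import java.util.*;

// Hypothetical sketch of the hop expansion: starting from one vertex, replace
// the frontier with the union of its neighbours, n times. After n rounds the
// frontier holds every vertex reachable in exactly n hops.
public class ThreeHop {

    public static Set<String> hops(Map<String, List<String>> graph, String start, int n) {
        Set<String> frontier = new HashSet<>(Collections.singletonList(start));
        for (int hop = 0; hop < n; hop++) {
            Set<String> next = new HashSet<>();
            for (String vertex : frontier) {
                next.addAll(graph.getOrDefault(vertex, Collections.emptyList()));
            }
            frontier = next; // vertices that are exactly hop + 1 steps away
        }
        return frontier;
    }
}
```

On the example graph from the question this reproduces the expected sets, e.g. {1, 5, 6} for vertex 1 and {4, 5} for vertex 4.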

Distributing with MapReduce

While it seems really wasteful to run a breadth-first search for every vertex, we can do better by parallelizing it. This is where message passing comes into play. Just like in the picture above, every vertex we get in our mapping step will initially send a message to its neighbours containing the following payload:

HopMessage: Origin (VertexID) | HopCounter(Integer)

At the first iteration we will attempt to send the message to the neighbours to kick off the computation. Otherwise we will just proxy the graph or incoming messages.

So in your very first job after the data preparation your map and reduce looks like that:

map input (VertexID key, either HopMessage or List<VertexID> adjacents):
  if(iterationNumber == 1): // only in the first iteration to kick off
     for neighbour in adjacents:
        emit (neighbour, new HopMessage(key, 0))
  emit (key, adjacents or HopMessage) // for joining in the reducer

The reducer now does a simple join between the graph and the messages, mainly to get the neighbours of a vertex, leading to this input (using my simplistic graph):

1 2 // graph 
2 1 // hop message
2 3 // graph
3 1 // hop message
3 4 // graph
4 1 // hop message
4 - // graph

In the reducer step we will just forward the messages to the neighbours again and check if the hop counter has reached 3 already after incrementing.

reducer input(VertexID key, List<either HopMessage or List<VertexID> neighbours> values):
 for hopMessage in values:
    hopMessage.counter += 1
    if (hopMessage.counter == 3) 
       emit to some external resource (HopMessage.origin, key)
    else 
       for neighbour in neighbours of key:
          emit (neighbour, hopMessage)
    emit (key, neighbours)

As you can see, it can get really messy here: You need to manage two different kinds of messages and then also write to some external resource that will keep track of the vertices that are exactly 3 hops away.

You schedule jobs that iterate as long as there are HopMessages to be sent. This is prone to problems with cycles in the graph, because the hop counter would be incremented infinitely in that case. So I suggest either sending the complete traversed path so far with every message (quite wasteful) or simply capping the number of iterations. In the n=3 case, no more than 3 job iterations are needed.
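To make the capped iteration concrete, here is a hypothetical in-memory simulation of the job chain (plain Java, no Hadoop; the HopMessage mirrors the payload defined earlier, and each loop round plays one map/reduce iteration):

```java
import java.util.*;

// Hypothetical in-memory simulation of the job chain: messages carry
// (origin, hopCounter); each round, the "reducer" at a vertex increments the
// counter and either records a 3-hop result or forwards the message to the
// vertex's neighbours. Capping the loop at 3 rounds makes cycles harmless.
public class HopSimulation {

    static final class HopMessage {
        final String origin;
        final int hopCounter;
        HopMessage(String origin, int hopCounter) {
            this.origin = origin;
            this.hopCounter = hopCounter;
        }
    }

    public static Map<String, Set<String>> run(Map<String, List<String>> graph) {
        Map<String, Set<String>> result = new TreeMap<>();
        Map<String, List<HopMessage>> inbox = new HashMap<>();
        // First iteration: every vertex messages its neighbours with counter 0.
        graph.forEach((vertex, neighbours) -> {
            for (String neighbour : neighbours) {
                inbox.computeIfAbsent(neighbour, k -> new ArrayList<>())
                     .add(new HopMessage(vertex, 0));
            }
        });
        // Three more rounds: increment, emit at counter == 3, else forward.
        for (int iteration = 1; iteration <= 3; iteration++) {
            Map<String, List<HopMessage>> next = new HashMap<>();
            for (Map.Entry<String, List<HopMessage>> entry : inbox.entrySet()) {
                for (HopMessage msg : entry.getValue()) {
                    int counter = msg.hopCounter + 1;
                    if (counter == 3) {
                        // The current vertex is exactly 3 hops from the origin.
                        result.computeIfAbsent(msg.origin, k -> new TreeSet<>())
                              .add(entry.getKey());
                    } else {
                        for (String neighbour : graph.getOrDefault(entry.getKey(),
                                Collections.emptyList())) {
                            next.computeIfAbsent(neighbour, k -> new ArrayList<>())
                                .add(new HopMessage(msg.origin, counter));
                        }
                    }
                }
            }
            inbox.clear();
            inbox.putAll(next);
        }
        return result;
    }
}
```

On the question's example graph this produces 1=[1, 5, 6], 2=[2, 3, 6], 3=[2, 3, 6], 4=[4, 5].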

There are plenty of blogs and projects that will give you examples on how to deal with each of those problems in Hadoop. At least I have written about graph crunching in MapReduce in my blog and you can find some examples on my github.

Cleaning output data

In the end you will have a bunch of files that contain a vertex -> vertex mapping. You can reduce them the same way you have done in the preparation.

Niftier way using Pregel

A less cumbersome way of dealing with graphs is to use the Pregel way of expressing a graph computation. Pregel uses a vertex-centric model and makes it much easier to express such breadth-first computations.

Here is a simple example of the above algorithm using Apache Hama:

  public static class NthHopVertex extends Vertex<Text, NullWritable, HopMessage> {

    @Override
    public void compute(Iterable<HopMessage> messages) throws IOException {
      if (getSuperstepCount() == 0L) {
        HopMessage msg = new HopMessage();
        msg.origin = getVertexID().toString();
        msg.hopCounter = 0;
        sendMessageToNeighbors(msg);
      } else {
        for (HopMessage message : messages) {
          message.hopCounter += 1;
          if (message.hopCounter == 3) {
            getPeer().write(new Text(message.origin), getVertexID());
            voteToHalt();
          } else {
            sendMessageToNeighbors(message);
          }

        }
      }
    }
  }

By the way, the new graph created in your example looks like the following:

1=[1, 5, 6]
2=[2, 3, 6]
3=[2, 3, 6]
4=[4, 5]

Here is the full Hama Graph implementation:

https://github.com/thomasjungblut/tjungblut-graph/blob/master/src/de/jungblut/graph/bsp/NthHop.java
