Spark computation for suggesting new friendships

Question

I'm using Spark for fun and to learn new things about MapReduce. So, I'm trying to write a program suggesting new friendships (i.e., a sort of recommendation system). A friendship between two individuals is suggested if they are not connected yet and have a lot of friends in common.

The friendship text file has a structure similar to the following:

1   2,4,11,12,15
2   1,3,4,5,9,10
3   2,5,11,15,20,21
4   1,2,3
5   2,3,4,15,16
...

where the syntax is: ID_SRC1<TAB>ID_DST1,ID_DST2,....

The program should output (print or text file) something like the following:

1   3,5
3   1
5   1
...

where the syntax is: ID_SRC1<TAB>ID_SUGG1,ID_SUGG2,.... Of course the program must suggest a friendship only if the two individuals share a minimum number of friends in common, let's say 3 in our case. For instance, in the sample above, 1 and 3 are not connected yet and share the three friends 2, 11 and 15, so they are suggested to each other.

I've written my program, but I'd like to see better and more powerful solutions from you. Indeed, I think my code can be improved a lot, since it takes a long time to produce the output from a 4.2 MB input file.

Below is my code:

from pyspark import SparkContext, SparkConf

def linesToDataset(line):
    (src, dst_line) = line.split('\t')
    src = int(src.strip())

    dst_list_string = dst_line.split(',')
    dst_list = [int(x.strip()) for x in dst_list_string if x != '']

    return (src, dst_list)  

def filterPairs(x):
    # don't take into account pairs made of the same node or of people who are already friends
    if (x[0][0] != x[1][0]) and (x[0][0] not in x[1][1]) and (x[1][0] not in x[0][1]):
        shared = len(set(x[0][1]).intersection(set(x[1][1])))
        return (x[0][0], [x[1][0], shared])

def mapFinalDataset(elem):
    recommendations = []
    src = elem[0]
    dst_commons = elem[1]
    for pair in dst_commons:
        if pair[1] >= 3: # 3 is the minimum number of friends in common
            recommendations.append(pair[0])
    return (src, recommendations)

def main():
    conf = SparkConf().setAppName("Recommendation System").setMaster("local[4]")
    sc = SparkContext(conf=conf)
    rdd = sc.textFile("data.txt")

    dataset = rdd.map(linesToDataset)

    cartesian = dataset.cartesian(dataset)
    filteredDatasetRaw = cartesian.map(filterPairs)
    filteredDataset = filteredDatasetRaw.filter(lambda x: x is not None)
#   print filteredDataset.take(10)

    groupedDataset = filteredDataset.groupByKey().mapValues(list)
#   print groupedDataset.take(10)

    finalDataset = groupedDataset.map(mapFinalDataset)
    output = finalDataset.take(100)
    for (k, v) in output:
        if len(v) > 0:
            print(str(k) + ': ' + str(v))

    sc.stop()


if __name__ == "__main__":
    main()

Answer

Whether it is "better" is of course a matter of opinion.

I would argue the strategy I am about to propose is better in terms of performance and readability, but this has to be subjective. The main reason is that I avoid the cartesian product, to replace it with a JOIN.

The strategy I propose is based on the fact that the basic data line

1   2,4,11,12,15

can be thought of as a list of "friendship suggestions", meaning this line tells me: "2 should be friends with 4, 11, 12, 15", "4 should be friends with 2, 11, 12, 15", and so on.

So the key points of my implementation are:

  1. Turn each line into a list of suggestions ("foo should be friends with bar")
  2. Group the suggestions by person ("foo should be friends with bar, baz, bar"), keeping the duplicates
  3. Count the duplicates ("foo should be friends with bar (2 suggestions) and baz (1 suggestion)")
  4. Remove the already existing relationships
  5. Filter out the suggestions that occur too rarely
  6. Print the result

Implementation

As I'm more of a Java/Scala guy, pardon the Scala, but it should map fairly easily to Python.

First, create the basic friendship data from your text file:

def parseLine(line: String): (Int, Array[String]) = {
  val tab = line.indexOf("\t")
  (Integer.parseInt(line.substring(0, tab)), line.substring(tab + 1).split(","))
}
def toIntegerArray(strings: Array[String]): Array[Int] = {
  strings.filter { x => !x.isEmpty() }.map { x => Integer.parseInt(x) }
}
// The friendships that exist
val alreadyFriendsRDD = sc.textFile("src/data.txt", 4)
        // Parse file : (id of the person, Array[Int] of friends)
        .map { parseLine }
        .mapValues( toIntegerArray )
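
For the sample file from the question, this RDD contains pairs such as:

(1, Array(2, 4, 11, 12, 15))
(2, Array(1, 3, 4, 5, 9, 10))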

And turn it into suggestions:

// If person 1 is friends with 2 and 4, this means we should suggest 2 to become friends with 4, and vice versa
def toSymetricalPairs(suggestions: Array[Int]): TraversableOnce[(Int, Int)] = {
  suggestions.combinations(2)
             .map { suggestion => (suggestion(0), suggestion(1)) }
             .flatMap { suggestion => Iterator(suggestion, (suggestion._2, suggestion._1)) }
}
val suggestionsRDD = alreadyFriendsRDD
  .map( x => x._2 )
  // Then we create suggestions from the friends Array
  .flatMap( toSymetricalPairs ) 
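
For example, the sample line 4<TAB>1,2,3 contributes the pairs (1,2), (1,3), (2,3) plus their mirrored versions (2,1), (3,1), (3,2): every two friends of person 4 are suggested to each other.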

Once you have an RDD of suggestions, regroup them:

import scala.collection.mutable

def mergeSuggestion(suggestions: mutable.HashMap[Int, Int], newSuggestion: Int): mutable.HashMap[Int, Int] = {
  suggestions.get(newSuggestion) match {
    case None => suggestions.put(newSuggestion, 1)
    case Some(x) => suggestions.put(newSuggestion, x + 1)
  }
  suggestions
}
def mergeSuggestions(suggestions: mutable.HashMap[Int, Int], toMerge: mutable.HashMap[Int, Int]) = {
  val keySet = suggestions.keySet ++ toMerge.keySet
  keySet.foreach { key =>
    suggestions.get(key) match {
      case None => suggestions.put(key, toMerge.getOrElse(key, 0))
      case Some(x) => suggestions.put(key, x + toMerge.getOrElse(key, 0))
    }
  }
  suggestions
}

// Keep only the people that were suggested at least 3 times
def filterRareSuggestions(suggestions: mutable.HashMap[Int, Int]): scala.collection.Set[Int] = {
  suggestions.filter(p => p._2 >= 3).keySet
}
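
// To illustrate the merge logic on some hypothetical counts:
// mergeSuggestion(HashMap(4 -> 2), 4) returns HashMap(4 -> 3), and
// mergeSuggestions(HashMap(4 -> 2, 11 -> 1), HashMap(4 -> 1, 15 -> 1))
// returns HashMap(4 -> 3, 11 -> 1, 15 -> 1).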

// We reduce to an RDD of suggestion counts by person
val suggestionsByPersonRDD = suggestionsRDD.combineByKey(
    // The first suggestion for a person seeds the map with a count of 1
    (firstSuggestion: Int) => mutable.HashMap(firstSuggestion -> 1),
    // For every further suggestion, merge it into the map
    mergeSuggestion,
    // When merging two maps, sum the suggestion counts
    mergeSuggestions
    )
    // We restrict to suggestions that occur at least 3 times
    .mapValues( filterRareSuggestions )

Finally, filter the suggestions by taking into account the already existing friendships:

val suggestionsCleanedRDD = suggestionsByPersonRDD
  // We co-locate the suggestions with the known friends
  .join(alreadyFriendsRDD)
  // We clean the suggestions by removing the known friends
  .mapValues { case (suggestions, alreadyKnownFriends) =>
    suggestions -- alreadyKnownFriends
  }

Which outputs, for example:

(49831,Set(49853, 49811, 49837, 49774))
(49835,Set(22091, 20569, 29540, 36583, 31122, 3004, 10390, 4113, 1137, 15064, 28563, 20596, 36623))
(49839,Set())
(49843,Set(49844))

Meaning 49831 should be friends with 49853, 49811, 49837, 49774.
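
To write the result in the question's ID_SRC<TAB>ID_SUGG1,ID_SUGG2,... format, a minimal sketch (assuming the final RDD fits in driver memory, and skipping the people with no suggestions left):

suggestionsCleanedRDD
  .filter { case (_, suggestions) => suggestions.nonEmpty }
  .collect()
  .foreach { case (person, suggestions) =>
    println(person + "\t" + suggestions.mkString(","))
  }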

Trying it on your dataset, on a 2012 Core i5 @ 2.8 GHz (dual core with hyperthreading) and 2 GB of RAM, it finishes in under 1.5 minutes.
