Spark get collection sorted by value


Question

I was trying this tutorial: http://spark.apache.org/docs/latest/quick-start.html. I first created a collection from a file:

textFile = sc.textFile("README.md")

Then I tried a command to count the words:

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

To print the collection:

 wordCounts.collect()

I found how to sort it by word using the command sortByKey. I was wondering how it could be possible to do the same thing by value, which in this case is the number of times a word occurs in the document.

Answer

The sorting should usually be done before collect() is called, since collect() returns the dataset to the driver program; this is also the way a Hadoop map-reduce job would be programmed in Java, so that the final output is (typically) written to HDFS. With the Spark API this approach provides the flexibility of writing the output in "raw" form wherever you want, such as to a file where it can be used as input for further processing.

Using Spark's Scala API, sorting before collect() can be done by following eliasah's suggestion and using Tuple2.swap() twice, once before sorting and once after, in order to produce a list of tuples sorted in increasing or decreasing order of their second field (named _2), which contains the count of occurrences of the word in the first field (named _1). Below is an example of how this is scripted in spark-shell:

// this whole block can be pasted in spark-shell in :paste mode followed by <Ctrl>D
val file = sc.textFile("some_local_text_file_pathname")
val wordCounts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)  // 2nd arg configures one task (same as number of partitions)
  .map(item => item.swap) // interchanges position of entries in each tuple
  .sortByKey(true, 1) // 1st arg configures ascending sort, 2nd arg configures one task
  .map(item => item.swap)
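
To inspect the sorted result interactively, a minimal follow-up sketch in spark-shell could be the following (take(10) is just an arbitrary sample size):

// pull a few of the sorted (word, count) pairs back to the driver and print them
wordCounts.take(10).foreach(println)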

In order to reverse the ordering of the sort, use sortByKey(false, 1), since its first argument is the boolean value of ascending. Its second argument is the number of tasks (equivalent to the number of partitions), which is set to 1 for testing with a small input file where only one output data file is desired; reduceByKey also takes this optional argument.
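
For example, a minimal sketch of the descending variant (the wordCountsDesc name is just illustrative), repeating the pipeline above with only the sortByKey argument changed:

val wordCountsDesc = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
  .map(item => item.swap)
  .sortByKey(false, 1) // false sorts the counts in descending order
  .map(item => item.swap)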

After this, the wordCounts RDD can be saved as text files to a directory with saveAsTextFile(directory_pathname), into which will be deposited one or more part-xxxxx files (starting with part-00000) depending on the number of reducers configured for the job (one output data file per reducer), a _SUCCESS file depending on whether or not the job succeeded, and .crc files.
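
As a minimal sketch, assuming a made-up output directory pathname:

// writes one part-xxxxx data file per partition of wordCounts, plus _SUCCESS and .crc files
wordCounts.saveAsTextFile("some_output_directory_pathname")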

Using pyspark, a Python script very similar to the Scala script shown above produces output that is effectively the same. Here is the pyspark version, demonstrating sorting a collection by value:

file = sc.textFile("file:some_local_text_file_pathname")
# reduceByKey's 2nd arg configures one reducer task;
# sortByKey's 1st arg configures an ascending sort, its 2nd arg configures one task;
# the surrounding map() calls swap word and count before and after the sort
wordCounts = file.flatMap(lambda line: line.strip().split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b, 1) \
    .map(lambda pair: (pair[1], pair[0])) \
    .sortByKey(1, 1) \
    .map(lambda pair: (pair[1], pair[0]))

In order to sortByKey in descending order, its first argument should be 0 (false). Since Python captures leading and trailing whitespace as data, strip() is inserted before splitting each line on spaces, but this is not necessary with spark-shell/Scala.

The main difference in the output of the spark-shell and pyspark versions of wordCount is that where spark-shell outputs (word,3), pyspark outputs (u'word', 3).

For more information on Spark RDD methods see http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html for Python and https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD for Scala.

In the spark-shell, running collect() on wordCounts transforms it from an RDD into an Array[(String, Int)] = Array[Tuple2(String, Int)], which itself can be sorted on the second field of each Tuple2 element using:

wordCounts.collect().sortBy(_._2)  // sorts the collected Array on the count field

sortBy (http://www.scala-lang.org/api/2.11.8/index.html#scala.Array@sortBy[B](f:A=%3EB)(implicitord:scala.math.Ordering[B]):Repr) also takes an optional implicit math.Ordering argument, as Romeo Kienzler showed in a previous answer to this question. The same sortBy(_._2) call will do a reverse sort of the Array's Tuple2 elements on their _2 fields simply by defining an implicit reverse Ordering before running the map-reduce script, because it overrides the pre-existing Ordering of Int. The reverse Int Ordering already defined by Romeo Kienzler is:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.compare(b)*(-1)
}

Another common way to define this reverse Ordering is to reverse the order of a and b and drop the (-1) on the right-hand side of the compare definition:

// for reverse order
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = b.compare(a)
}   
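
As a small sketch of how either implicit Ordering is picked up (the array literal is made-up sample data standing in for wordCounts.collect()):

// with an implicit Ordering[Int] such as the one above in scope,
// sortBy(_._2) resolves its Ordering parameter to it and sorts by count in descending order
val sample = Array(("the", 7), ("spark", 3), ("a", 5))
sample.sortBy(_._2) // Array((the,7), (a,5), (spark,3))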
