How to iterate over records in Spark Scala?
Problem Description
I have a variable "myrdd", an RDD of 10 records loaded from an Avro file through hadoopFile.
I do this:
myrdd.first._1.datum.getName()
and I can get the name. The problem is, I have 10 records in "myrdd". When I do:
myrdd.map(x => {println(x._1.datum.getName())})
it does not work and prints out a weird object a single time. How can I iterate over all records?
Recommended Answer
Here is a log from a spark-shell session with a similar scenario.
Given
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
Your problem looks like this:
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
so map just returns another RDD (the function is not applied immediately; it is applied "lazily" when you actually iterate over the result).
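A minimal sketch of that laziness, assuming the same persons DataFrame from the session above (in a local job the println output appears in the driver console):

val mapped = persons.map(t => println(t)) // nothing is printed yet: this only builds an RDD[Unit]
mapped.count()                            // an action; only now does the map run and the printlns fire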
So when you materialize it (using collect()), you get a "normal" collection:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
over which you can map. Note that in this case you have a side effect in the closure passed to map (the println); the result of println is Unit:
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
The result is the same if collect is applied at the end:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
But if you just want to print the rows, you can simplify this by using foreach:
scala> persons.foreach(t => println(t))
[Justin,19]
As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, collect is required as well:
persons.collect().foreach(t => println(t))
Notes
- The same behaviour can be observed in the Iterator class (a small illustration follows after this list).
- The output of the session above has been reordered.
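The same laziness can be seen without Spark at all, using a plain Scala Iterator (the values here are made up for illustration):

val it = Iterator(1, 2, 3).map(x => println(x)) // nothing is printed: map on an Iterator is lazy
it.toList                                       // consuming the iterator finally runs the printlns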
Update
As for filtering: the location of collect is "bad" if you apply filters after collect that could have been applied before it.
For example, these expressions give the same result:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]
but the second case is worse, because that filter could have been applied before collect.
The same applies to any kind of aggregation as well.
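As a small sketch of that point, assuming the same persons DataFrame: in the first line Spark runs both the filter and the count, while the second line ships every row to the driver before doing any work there.

val inSpark  = persons.filter("age > 20").count()             // filter and count happen inside Spark
val onDriver = persons.collect().count(r => r.getInt(1) > 20) // all rows are collected to the driver first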