How to iterate records in Spark Scala?

Question

I have a variable "myrdd", an RDD with 10 records loaded from an Avro file through hadoopFile.

I do this:

myrdd.first._1.datum.getName()

I can get the name. Problem is, I have 10 records in "myrdd". When I do:

myrdd.map(x => {println(x._1.datum.getName())})

it does not work and prints out a weird object a single time. How can I iterate over all records?

Answer

Here is a log from a session using spark-shell with a similar scenario.

Given:

scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]

Your issue looks like this:

scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]

So map just returns another RDD (the function is not applied immediately; it is applied "lazily" when you actually iterate over the result).
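
A minimal sketch of this laziness (using a hypothetical nums RDD built with parallelize; nothing happens until an action is called):

val nums = sc.parallelize(1 to 3)
val mapped = nums.map { n => println(n); n * 2 }  // nothing is printed yet: map only records the transformation
mapped.collect()                                  // the action triggers it; the printlns run now (on the executors, so in a cluster they appear in executor logs)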

So when you materialize (using collect()) you get a "normal" collection:

scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])

over which you can map. Note that in this case you have a side effect in the closure passed to map (the println); the result of println is Unit:

scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())

Same result if collect is applied at the end:

scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())

But if you just want to print the rows, you can simplify this by using foreach:

scala> persons.foreach(t => println(t))
[Justin,19]

As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, collect is required as well:

persons.collect().foreach(t => println(t))
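
Applied to the "myrdd" from the question (assuming, as the snippets there suggest, that each element is a pair whose first component wraps an Avro record exposing getName()), a sketch along these lines prints all ten names:

myrdd
  .map(x => x._1.datum.getName())  // extract the name on the executors
  .collect()                       // bring the (small) result back to the driver
  .foreach(println)                // print locally on the driver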


Notes

  • The same behaviour can be observed in the Iterator class.
  • The output of the session above has been reordered.

Update

As for filtering: the location of collect is "bad" if you apply filters after collect that could have been applied before it.

For example, these expressions give the same result:

scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]

scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]

but the second case is worse, because that filter could have been applied before collect.

The same applies to any kind of aggregation.
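
For example, a count can be pushed into Spark instead of shipping every row to the driver first (a sketch against the same persons DataFrame):

persons.filter("age > 20").count()              // the aggregation runs distributed; only a Long reaches the driver
persons.collect().count(r => r.getInt(1) > 20)  // every row is collected to the driver before counting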
