SPARK - Use RDD.foreach to Create a Dataframe and execute actions on the Dataframe

Question

I am new to SPARK and am trying to figure out a better way to achieve the following scenario. There is a database table containing three fields: Category, Amount, and Quantity. First, I try to pull all the distinct categories from the database.

    val categories: RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)

Now, for each category, I want to execute a pipeline that essentially creates a dataframe for that category and applies some machine learning.

    categories.foreach(executePipeline)

    def executePipeline(category: String): Unit = {
      val dfCategory = sqlCtxt.read.jdbc(JDBC_URL,
        s"(SELECT * FROM TABLE_NAME WHERE CATEGORY = '$category') AS t",
        new java.util.Properties())
      dfCategory.show()
    }

Is it possible to do something like this? Or is there a better alternative?

Answer

Your code would fail with a TaskNotSerializable exception, because you're trying to use the SQLContext (which isn't serializable) inside the executePipeline method, and that method would have to be serialized and sent to the workers to be executed on each record in the categories RDD.

Assuming the number of categories is limited, meaning the list of categories isn't too large to fit in the driver's memory, you should collect the categories to the driver and iterate over that local collection using foreach:

    val categoriesRdd: RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)
    val categories: Seq[String] = categoriesRdd.collect()
    categories.foreach(executePipeline)
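
Since categories is now a plain local collection, the foreach runs on the driver, so the JDBC read inside executePipeline no longer has to be serialized and shipped to the workers.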

Another improvement would be to reuse the dataframe you already loaded instead of issuing another query, using a filter for each category:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    def executePipeline(singleCategoryDf: DataFrame): Unit = { /* ... */ }

    categories.foreach { cat =>
      val filtered = df.filter(col(CATEGORY) === cat)
      executePipeline(filtered)
    }

NOTE: to make sure reusing df doesn't reload it from the source on every execution, cache() it before collecting the categories.
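
Putting it together, a minimal sketch of the whole flow, assuming df is the full table loaded once over JDBC, CATEGORY holds the category column name, and TABLE_NAME and the empty connection properties are placeholders:

    import org.apache.spark.sql.functions.col

    // Load the table once and cache it, so the per-category filters
    // below read from memory instead of hitting the JDBC source again.
    val df = sqlCtxt.read.jdbc(JDBC_URL, "TABLE_NAME", new java.util.Properties())
    df.cache()

    // Distinct categories, collected into a local collection on the driver.
    val categories: Seq[String] =
      df.select(CATEGORY).distinct().rdd.map(_.getString(0)).collect()

    // The loop itself runs on the driver; only the filter runs on the cluster.
    categories.foreach { cat =>
      executePipeline(df.filter(col(CATEGORY) === cat))
    }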
