SPARK - Use RDD.foreach to Create a Dataframe and execute actions on the Dataframe

Problem Description

I am new to SPARK and figuring out a better way to achieve the following scenario. There is a database table containing 3 fields - Category, Amount, Quantity. First I try to pull all the distinct Categories from the database.

val categories: RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)

Now for each category I want to execute the Pipeline, which essentially creates a dataframe for each category and applies some Machine Learning.

categories.foreach(executePipeline)

def executePipeline(category: String): Unit = {
  val dfCategory = sqlCtxt.read.jdbc(JDBC_URL,
    s"(SELECT * FROM TABLE_NAME WHERE CATEGORY = '$category') AS t",
    new java.util.Properties())
  dfCategory.show()
}

Is it possible to do something like this? Or is there any better alternative?

Recommended Answer

Your code would fail with a Task not serializable exception, since you're trying to use the SQLContext (which isn't serializable) inside the executePipeline method, which would have to be serialized and sent to the workers to be executed on each record in the categories RDD.

Assuming you know the number of categories is limited, which means the list of categories isn't too large to fit in your driver memory, you should collect the categories to the driver and iterate over that local collection using foreach:

val categoriesRdd: RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)
val categories: Seq[String] = categoriesRdd.collect()
categories.foreach(executePipeline)
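
A small variant, assuming a SparkSession (or, in older versions, an SQLContext) named for the implicits import is in scope: the distinct values can also be collected straight from the DataFrame with the Dataset API, skipping the intermediate RDD:

import spark.implicits._

val categories = df.select(CATEGORY).distinct().as[String].collect()
categories.foreach(executePipeline)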

Another improvement would be reusing the dataframe that you loaded instead of performing another query, using a filter for each category:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def executePipeline(singleCategoryDf: DataFrame): Unit = { /* ... */ }

categories.foreach { cat =>
  val filtered = df.filter(col(CATEGORY) === cat)
  executePipeline(filtered)
}

NOTE: to make sure the re-use of df doesn't reload it for every execution, make sure you cache() it before collecting the categories.
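
Putting the pieces together, here is a minimal end-to-end sketch of the approach (load once, cache(), collect the distinct categories, filter per category). The JDBC URL, table and column names, and the pipeline body are placeholders carried over from the question, not a tested configuration, and a SparkSession is assumed (the older sqlCtxt.read would work the same way):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CategoryPipelines {
  val CATEGORY = "CATEGORY"  // placeholder column name
  val JDBC_URL = "jdbc:..."  // placeholder JDBC URL

  // Placeholder for the per-category Machine Learning pipeline.
  def executePipeline(singleCategoryDf: DataFrame): Unit = {
    singleCategoryDf.show()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("category-pipelines").getOrCreate()

    val df = spark.read
      .format("jdbc")
      .option("url", JDBC_URL)
      .option("dbtable", "TABLE_NAME") // placeholder table name
      .load()
      .cache() // cache BEFORE collecting, so each filter reuses the loaded data

    // The set of distinct categories is assumed to be small enough for driver memory.
    val categories: Array[String] =
      df.select(CATEGORY).distinct().collect().map(_.getString(0))

    // Runs on the driver; each executePipeline call triggers its own distributed jobs.
    categories.foreach(cat => executePipeline(df.filter(col(CATEGORY) === cat)))
  }
}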
