SPARK - Use RDD.foreach to Create a Dataframe and execute actions on the Dataframe
Problem Description
I am new to SPARK and figuring out a better way to achieve the following scenario. There is a database table containing 3 fields - Category, Amount, Quantity. First I try to pull all the distinct Categories from the database.
val categories:RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)
Now for each category I want to execute the pipeline, which essentially creates a dataframe for each category and applies some Machine Learning.
categories.foreach(executePipeline)

def executePipeline(category: String): Unit = {
  val dfCategory = sqlCtxt.read.jdbc(JDBC_URL, "SELECT * FROM TABLE_NAME WHERE CATEGORY=" + category)
  dfCategory.show()
}
Is it possible to do something like this? Or is there any better alternative?
Recommended Answer
Your code would fail with a TaskNotSerializable exception, since you're trying to use the SQLContext (which isn't serializable) inside the executePipeline method. That method would have to be serialized and sent to the workers to be executed on each record in the categories RDD.
Assuming you know the number of categories is limited, which means the list of categories isn't too large to fit in your driver's memory, you should collect the categories to the driver and iterate over that local collection using foreach:
val categoriesRdd: RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)
val categories: Seq[String] = categoriesRdd.collect()
categories.foreach(executePipeline)
Another improvement would be to reuse the dataframe you already loaded instead of performing another query, by applying a filter for each category:
def executePipeline(singleCategoryDf: DataFrame): Unit = { /* ... */ }

categories.foreach { cat =>
  val filtered = df.filter(col(CATEGORY) === cat)
  executePipeline(filtered)
}
NOTE: to make sure the reuse of df doesn't reload it for every execution, make sure you cache() it before collecting the categories.
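Putting the pieces together, a minimal end-to-end sketch of the recommended approach might look like the following. This is illustrative only: `JDBC_URL` and `TABLE_NAME` are the placeholders from the question, the `SparkSession` setup and the column name `"CATEGORY"` are assumptions, and the body of `executePipeline` stands in for the actual ML pipeline.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("per-category-pipeline").getOrCreate()

// Load the table once and cache it, so the per-category filters
// below don't re-read the source for every category.
val df: DataFrame = spark.read.jdbc(JDBC_URL, "TABLE_NAME", new Properties())
df.cache()

// Collect the distinct categories to the driver
// (assumes the set of categories is small).
val categories: Seq[String] =
  df.select("CATEGORY").distinct().collect().map(_.getString(0)).toSeq

def executePipeline(singleCategoryDf: DataFrame): Unit = {
  // ... apply the ML pipeline to this category's rows ...
  singleCategoryDf.show()
}

// Iterate on the driver; each filter is a cheap transformation
// over the cached dataframe, not a new JDBC query.
categories.foreach { cat =>
  executePipeline(df.filter(col("CATEGORY") === cat))
}
```

Because the loop runs on the driver and only transformations on the cached `df` are shipped to the executors, nothing non-serializable is captured in a closure, which avoids the TaskNotSerializable failure described above.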