Should we parallelize a DataFrame like we parallelize a Seq before training


Problem Description

Consider the code given here:

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)

Assuming we read "training" as a DataFrame using sqlContext.read(), should we still do something like

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this

or will the fit function automatically take care of parallelizing the computation/data when passed a DataFrame?

Regards,

Recommended Answer

A DataFrame is a distributed data structure, so parallelizing it is neither required nor possible. The SparkContext.parallelize method exists only to distribute local data structures that reside in driver memory. It should not be used to distribute large datasets, let alone to redistribute RDDs or higher-level data structures, as you did in your previous question with:

sc.parallelize(trainingData.collect()) 
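
In other words, a DataFrame produced by sqlContext.read can be handed to fit directly, and Spark distributes both the data and the computation. A minimal sketch (assuming Spark 1.6+, where the libsvm data source is available; the file path is a placeholder):

val training = sqlContext.read.format("libsvm").load("data/sample_training.txt") // path is an assumption
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
val model1 = lr.fit(training) // no parallelize step; the DataFrame is already distributed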

If you want to convert between an RDD and a DataFrame (Dataset), use the methods designed for that:

  1. DataFrame to RDD:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import spark.implicits._ // required for toDF; in Spark 1.x use sqlContext.implicits._

val df: DataFrame = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
val rdd: RDD[Row] = df.rdd

  2. RDD to DataFrame:

    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
    val df1: DataFrame = rdd.toDF("k", "v") // name the columns explicitly
    // or
    val df2: DataFrame = spark.createDataFrame(rdd) // in Spark 1.x, use sqlContext.createDataFrame
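As a follow-up, toDF can also derive named, typed columns from a case class. A minimal sketch (the Record class and its fields are illustrative assumptions, not part of the original answer):

    case class Record(k: String, v: Int) // hypothetical schema for illustration
    // column names "k" and "v" are taken from the case class fields
    val typedRdd: RDD[Record] = sc.parallelize(Seq(Record("foo", 1), Record("bar", 2)))
    val df3: DataFrame = typedRdd.toDF()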
