Should we parallelize a DataFrame like we parallelize a Seq before training


Problem description

Consider the code given here:

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Distribute a small local Seq of labeled points as an RDD.
val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)

Assuming we read "training" as a DataFrame using sqlContext.read(), should we still do something like

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this

or will the fit function automatically take care of parallelizing the computation/data when passed a DataFrame?

Regards

Recommended answer

A DataFrame is a distributed data structure. It is neither required nor possible to parallelize it. The SparkContext.parallelize method is intended only for local data structures that reside in the driver's memory. It should not be used to distribute large datasets, let alone to redistribute RDDs or higher-level data structures (as you did in your previous question):

sc.parallelize(trainingData.collect()) 
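To illustrate the point, here is a minimal sketch of the intended workflow (the "training.parquet" file and its label/features columns are hypothetical; on Spark 1.x substitute sqlContext for spark). The DataFrame returned by read is already distributed, so it can be passed to fit directly:

import org.apache.spark.ml.classification.LogisticRegression

// The DataFrame produced by read is already partitioned across the cluster.
val training = spark.read.parquet("training.parquet")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)

// No parallelize step: fit operates on the distributed DataFrame as-is.
val model1 = lr.fit(training)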

If you want to convert between RDD and DataFrame (Dataset), use the methods which are designed to do it:

  1. DataFrame to RDD:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import spark.implicits._ // on 1.x: import sqlContext.implicits._

val df: DataFrame = Seq(("foo", 1), ("bar", 2)).toDF("k", "v")
val rdd: RDD[Row] = df.rdd

  2. RDD to DataFrame:

    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("foo", 1), ("bar", 2)))
    val df1: DataFrame = rdd.toDF
    // or
    val df2: DataFrame = spark.createDataFrame(rdd) // on 1.x use sqlContext.createDataFrame
    

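The same holds for typed Datasets (Spark 2.x): as[T] re-exposes the same distributed data through a typed view, with no round trip through the driver. A minimal sketch, reusing the df and the implicits imported above:

case class KV(k: String, v: Int)

val ds = df.as[KV]   // typed view over the same distributed rows
val df3 = ds.toDF    // and back; no data is collected to the driver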