Writing R data frames returned from SparkR:::map


Problem description


I am using SparkR:::map and my function returns a large-ish R dataframe for each input row, each of the same shape. I would like to write these dataframes as parquet files without 'collect'ing them. Can I map write.df over my output list? Can I get the worker tasks to write the parquet instead?

I now have a working example. I am happy with this, except that I did not expect the reduce to implicitly 'collect', as I wanted to write the resulting DF as Parquet.

Also, I'm not convinced that :::map actually does anything in parallel. Do I always need to call 'parallelise' as well?

#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")

source("jdbc-utils.R")

options(stringsAsFactors = FALSE)

# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
                         sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
                         appName = "Peter Spark test",
                         list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)

#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime     <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
  return(xys)
}

# HERE BE THE SCRIPT BIT
main <- function() {

  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs,ys,stringsAsFactors = FALSE)

  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)

  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)

  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)

  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)

  return(NULL)
}

Solution

Assuming your data looks more or less like this:

rdd <- SparkR:::parallelize(sc, 1:5)
dfs <- SparkR:::map(rdd, function(x) mtcars[(x * 5):((x + 1) * 5), ])

and all columns have supported types, you can convert it to a row-wise format:

rows <- SparkR:::flatMap(dfs, function(x) {
  # Turn each local data frame into a list of rows: as.list(x) gives the
  # columns, and mapply with FUN = list zips them into one named list per row.
  data <- as.list(x)
  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
  do.call(mapply, append(args, data))
})

then call createDataFrame:

sdf <- createDataFrame(sqlContext, rows)
head(sdf)

##    mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4 24.4   4 146.7  62 3.69 3.19 20.00  1  0    4    2
## 5 22.8   4 140.8  95 3.92 3.15 22.90  1  0    4    2
## 6 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4

printSchema(sdf)

## root
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)

and simply use write.df / saveDF.
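
For example, a minimal sketch of that final step (the output path below is a placeholder, not part of the original answer):

# Minimal sketch: persist the Spark DataFrame as Parquet without collecting it
# to the driver. The output path is a hypothetical placeholder.
write.df(sdf, path = "/tmp/mtcars-rows.parquet",
         source = "parquet", mode = "overwrite")

# saveDF is an equivalent alias:
# saveDF(sdf, path = "/tmp/mtcars-rows.parquet", source = "parquet", mode = "overwrite")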

The problem is that you shouldn't use an internal API in the first place. One of the reasons it was removed in the initial release is that it is not robust enough to be used directly. Not to mention it is still not clear whether it will be supported, or even available, in the future. Just saying...
