Passing value of RDD to another RDD as variable - Spark #Pyspark

Question

I am currently exploring how to call big HQL files (containing a 100-line insert into select statement) via sqlContext.

Another thing is that the HQL files are parameterized, so when calling them from sqlContext I want to pass the parameters as well.

I have gone through loads of blogs and posts, but have not found any answers to this.

Another thing I was trying is to store the output of an RDD in a variable.

In PySpark:

max_date = sqlContext.sql("select max(rec_insert_date) from table")

Now I want to pass max_date as a variable to the next RDD:

incremetal_data=sqlConext.sql(s"select count(1) from table2 where rec_insert_date > $max_dat")

This is not working; moreover, the value of max_date is coming back as:

u[row-('20018-05-19 00:00:00')]

It is not clear how to trim those extra characters.

Answer

Don't you want to use max(rec_insert_date) rather than count(rec_insert_date)?

You have two options for passing a value returned from one query to another:

  1. Use collect, which will trigger computation and assign the returned value to a variable:

max_date = sqlContext.sql("select max(rec_insert_date) from table").collect()[0][0]  # max_date now holds the actual date value
incremental_data = sqlContext.sql("select count(1) from table2 where rec_insert_date > '{}'".format(max_date))
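For reference, collect() returns a list of Row objects (which is why the raw value in the question looked like a Row wrapper), and indexing with [0][0] extracts the bare value from the first column of the first row. A minimal sketch, where the column alias and the displayed Row content are illustrative assumptions:

rows = sqlContext.sql("select max(rec_insert_date) as max_date from table").collect()
# rows is a list of Row objects, e.g. [Row(max_date=u'2018-05-19 00:00:00')]
max_date = rows[0]["max_date"]  # or rows[0][0] to take the first column positionally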

Another (and better) option is to use the DataFrame API:

from pyspark.sql.functions import col, lit

incremental_data = sqlContext.table("table2").filter(col("rec_insert_date") > lit(max_date))

  2. Use a cross join. It should be avoided if the first query returns more than one row, but the advantage is that you don't break the processing graph, so everything can be optimized by Spark:

max_date_df = sqlContext.sql("select max(rec_insert_date) as max_date from table")  # max_date_df is a dataframe with just one row
incremental_data = sqlContext.table("table2").join(max_date_df).filter(col("rec_insert_date") > col("max_date"))
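As a side note (an addition of mine, not part of the original answer): in Spark 2.1+ the cartesian product can be requested explicitly with crossJoin, which makes the intent clear and avoids the analyzer rejecting an implicit cross join:

from pyspark.sql.functions import col

max_date_df = sqlContext.sql("select max(rec_insert_date) as max_date from table")
# crossJoin makes the cartesian product explicit; with plain join() you may need spark.sql.crossJoin.enabled=true
incremental_data = sqlContext.table("table2").crossJoin(max_date_df).filter(col("rec_insert_date") > col("max_date"))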

As for your first question, how to call large HQL files from Spark:

  • If you're using Spark 1.6 then you need to create a HiveContext https://spark.apache.org/docs/1.6.1/sql-programming-guide.html#hive-tables
  • If you're using Spark 2.x then while creating SparkSession you need to enable Hive Support https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
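For illustration, a minimal sketch of both setups (the app name is a placeholder, and sc is assumed to be an existing SparkContext):

# Spark 1.6: build a HiveContext on top of an existing SparkContext
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

# Spark 2.x: enable Hive support when building the SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("hql-runner").enableHiveSupport().getOrCreate()
# spark.sql(...) then plays the role of sqlContext.sql(...)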

You can start by inserting them into the sqlContext.sql(...) method. From my experience this usually works and is a nice starting point for rewriting the logic to the DataFrames/Datasets API. There may be some issues while running it on your cluster, because your queries will be executed by Spark's SQL engine (Catalyst) and won't be passed to Hive.
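Tying this back to the parameterized HQL files from the question, one possible sketch (the file path, placeholder name, and semicolon-based statement splitting are all assumptions) is to read the file, substitute the parameters with Python string formatting, and feed each statement to sqlContext.sql(...):

# Hypothetical file /path/to/etl.hql containing statements with {max_date} placeholders
with open("/path/to/etl.hql") as f:
    hql_text = f.read().format(max_date=max_date)  # substitute parameters

for statement in hql_text.split(";"):
    if statement.strip():
        sqlContext.sql(statement)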
