saving a list of rows to a Hive table in pyspark
Question
I have a pyspark app. I copied a Hive table to my HDFS directory, and in Python I run a sqlContext.sql query on this table. Now this variable is a DataFrame I call rows. I need to randomly shuffle the rows, so I had to convert them to a list of rows: rows_list = rows.collect(). Then I shuffle(rows_list), which shuffles the list in place. I take the number of random rows I need, x:

for r in range(x):
    allrows2add.append(rows_list[r])
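As a side note, the shuffle-then-slice step above can be done in one call in plain Python with random.sample, which also avoids mutating rows_list. A minimal sketch using stand-in values for rows.collect() and x:

```python
import random

rows_list = ["row%d" % i for i in range(10)]  # stand-in for rows.collect()
x = 3

# random.sample picks x distinct items in random order,
# equivalent to shuffle(rows_list) followed by taking the first x
allrows2add = random.sample(rows_list, x)

print(len(allrows2add))  # 3
```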
Now I want to save allrows2add as a Hive table OR append to an existing Hive table (whichever is easier to do). The problem is that I cannot do this:
all_df = sc.parallelize(allrows2add).toDF()
This fails because the schema can't be inferred:

ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

and I don't want to put in the whole schema. The schema of rows has 117 columns, so I don't want to type them out. Is there a way to extract the schema of rows to help me make allrows2add a DataFrame, or to somehow save it as a Hive table? I can do rows.printSchema(), but I'm not sure how to get that into a schema format as a variable to pass to toDF() without having to parse all of that text.

Thanks
Adding for loop info
#Table is a List of Rows from a small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()

for i in range(len(Table)):
    rows = sqlContext.sql(qry)
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count
    #hivetemp is a table that I copied from Hive to my hdfs using:
    #create external table IF NOT EXISTS hivetemp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
    #INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)
    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))
    #writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
    #tried this before and heck slow also
    #rows_list = rows.collect()
    #shuffle(rows_list)
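The hand-built WHERE clause in that loop is easy to get wrong around the escaped quotes. As a sanity check, here is the same string concatenation in plain Python with hypothetical stand-in values, so you can see the SQL string it actually produces:

```python
# hypothetical values standing in for Table[i][0], Table[i][1], and 100 - count
val1 = "apple"
val2 = "pear"
x = 7

query = "SELECT * FROM hivetemp WHERE col1<>\"" + val1 + \
        "\" AND col2 ==\"" + val2 + "\" ORDER BY RAND() LIMIT " + str(x)

print(query)
# SELECT * FROM hivetemp WHERE col1<>"apple" AND col2 =="pear" ORDER BY RAND() LIMIT 7
```

Because the values are pasted straight into the SQL text, any val1 or val2 containing a double quote would break the query, which is worth keeping in mind when the values come from another table.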
Solution

When the schema can't be inferred, there's usually a reason. toDF is syntactic sugar for the createDataFrame function, which by default only uses the first 100 rows (despite the docs saying it only uses the first row) to determine what the schema should be. To change this, you can increase the sampling ratio to look at a greater percentage of your data:

df = rdd.toDF(sampleRatio=0.2)
# or...
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2)
It's also possible that your random sample happened to only take rows with empty values for some particular columns. If this is the case, you can either create a schema from scratch like so:
from pyspark.sql.types import *

# all DataFrame rows are StructType
# can create a new StructType with combinations of StructField
schema = StructType([
    StructField("column_1", StringType(), True),
    StructField("column_2", IntegerType(), True),
    # etc.
])
df = sqlContext.createDataFrame(rdd, schema=schema)
Or, you can get the schema from the previous DataFrame you created by accessing its schema value:

df2 = sqlContext.createDataFrame(rdd, schema=df1.schema)
Note that if your RDD's rows aren't StructType (a.k.a. Row) objects, but are instead dictionaries or lists, you won't be able to create a DataFrame from them. If your RDD rows are dictionaries, you can convert them to Row objects like this:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x))
# ** is to unpack the dictionary, since the Row constructor
# only takes keyword arguments
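The ** in that map is plain Python dictionary unpacking, independent of Spark. A minimal illustration with a hypothetical keyword-only constructor standing in for pyspark.sql.Row:

```python
def make_row(**kwargs):
    # stands in for pyspark.sql.Row, which also takes only keyword arguments
    return dict(kwargs)

record = {"col1": "a", "col2": 1}

# **record expands to make_row(col1="a", col2=1)
row = make_row(**record)
print(row)  # {'col1': 'a', 'col2': 1}
```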