saving a list of rows to a Hive table in pyspark

Question

I have a pyspark app. I copied a Hive table to my HDFS directory, and in Python I run a sqlContext.sql query on this table. This variable is now a DataFrame I call rows. I need to randomly shuffle the rows, so I had to convert them to a list of rows, rows_list = rows.collect(). Then shuffle(rows_list) shuffles the list in place. I take the number of random rows I need, x:

allrows2add = []
for r in range(x):
    allrows2add.append(rows_list[r])

Now I want to save allrows2add as a Hive table OR append it to an existing Hive table (whichever is easier to do). The problem is that I cannot do this:

all_df = sc.parallelize(allrows2add).toDF()

The schema can't be inferred, and it fails with: ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

without putting in the whole schema. The schema of rows has 117 columns, so I don't want to type them out. Is there a way to extract the schema of rows to help me make allrows2add a DataFrame, or to somehow save it as a Hive table? I can do rows.printSchema(), but I'm not sure how to get the schema into a variable to pass to toDF() without having to parse all of that text.

Thanks

Adding the for loop info

# Table is a list of Rows from a small Hive table I loaded using:
# query = "SELECT * FROM Table"
# Table = sqlContext.sql(query).collect()

from pyspark.sql.functions import lit

for i in range(len(Table)):
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count

    # hivetemp is a table that I copied from Hive to my HDFS using:
    # CREATE EXTERNAL TABLE IF NOT EXISTS hivetemp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
    # INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;

    query = "SELECT * FROM hivetemp WHERE col1<>\"" + val1 + "\" AND col2==\"" + val2 + "\" ORDER BY RAND() LIMIT " + str(x)

    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))

    # Writing to parquet is heck slow, and I can't use pandas because
    # the library isn't installed on the server
    rows.saveAsParquetFile("rows" + str(i) + ".parquet")

    # Tried this before and it was heck slow as well:
    # rows_list = rows.collect()
    # shuffle(rows_list)

Answer

When the schema can't be inferred, there's usually a reason. toDF is syntactic sugar for the createDataFrame function, which by default only uses the first 100 rows (despite the docs saying it only uses the first row) to determine what the schema should be. To change this, you can increase the sampling ratio to look at a greater percentage of your data:

df = rdd.toDF(sampleRatio=0.2)
# or...
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2)

It's also possible that your random sample happened to only take rows with empty values for some particular columns. If this is the case, you can either create a schema from scratch like so:

from pyspark.sql.types import *
# all DataFrame rows are StructType
# can create a new StructType with combinations of StructField
schema = StructType([
    StructField("column_1", StringType(), True),
    StructField("column_2", IntegerType(), True),
    # etc.
])
df = sqlContext.createDataFrame(rdd, schema=schema)

Or, you can get the schema from the previous DataFrame you created by accessing the schema value:

df2 = sqlContext.createDataFrame(rdd, schema=df1.schema)
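
Putting this together for the question: the shuffled list allrows2add can be rebuilt into a DataFrame by reusing the schema of the original rows DataFrame, and then written to Hive. A minimal sketch, assuming Spark 1.4+ with a HiveContext as sqlContext; the table name my_shuffled_table is a placeholder:

# Rebuild a DataFrame from the shuffled list of Row objects,
# reusing the original schema instead of retyping all 117 columns
all_df = sqlContext.createDataFrame(allrows2add, schema=rows.schema)

# Save as a new Hive table ("my_shuffled_table" is a made-up name),
# or use mode("append") to append to an existing one
all_df.write.saveAsTable("my_shuffled_table")
# all_df.write.mode("append").saveAsTable("my_shuffled_table")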

Note that if your RDD's rows aren't StructType (a.k.a. Row) objects but instead dictionaries or lists, you won't be able to create a DataFrame from them. If your RDD's rows are dictionaries, you can convert them to Row objects like this:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x))
# ** is to unpack the dictionary since the Row constructor
# only takes keyword arguments
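
For instance, here's a small hypothetical example (the dictionary keys are made up) that round-trips a list of dicts through Row into a DataFrame:

from pyspark.sql import Row

data = sc.parallelize([{"name": "alice", "age": 5},
                       {"name": "bob", "age": 7}])
# ** unpacks each dict into keyword arguments of the Row constructor;
# the resulting fields are sorted alphabetically by key
row_rdd = data.map(lambda x: Row(**x))
df = sqlContext.createDataFrame(row_rdd)
df.show()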
