Reading and writing from Hive tables with Spark after aggregation

Question
We have a Hive warehouse and want to use Spark for various tasks (mainly classification), at times writing the results back as a Hive table. For example, we wrote the following Python code to find the total sum of original_table column two, grouped by original_table column one. The code works, but we are worried that it is inefficient, particularly the maps that convert the rows to key-value pairs and then to dictionaries. The functions combiner, mergeValue, and mergeCombiner are defined elsewhere and work fine.
from pyspark.sql import HiveContext

hive_ctx = HiveContext(sc)
rdd = hive_ctx.sql('from original_table select *')
# convert to key-value pairs
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))
# create rdd where rows are (key, (sum, count))
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)
# create rdd with dictionary values in order to create a SchemaRDD
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})
# infer the schema
schema_rdd = hive_ctx.inferSchema(dict_rdd)
# save
schema_rdd.saveAsTable('new_table_name')
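For context, a plausible set of definitions for the three combineByKey functions, consistent with the (key, (sum, count)) shape mentioned above. These are hypothetical: the question's actual implementations are not shown.

```python
# Hypothetical definitions for a (sum, count) aggregate -- the
# question's real combiner/mergeValue/mergeCombiner are defined
# elsewhere and may differ.

def combiner(value):
    # First value seen for a key: start the (sum, count) accumulator
    return (value, 1)

def mergeValue(acc, value):
    # Fold another value into a partition-local accumulator
    return (acc[0] + value, acc[1] + 1)

def mergeCombiner(acc1, acc2):
    # Merge accumulators produced on different partitions
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])
```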
Are there more efficient ways of doing the same thing?
Answer

What version of Spark are you using?

This answer is based on Spark 1.6 and uses DataFrames.
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt")
import org.apache.spark.sql.functions._
client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show()
+-----+---+-----+
|Categ|Sum|count|
+-----+---+-----+
| A| 15| 2|
| B| 56| 1|
+-----+---+-----+
Hope this helps!