Unable to write PySpark Dataframe created from two zipped dataframes


Problem Description


I am trying to follow the example given here for combining two dataframes without a shared join key (combining by "index" in a database table or pandas dataframe, except that PySpark does not have that concept):

from pyspark.sql.types import StructType

left_df = left_df.repartition(right_df.rdd.getNumPartitions()) # FWIW, num of partitions = 303
joined_schema = StructType(left_df.schema.fields + right_df.schema.fields)
interim_rdd = left_df.rdd.zip(right_df.rdd).map(lambda x: x[0] + x[1])
full_data = spark.createDataFrame(interim_rdd, joined_schema)


This all seems to work fine. I am testing it out while using DataBricks, and I can run the "cell" above with no problem. But then when I go to save it, I am unable to, because it complains that the partitions do not match (???). I have confirmed that the number of partitions matches, but you can also see above that I am explicitly making sure they match. My save command:

full_data.write.parquet(my_data_path, mode="overwrite")


Error

I receive the following error:

Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition


My Guess

I am suspecting the problem is that, even though I have matched the number of partitions, I do not have the same number of rows in each partition. But I do not know how to do that. I only know how to specify the # of partitions, not the way to partition.
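A minimal diagnostic sketch for that guess, assuming left_df and right_df as above: glom() exposes the per-partition row counts, and zip() requires the two lists to match element for element.

# Hypothetical check: per-partition row counts on each side.
left_counts = left_df.rdd.glom().map(len).collect()
right_counts = right_df.rdd.glom().map(len).collect()
print(left_counts == right_counts)   # must be True for zip() to succeed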


Or, more specifically, I do not know the way to specify how to partition if there is no column I can use. Remember, they have no shared column.


How do I know that I can combine them this way, with no shared join key? In this case, it is because I am trying to join model predictions with input data, but I actually have this case more generally, in situations beyond just model data + predictions.

  1. Specifically in the case above, how can I properly set up the partitioning so that it works?
  2. How should I join two dataframes by row index?
    • (I know the standard response is "you shouldn't... partitioning makes indices nonsensical", but until Spark creates ML libraries that do not force data loss like I described in the link above, this will always be an issue.)

Answer


RDDs are old hat, but answering the error from that perspective:


From la Trobe University http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html#zip the following:


Joins two RDDs by combining the i-th of either partition with each other. The resulting RDD will consist of two-component tuples which are interpreted as key-value pairs by the methods provided by the PairRDDFunctions extension.

Note the k, v pairs.


This means you must have the same partitioner, with the same number of partitions and the same number of k,v's per partition, or the definition above does not hold.
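A minimal PySpark sketch of that constraint, using hypothetical toy data and assuming an active SparkContext sc: two RDDs with the same number of partitions zip cleanly when each partition holds the same number of elements, and raise the SparkException above when they do not.

# Same partition count, same per-partition counts: zip succeeds.
a = sc.parallelize([1, 2, 3, 4], 2)
b = sc.parallelize(["w", "x", "y", "z"], 2)
print(a.zip(b).collect())   # [(1, 'w'), (2, 'x'), (3, 'y'), (4, 'z')]

# Same partition count, different per-partition counts (1 and 2 vs 2 and 2):
# the job fails at collect() with "Can only zip RDDs with same number of
# elements in each partition".
c = sc.parallelize(["w", "x", "y"], 2)
try:
    a.zip(c).collect()
except Exception as e:
    print(type(e).__name__)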


This is best applied when reading in from files, as repartition(n) may not give the same distribution.


A little trick to get around that is to use zipWithIndex for the k of the k, v, like so (in Scala, as this is not a pyspark-specific aspect):

val rddA = sc.parallelize(Seq(
  ("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0)
))
// Key each element by its index so both RDDs share the same key type.
val rddAA = rddA.zipWithIndex().map(x => (x._2, x._1)).repartition(5)

// zip needs the same number of elements in each partition, so rddB is kept
// at the same length as rddA (4 elements).
val rddB = sc.parallelize(Seq(
  (10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C")
))
val rddBB = rddB.zipWithIndex().map(x => (x._2, x._1)).repartition(5)

val zippedRDD = (rddAA zip rddBB).map{ case ((id, x), (y, c)) => (id, x, y, c) }
zippedRDD.collect


The repartition(n) then seems to work as the k is the same type.


But you must have the same number of elements per partition. It is what it is, but it makes sense.
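For the questioner's DataFrames, a rough PySpark sketch of the same index-keyed idea, assuming left_df, right_df, spark, and my_data_path as in the question. Here the indexed RDDs are joined on the index key instead of zipped, which avoids the per-partition alignment requirement altogether, at the cost of a shuffle.

from pyspark.sql.types import StructType

# Key every row by its position in the original DataFrame.
left_kv = left_df.rdd.zipWithIndex().map(lambda r: (r[1], r[0]))
right_kv = right_df.rdd.zipWithIndex().map(lambda r: (r[1], r[0]))

# Join on the index key; Row is a tuple subclass, so the two matched Rows
# can be concatenated into one flat tuple for the combined schema.
joined_schema = StructType(left_df.schema.fields + right_df.schema.fields)
joined_rdd = left_kv.join(right_kv).map(lambda kv: tuple(kv[1][0]) + tuple(kv[1][1]))

full_data = spark.createDataFrame(joined_rdd, joined_schema)
full_data.write.parquet(my_data_path, mode="overwrite")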
