Custom Partitioner in Pyspark 2.1.0


Problem Description

I read that RDDs with the same partitioner will be co-located. This is important to me because I want to join several large Hive tables that are not partitioned. My theory is that if I can get them partitioned (by a field called date_day) and co-located, then I would avoid shuffling.

Here is what I am trying to do for each table:

import datetime

def date_day_partitioner(key):
  return (key.date_day - datetime.date(2017,05,01)).days

df = sqlContext.sql("select * from hive.table")
rdd = df.rdd
rdd2 = rdd.partitionBy(100, date_day_partitioner)
df2 = sqlContext.createDataFrame(rdd2, df_log_entry.schema)

print df2.count()

Unfortunately, I can't even test my theory about co-location and avoiding shuffling, because I get the following error when I try partitionBy: ValueError: too many values to unpack

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-118755547579363441.py", line 346, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-118755547579363441.py", line 339, in <module>
    exec(code)
  File "<stdin>", line 15, in <module>
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 380, in count
    return int(self._jdf.count())
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o115.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 6.0 failed 4 times, most recent failure: Lost task 21.3 in stage 6.0 (TID 182, ip-172-31-49-209.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1752, in add_shuffle_key
ValueError: too many values to unpack
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
...

I must be doing something wrong, could you please help?

Solution

This is happening because you are not applying partitionBy on a key-value pair RDD. Your RDD must consist of (key, value) pairs, and the partition function you pass to partitionBy must return an integer.
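In case it helps, here is a minimal sketch of that contract on made-up data (it assumes an active SparkContext named sc, as in a PySpark shell):

# Made-up (key, value) pairs, not the asker's table.
pairs = sc.parallelize([("1", "a"), ("2", "b"), ("3", "c")])
# The partition function maps a key to an int; Spark takes it modulo numPartitions.
evenodd = pairs.partitionBy(2, lambda key: int(key) % 2)
print(evenodd.glom().map(len).collect())  # e.g. [1, 2]: one even key, two odd keys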

I don't have sample data for your Hive table, so let's demonstrate with one of mine. I created the following DataFrame from a Hive table:

df = spark.table("udb.emp_details_table");
+------+--------+--------+----------------+
|emp_id|emp_name|emp_dept|emp_joining_date|
+------+--------+--------+----------------+
|     1|     AAA|      HR|      2018-12-06|
|     1|     BBB|      HR|      2017-10-26|
|     2|     XXX|   ADMIN|      2018-10-22|
|     2|     YYY|   ADMIN|      2015-10-19|
|     2|     ZZZ|      IT|      2018-05-14|
|     3|     GGG|      HR|      2018-06-30|
+------+--------+--------+----------------+
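For readers who want to follow along without that Hive table, a roughly equivalent DataFrame can be built inline. The column names and values below are copied from the output above; emp_id is kept as a string because the RDD output further down prints it as u'1'.

import datetime

# Hypothetical inline stand-in for udb.emp_details_table, built from the rows shown above.
rows = [
    ("1", "AAA", "HR",    datetime.date(2018, 12, 6)),
    ("1", "BBB", "HR",    datetime.date(2017, 10, 26)),
    ("2", "XXX", "ADMIN", datetime.date(2018, 10, 22)),
    ("2", "YYY", "ADMIN", datetime.date(2015, 10, 19)),
    ("2", "ZZZ", "IT",    datetime.date(2018, 5, 14)),
    ("3", "GGG", "HR",    datetime.date(2018, 6, 30)),
]
df = spark.createDataFrame(rows, ["emp_id", "emp_name", "emp_dept", "emp_joining_date"])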

Now I want to partition my DataFrame and keep similar keys together in one partition. So I converted my DataFrame to an RDD, because partitionBy is only available on RDDs for re-partitioning.

myrdd = df.rdd
newrdd = myrdd.partitionBy(10,lambda k: int(k[0]))
newrdd.take(10)

I got the same error:

 File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1767, in add_shuffle_key
    for k, v in iterator:
ValueError: too many values to unpack 

Hence, we need to convert our RDD into key-value pairs to use partitionBy:

keypair_rdd = myrdd.map(lambda x : (x[0],x[1:]))

Now you can see that the RDD has been converted to key-value pairs, so you can distribute your data across partitions according to the available keys.

[(u'1', (u'AAA', u'HR', datetime.date(2018, 12, 6))), 
(u'1', (u'BBB', u'HR', datetime.date(2017, 10, 26))), 
(u'2', (u'XXX', u'ADMIN', datetime.date(2018, 10, 22))), 
(u'2', (u'YYY', u'ADMIN', datetime.date(2015, 10, 19))), 
(u'2', (u'ZZZ', u'IT', datetime.date(2018, 5, 14))), 
(u'3', (u'GGG', u'HR', datetime.date(2018, 6, 30)))]

Using partitionBy on the key-value RDD now:

newrdd = keypair_rdd.partitionBy(5,lambda k: int(k[0]))

Let's take a look at the partitions. The data is grouped, and equal keys are now stored in the same partition. Two of the partitions are empty.

>>> print("Partitions structure: {}".format(newrdd.glom().map(len).collect()))
Partitions structure: [0, 2, 3, 1, 0]
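If you want to see not just the counts but which keys landed in which partition, mapPartitionsWithIndex (standard PySpark) can tag each record with its partition index; the helper name below is just for illustration.

# Tag every record with the index of the partition it lives in.
def tag_partition(index, iterator):
    for key, value in iterator:
        yield (index, key)

print(newrdd.mapPartitionsWithIndex(tag_partition).collect())
# Should show both u'1' records under one index, the three u'2' records under another, and so on.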

Now let's say I want to custom-partition my data. So I have created the function below to keep keys '1' and '3' in the same partition.

def partitionFunc(key):
    import random
    if key == 1 or key == 3:
        return 0
    else:
        return random.randint(1,2)

newrdd = keypair_rdd.partitionBy(5,lambda k: partitionFunc(int(k[0])))

>>> print("Partitions structure: {}".format(newrdd.glom().map(len).collect()))
Partitions structure: [3, 3, 0, 0, 0]

As you can see, keys 1 and 3 are now stored in one partition and the rest in another.

I hope this helps. You can try partitionBy on your DataFrame's RDD; just make sure to convert it into key-value pairs first and that the partition function returns an integer.
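Applied back to the code in the question, that might look roughly like the sketch below. It is only a sketch: it assumes hive.table really has a date_day column of date type, and it rebuilds the DataFrame from df's own schema rather than df_log_entry.schema.

import datetime

def date_day_partitioner(date_key):
    # The key is now the date itself, not the whole row.
    return (date_key - datetime.date(2017, 5, 1)).days

df = sqlContext.sql("select * from hive.table")
# Key each row by date_day so partitionBy sees (key, value) pairs.
keyed = df.rdd.map(lambda row: (row.date_day, row))
rdd2 = keyed.partitionBy(100, date_day_partitioner)
# Drop the keys again before rebuilding the DataFrame.
df2 = sqlContext.createDataFrame(rdd2.values(), df.schema)
print(df2.count())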
