Pyspark split interval into sub intervals


Problem Description

I have a dataframe with 3 columns, "from", "to", and "country", for example:

from to country
1    105 abc
500  1000 def

I want to create a dataframe by splitting each from/to range into sub-intervals of size 10, so I should get a dataframe like:

from to country
1    10 abc
11   20 abc
21   30 abc
31   40 abc
...
91   105 abc (the leftover values go into the last bucket for that range)
500  510 def

and so on...

Recommended Answer

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType

#Creating the DataFrame
spark = SparkSession.builder.getOrCreate()
values = [(1, 105, 'abc'), (500, 1000, 'def')]
df = spark.createDataFrame(values, ['from', 'to', 'country'])

step_size = 10

#Creating UDFs below
def make_list_from(start, end):
    # Bucket start points: start, start+10, start+20, ...; drop any start
    # whose remaining width is less than step_size (those values are
    # absorbed by the last bucket)
    return [i for i in range(start, end, step_size) if (end - i) >= (step_size - 1)]
make_list_from_udf = udf(make_list_from, ArrayType(IntegerType()))

def make_list_to(start, end):
    # Matching bucket end points: each start + (step_size - 1)
    right_list = [i + step_size - 1 for i in range(start, end, step_size) if (end - i) >= (step_size - 1)]
    right_list[-1] = end  # the last bucket stretches to the overall end
    return right_list
make_list_to_udf = udf(make_list_to, ArrayType(IntegerType()))
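
# Sanity check: the helpers as plain Python functions on the first row (1, 105)
print(make_list_from(1, 105))  # [1, 11, 21, 31, 41, 51, 61, 71, 81, 91]
print(make_list_to(1, 105))    # [10, 20, 30, 40, 50, 60, 70, 80, 90, 105]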

#Creating Lists of sub-intervals
df = df.withColumn('my_list_from',make_list_from_udf(col('from'),col('to')))\
       .withColumn('my_list_to',make_list_to_udf(col('from'),col('to')))\
       .drop('from','to')
df.show()
+-------+--------------------+--------------------+
|country|        my_list_from|          my_list_to|
+-------+--------------------+--------------------+
|    abc|[1, 11, 21, 31, 4...|[10, 20, 30, 40, ...|
|    def|[500, 510, 520, 5...|[509, 519, 529, 5...|
+-------+--------------------+--------------------+
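
On Spark 2.4+, the zip_ UDF below can be replaced by the built-in arrays_zip function. A minimal sketch, assuming that Spark version (note that arrays_zip names the struct fields after the input columns):

from pyspark.sql.functions import arrays_zip

df_zipped = (df
    .withColumn("tmp", explode(arrays_zip("my_list_from", "my_list_to")))
    .select(col("tmp.my_list_from").alias("from"),
            col("tmp.my_list_to").alias("to"),
            "country"))

On older versions, the UDF-based zip below does the same job: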

#Exploding the Lists
zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("first", IntegerType()),
      StructField("second", IntegerType())
  ]))
)
df = (df
    .withColumn("tmp", zip_("my_list_from", "my_list_to"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select(col("tmp.first").alias("from"), col("tmp.second").alias("to"), "country"))
df.show(100)
+----+----+-------+
|from|  to|country|
+----+----+-------+
|   1|  10|    abc|
|  11|  20|    abc|
|  21|  30|    abc|
|  31|  40|    abc|
|  41|  50|    abc|
|  51|  60|    abc|
|  61|  70|    abc|
|  71|  80|    abc|
|  81|  90|    abc|
|  91| 105|    abc|
| 500| 509|    def|
| 510| 519|    def|
| 520| 529|    def|
.
.
.
| 960| 969|    def|
| 970| 979|    def|
| 980| 989|    def|
| 990|1000|    def|
+----+----+-------+
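
For completeness, here is a minimal UDF-free sketch of the same bucketing, assuming Spark 2.4+ (for the built-in sequence function) and that to - from >= step_size - 1 on every row, as in the example data:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
step_size = 10

df = spark.createDataFrame([(1, 105, 'abc'), (500, 1000, 'def')],
                           ['from', 'to', 'country'])

result = (df
    # Generate the bucket start points: from, from+10, ..., stopping early
    # enough that every bucket is at least step_size wide
    .withColumn('sub_from', F.explode(
        F.sequence(F.col('from'), F.col('to') - (step_size - 1), F.lit(step_size))))
    # Each bucket normally ends at start + (step_size - 1); the last one
    # stretches to 'to' so the leftover values land in the final bucket
    .withColumn('sub_to',
        F.when(F.col('to') - F.col('sub_from') < 2 * step_size - 1, F.col('to'))
         .otherwise(F.col('sub_from') + step_size - 1))
    .select(F.col('sub_from').alias('from'), F.col('sub_to').alias('to'), 'country'))
result.show(100)

This produces the same rows as the UDF version above.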

