How to do a nested for-each loop with PySpark


Problem description

Imagine a large dataset (>40GB parquet file) containing value observations of thousands of variables as triples (variable, timestamp, value).

Now think of a query in which you are only interested in a subset of 500 variables, and you want to retrieve the observations (values --> time series) for those variables for specific points in time (observation windows or timeframes), each having a start and end time.

Without distributed computing (Spark), you could code it like this:

results = []
for var_ in variables_of_interest:
    for incident in incidents:
        # Filter the full observation set down to one variable
        # within one incident's time window.
        var_df = df_all.filter(
            (df_all.Variable == var_)
            & (df_all.Time > incident.startTime)
            & (df_all.Time < incident.endTime))
        results.append((incident, var_df))

My question is: how to do this with Spark/PySpark? I was thinking of either:

  1. Somehow joining the incidents with the variables and then filtering the dataframe.
  2. Broadcasting the incident dataframe and using it in a map function when filtering the variable observations (df_all) -- a hedged sketch follows this list.
  3. Somehow using RDD.cartesian or RDD.mapPartitions (remark: the parquet file was saved partitioned by variable).
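
To make idea #2 concrete, here is a minimal, hedged sketch (my own illustration, not code from the post): it broadcasts the incident windows and the variable subset to the executors and filters df_all with a Python UDF. The names incidents, variables_of_interest and df_all come from the question; everything else is an assumption.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# Broadcast the small, driver-side lookup structures to all executors.
windows_b = sc.broadcast([(i.startTime, i.endTime) for i in incidents])
vars_b = sc.broadcast(set(variables_of_interest))

def relevant(variable, time):
    # Keep a row only if its variable is of interest and its timestamp
    # falls inside at least one incident window.
    return variable in vars_b.value and any(
        start < time < end for start, end in windows_b.value)

relevant_udf = udf(relevant, BooleanType())

filtered = df_all.filter(relevant_udf(col("Variable"), col("Time")))

Note that this only trims df_all down to the relevant rows; it does not tell you which incident a row belongs to, which is one reason the join-based idea #1 ends up being more natural.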

The expected output should be:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

Where dataframe 1 contains all variables and their observed values within the timeframe of incident 1, and dataframe 2 contains those values within the timeframe of incident 2.

I hope you get the idea.

Update

I tried to code a solution based on idea #1 and the code from the answer given by zero323. It works quite well, but I wonder how to aggregate/group it to the incident in the final step. I tried adding a sequential number to each incident, but then I got errors in the last step (a hedged sketch of that idea follows the file list below). It would be great if you could review and/or complete the code. Therefore I uploaded sample data and the scripts. The environment is Spark 1.4 (PySpark):

  • Incidents: incidents.csv
  • Variable value observation data (77MB): parameters_sample.csv (put it to HDFS)
  • Jupyter Notebook: nested_for_loop_optimized.ipynb
  • Python Script: nested_for_loop_optimized.py
  • PDF export of Script: nested_for_loop_optimized.pdf
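
As promised above, a hedged sketch of the sequential-number idea, assuming incidents is a driver-side list of objects with startTime/endTime attributes; the names ref_seq, incident_id, startTime and endTime are illustrative and do not come from the uploaded scripts.

from pyspark.sql.functions import col

# Build the reference frame with an explicit, sequential incident id.
ref_seq = sc.parallelize([
    (var_, idx, incident.startTime, incident.endTime)
    for var_ in variables_of_interest
    for idx, incident in enumerate(incidents)
]).toDF(["var_", "incident_id", "startTime", "endTime"])

same_var = col("Variable") == col("var_")
same_time = col("Time").between(col("startTime"), col("endTime"))

joined = ref_seq.join(df_all.alias("df"), same_var & same_time)

# One (lazy) DataFrame per incident, keyed by the sequential id.
per_incident = {idx: joined.filter(col("incident_id") == idx)
                for idx in range(len(incidents))}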

Answer

Generally speaking, only the first approach looks sensible to me. The exact joining strategy depends on the number of records and their distribution, but you can either create a top-level data frame:

ref = sc.parallelize([(var_, incident)
    for var_ in variables_of_interest
    for incident in incidents
]).toDF(["var_", "incident"])

and simply join:

from pyspark.sql.functions import col

same_var = col("Variable") == col("var_")
same_time = col("Time").between(
    col("incident.startTime"),
    col("incident.endTime")
)

ref.join(df.alias("df"), same_var & same_time)

or perform joins against particular partitions:

incidents_ = sc.parallelize([
   (incident, ) for incident in incidents
]).toDF(["incident"])

for var_ in variables_of_interest:
    # Note: `spark` (SparkSession) exists only in Spark 2.x; on Spark 1.4
    # the equivalent call is sqlContext.read.parquet(...).
    df = spark.read.parquet("/some/path/Variable={0}".format(var_))
    df.join(incidents_, same_time)
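
As a hedged follow-up (not part of the original answer), the per-variable joins could be collected and stacked back into a single result; unionAll is the DataFrame method name in Spark 1.x.

from functools import reduce

parts = []
for var_ in variables_of_interest:
    df_var = spark.read.parquet("/some/path/Variable={0}".format(var_))
    parts.append(df_var.join(incidents_, same_time))

# Stack the per-variable join results back into one DataFrame.
result = reduce(lambda a, b: a.unionAll(b), parts)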

In either case, you can optionally mark one side as small enough to be broadcast.
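
A hedged note on that broadcast remark: on Spark 1.4/1.5 the usual lever is the auto-broadcast threshold, while the explicit broadcast() hint only appeared in pyspark.sql.functions in Spark 1.6. For example:

# Spark 1.4/1.5: let the optimizer broadcast any table below the threshold
# (value in bytes; 50 MB here is just an illustrative figure).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))

# Spark 1.6+: mark the small side explicitly.
# from pyspark.sql.functions import broadcast
# df.alias("df").join(broadcast(ref), same_var & same_time)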
