PySpark - Time Overlap for Object in RDD


Problem description


My goal is to group objects based on time overlap.

Each object in my rdd contains a start_time and end_time.

I'm probably going about this inefficiently, but what I'm planning on doing is assigning an overlap id to each object based on whether it has any time overlap with any of the other objects. I have the logic for the time overlap itself worked out. Then, I hope to group by that overlap_id.

So first,

mapped_rdd = rdd.map(assign_overlap_id)
final_rdd = mapped_rdd.reduceByKey(combine_objects)

Now this brings me to my question: how do I go about writing the assign_overlap_id function?

def assign_overlap_id(x):
  ...
  ...
  return (overlap_id, x)
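One minimal sketch of such a function, assuming the set of intervals is small enough to collect, sort and merge on the driver first (the make_assign_overlap_id helper and the start_time/end_time attribute names are assumptions for illustration):

def make_assign_overlap_id(intervals):
    # intervals: list of (start_time, end_time) tuples collected on the driver.
    # Merge strictly overlapping intervals into disjoint groups; the index of
    # the group an object falls into becomes its overlap_id.
    merged = []
    for s, e in sorted(intervals):
        if merged and s < merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], e))
        else:
            merged.append((s, e))

    def assign_overlap_id(x):
        # A linear scan is enough for a small number of merged groups
        for overlap_id, (s, e) in enumerate(merged):
            if s <= x.start_time < e:
                return (overlap_id, x)
        return (None, x)

    return assign_overlap_id

# Hypothetical usage:
# intervals = rdd.map(lambda x: (x.start_time, x.end_time)).collect()
# mapped_rdd = rdd.map(make_assign_overlap_id(intervals))
# final_rdd = mapped_rdd.reduceByKey(combine_objects)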

Solution

Naive solution using Spark SQL and Data Frames:

Scala:

import org.apache.spark.sql.functions.udf

case class Interval(start_time: Long, end_time: Long)

val rdd = sc.parallelize(
    Interval(0, 3) :: Interval(1, 4) ::
    Interval(2, 5) :: Interval(3, 4) ::
    Interval(5, 8) :: Interval(7, 10) :: Nil
)

val df = sqlContext.createDataFrame(rdd)

// Simple check whether two given intervals overlap
def overlaps(start_first: Long, end_first: Long,
        start_second: Long, end_second: Long):Boolean = {
    (start_second > start_first & start_second < end_first) |
    (end_second > start_first & end_second < end_first) 
}

// Register udf and data frame aliases
// It looks like Spark SQL doesn't support
// aliases in FROM clause [1] so we have to
// register df twice
sqlContext.udf.register("overlaps", overlaps)
df.registerTempTable("df1")
df.registerTempTable("df2")

// Join and filter
sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show

And the same thing using PySpark:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

rdd = sc.parallelize([
    (0, 3), (1, 4), 
    (2, 5), (3, 4),
    (5, 8), (7, 10)
])

df = sqlContext.createDataFrame(rdd, ('start_time', 'end_time'))

def overlaps(start_first, end_first, start_second, end_second):
    return ((start_first < start_second < end_first) or
        (start_first < end_second < end_first))

sqlContext.registerFunction('overlaps', overlaps, BooleanType())
df.registerTempTable("df1")
df.registerTempTable("df2")

sqlContext.sql("""
     SELECT * FROM df1 JOIN df2
     WHERE overlaps(df1.start_time, df1.end_time, df2.start_time, df2.end_time)
""").show()

Low-level transformations with grouping by window

A slightly smarter approach is to generate candidate pairs using a window of some specified width. Here is a rather simplified solution:

Scala:

// Generates list of "buckets" for a given interval
def genRange(interval: Interval) = interval match {
    case Interval(start_time, end_time) => {
      (start_time / 10L * 10L) to (((end_time / 10) + 1) * 10) by 1
    }
}


// For each interval generate pairs (bucket, interval)
val pairs = rdd.flatMap( (i: Interval) => genRange(i).map((r) => (r, i)))

// Join (in the worst case it is still O(n^2),
// but in practice should be better than a naive
// Cartesian product)
val candidates = pairs.
    join(pairs).
    map({
        case (k, (Interval(s1, e1), Interval(s2, e2))) => (s1, e1, s2, e2)
   }).distinct


// For each candidate pair check if there is overlap
candidates.filter { case (s1, e1, s2, e2) => overlaps(s1, e1, s2, e2) }

Python:

# Generates the list of "buckets" for a given interval
def genRange(start_time, end_time):
    return xrange(start_time / 10L * 10L, ((end_time / 10) + 1) * 10)

# For each interval generate pairs (bucket, (start_time, end_time))
pairs = rdd.flatMap(lambda (s, e): ((r, (s, e)) for r in genRange(s, e)))

# Join on the bucket key (in the worst case it is still O(n^2),
# but in practice should be better than a naive Cartesian product)
candidates = (pairs
    .join(pairs)
    .map(lambda (k, ((s1, e1), (s2, e2))): (s1, e1, s2, e2))
    .distinct())

# For each candidate pair check if there is an overlap
candidates.filter(lambda (s1, e1, s2, e2): overlaps(s1, e1, s2, e2))

While this can be sufficient on some datasets, for a production-ready solution you should consider implementing a state-of-the-art algorithm like NCList.
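To get from the overlapping pairs back to the overlap groups asked about in the question, one option, assuming the set of distinct intervals is small enough to collect, is a driver-side union-find over the candidate pairs (the group_by_overlap helper below is an assumption for illustration; intervals that overlap nothing would still need to be added as singleton groups):

def group_by_overlap(pair_list):
    # Collapse (s1, e1, s2, e2) overlap pairs into groups with union-find.
    parent = {}

    def find(x):
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    def union(a, b):
        parent[find(a)] = find(b)

    for s1, e1, s2, e2 in pair_list:
        union((s1, e1), (s2, e2))

    groups = {}
    for iv in parent:
        groups.setdefault(find(iv), []).append(iv)
    return list(groups.values())

# Hypothetical usage on the filtered candidates from above:
# overlapping = candidates.filter(
#     lambda (s1, e1, s2, e2): overlaps(s1, e1, s2, e2)).collect()
# overlap_groups = group_by_overlap(overlapping)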

  1. http://docs.datastax.com/en/datastax_enterprise/4.6/datastax_enterprise/spark/sparkSqlSupportedSyntax.html
