How to identify discrete states (oscillations) in a Spark DataFrame?


Problem description

A user U1 moves through the zones Z1, Z2, Z3 at times t1, t2, t3.

A user U1 goes back and forth between the zones Z1 and Z2 at t1, t2, t3, t4.

This is what I call an « OSCILLATING » user.

This is considered an oscillation: the user U1 goes from Z1 to Z2 and then back to Z1. The user visits Z1 more than once even though he visits Z2 only once.

Z1 ==> Z2 ==> Z1

The user U1 goes from Z1 to Z3, then to Z2, Z3 and Z1, at times t1, t2, t3, t4, t5 respectively.

The user is oscillating between the 3 zones.

As in the previous example, we consider this movement an oscillation because the user visits Z1 and Z3 more than once even though he visits Z2 only once.

For ease of computation, we can cap the maximum number of zones a user oscillates through at 5.

I would like to create a column that tracks the oscillation.

For a given user, if he is oscillating, give the rows the same oscillation ID.

If there is no oscillation, set it to NULL or to 0.

For example, sample data to copy/paste:

Zone, time, person, Oscillation_ID
A,    1,    ABC,    1
B,    2,    ABC,    1
A,    3,    ABC,    1
A,    4,    ABC,    1
B,    5,    ABC,    1
A,    6,    ABC,    1
C,    7,    ABC,    2
D,    8,    ABC,    2
E,    9,    ABC,    2
C,    10,   ABC,    2
E,    11,   ABC,    2
D,    12,   ABC,    2
C,    13,   ABC,    2
C,    14,   ABC,    2
D,    15,   ABC,    2
E,    16,   ABC,    2
C,    17,   ABC,    2
Z,    18,   ABC,    3
X,    19,   ABC,    4
Y,    20,   ABC,    5
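
A minimal sketch for loading this sample into a DataFrame (input columns only, since Oscillation_ID is the expected output; spark is assumed to be an active SparkSession):

rows = [
    ('A', 1, 'ABC'), ('B', 2, 'ABC'), ('A', 3, 'ABC'), ('A', 4, 'ABC'),
    ('B', 5, 'ABC'), ('A', 6, 'ABC'), ('C', 7, 'ABC'), ('D', 8, 'ABC'),
    ('E', 9, 'ABC'), ('C', 10, 'ABC'), ('E', 11, 'ABC'), ('D', 12, 'ABC'),
    ('C', 13, 'ABC'), ('C', 14, 'ABC'), ('D', 15, 'ABC'), ('E', 16, 'ABC'),
    ('C', 17, 'ABC'), ('Z', 18, 'ABC'), ('X', 19, 'ABC'), ('Y', 20, 'ABC'),
]
df = spark.createDataFrame(rows, ['Zone', 'time', 'person'])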

Because I am working with billions of records, I need an efficient solution.

I am using Spark 2.3.

I accept both Scala and Python (PySpark) solutions.

Recommended answer

Here's a solution that uses a pandas UDF as a window function to assign an oscillation ID to each row, partitioned by person.

I haven't limited the maximum number of zones in an oscillation, as that raises a bunch of further business-logic questions.

I treat rows as belonging to the same oscillation until proven otherwise, i.e. the last two rows in your example dataset fall into the same oscillation.

Assuming the input data is ordered by time:

import sys

import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import IntegerType


@pandas_udf(IntegerType())
def assign_oscillation(zones: pd.Series) -> int:
    # Grouped-aggregate pandas UDF: rescans all zones visited so far and
    # returns the oscillation id of the latest row in the window.
    current_oscillation_zones = []
    is_oscillation_frozen = False
    osc_id = 1

    for zone in zones.tolist():
        if zone in current_oscillation_zones and not is_oscillation_frozen:
            # A revisited zone confirms (freezes) the current oscillation.
            is_oscillation_frozen = True
        elif zone not in current_oscillation_zones and is_oscillation_frozen:
            # A brand-new zone after a confirmed oscillation starts a new one.
            osc_id += 1
            is_oscillation_frozen = False
            current_oscillation_zones = [zone]
        elif zone not in current_oscillation_zones and not is_oscillation_frozen:
            current_oscillation_zones.append(zone)
        # Revisits while the oscillation is frozen change nothing.
    return osc_id


# Growing per-person window: every row from the start of the partition
# up to and including the current row.
windowSpec = (Window.partitionBy(col('person'))
              .orderBy(col('time'))
              .rangeBetween(-sys.maxsize, 0))

df.withColumn('Oscillation_ID', assign_oscillation('Zone').over(windowSpec)).show()
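
As a quick local sanity check (plain Python, no Spark), the same scan can be run over the question's zone sequence; oscillation_id below is just a hypothetical copy of the UDF body:

def oscillation_id(zones):
    # Mirror of the UDF body above, extracted for local testing.
    current_oscillation_zones = []
    is_oscillation_frozen = False
    osc_id = 1
    for zone in zones:
        if zone in current_oscillation_zones and not is_oscillation_frozen:
            is_oscillation_frozen = True
        elif zone not in current_oscillation_zones and is_oscillation_frozen:
            osc_id += 1
            is_oscillation_frozen = False
            current_oscillation_zones = [zone]
        elif zone not in current_oscillation_zones and not is_oscillation_frozen:
            current_oscillation_zones.append(zone)
    return osc_id

zones = list('ABAABACDECEDCCDEC') + ['Z', 'X', 'Y']
print([oscillation_id(zones[:i + 1]) for i in range(len(zones))])
# [1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3]

Note that the last three rows share ID 3 rather than the 3, 4, 5 in the expected output, which matches the "same oscillation until proven otherwise" convention above.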

I have PySpark 3 and Python 3.8.

A pandas UDF inside a window function may not be supported in PySpark 2. Here's a less elegant but PySpark 2.3-compatible solution, using a generator inside a grouped-map pandas UDF applied after a groupBy:

from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (IntegerType, StringType, StructField,
                               StructType)


def oscillation_generator(rows: pd.DataFrame) -> Iterator[pd.DataFrame]:
    # Yields one pandas DataFrame per detected oscillation.
    # DataFrame.append was removed in pandas 2.0, so this needs pandas < 2.0.
    current_oscillation = pd.DataFrame(data=None)
    current_oscillation_zones = []
    is_oscillation_frozen = False
    osc_id = 1

    for _, row in rows.iterrows():
        if row['Zone'] in current_oscillation_zones[:-1] and not is_oscillation_frozen:
            # Revisiting any zone other than the immediately preceding one
            # confirms (freezes) the current oscillation.
            is_oscillation_frozen = True
            row['Oscillation_ID'] = osc_id
            current_oscillation = current_oscillation.append(row)
        elif row['Zone'] not in current_oscillation_zones and is_oscillation_frozen:
            # A brand-new zone ends the confirmed oscillation and starts a new one.
            yield current_oscillation
            osc_id += 1
            is_oscillation_frozen = False
            row['Oscillation_ID'] = osc_id
            current_oscillation = pd.DataFrame(data=[row])
            current_oscillation_zones = [row['Zone']]
        elif row['Zone'] in current_oscillation_zones and is_oscillation_frozen:
            row['Oscillation_ID'] = osc_id
            current_oscillation = current_oscillation.append(row)
        elif row['Zone'] not in current_oscillation_zones and not is_oscillation_frozen:
            current_oscillation_zones.append(row['Zone'])
            row['Oscillation_ID'] = osc_id
            current_oscillation = current_oscillation.append(row)
        else:
            # Immediate repeat of the current zone before the oscillation is
            # confirmed: keep the row without freezing.
            row['Oscillation_ID'] = osc_id
            current_oscillation = current_oscillation.append(row)
    yield current_oscillation


@pandas_udf(StructType(
    [StructField('Zone', StringType()),
     StructField('time', IntegerType()),
     StructField('person', StringType()),
     StructField('Oscillation_ID', IntegerType())]), PandasUDFType.GROUPED_MAP)
def assign_oscillations(rows: pd.DataFrame) -> pd.DataFrame:
    oscillations = oscillation_generator(rows)
    return pd.concat(list(oscillations))


df.groupBy(['person']).apply(assign_oscillations).show()
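
One caveat, assuming some users are very active: the grouped-map approach collects each person's entire history into a single pandas DataFrame on one executor, so any one user's rows must fit in memory there. Both solutions also require PyArrow (pyarrow) to be installed on the cluster for pandas UDF support.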
