How to create a new column 'count' in a Spark DataFrame under some condition


Problem description

I have a DataFrame of connection logs with columns Id, targetIP, and Time. Every record in this DataFrame is a connection event to one system. Id identifies the connection, targetIP is the target IP address of that connection, and Time is the connection time. With values:

ID | Time | targetIP
---+------+------------
 1 |    1 | 192.163.0.1
 2 |    2 | 192.163.0.2
 3 |    3 | 192.163.0.1
 4 |    5 | 192.163.0.1
 5 |    6 | 192.163.0.2
 6 |    7 | 192.163.0.2
 7 |    8 | 192.163.0.2

I want to create a new column under some condition: the count of connections to this record's target IP address within the past 2 time units. So the result DataFrame should be:

ID | Time | targetIP    | count
---+------+-------------+------
 1 |    1 | 192.163.0.1 |   0
 2 |    2 | 192.163.0.2 |   0
 3 |    3 | 192.163.0.1 |   1
 4 |    5 | 192.163.0.1 |   1
 5 |    6 | 192.163.0.2 |   0
 6 |    7 | 192.163.0.2 |   1
 7 |    8 | 192.163.0.2 |   2

For example, for ID=7 the targetIP is 192.163.0.2, which was connected to the system in the past 2 time units by ID=5 and ID=6, whose targetIP is also 192.163.0.2. So the count for ID=7 is 2.

Looking forward to your help.

Recommended answer

You can use count over a Window bounded with a range between -2 and the current row, to get the count of connections to the same IP in the last 2 time units.
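Before looking at the Spark versions, the counting rule itself can be sketched in plain Python (an illustration of the window semantics only, not Spark code; the data is the sample table from the question):

```python
# Sample data from the question: (ID, Time, targetIP)
rows = [
    (1, 1, "192.163.0.1"),
    (2, 2, "192.163.0.2"),
    (3, 3, "192.163.0.1"),
    (4, 5, "192.163.0.1"),
    (5, 6, "192.163.0.2"),
    (6, 7, "192.163.0.2"),
    (7, 8, "192.163.0.2"),
]

def window_counts(rows):
    """For each row, count connections to the same targetIP whose
    Time falls within [Time - 2, Time], excluding the row itself."""
    result = []
    for rid, t, ip in rows:
        n = sum(
            1
            for _, t2, ip2 in rows
            if ip2 == ip and t - 2 <= t2 <= t
        ) - 1  # subtract 1 so the current row does not count itself
        result.append((rid, t, ip, n))
    return result

for r in window_counts(rows):
    print(r)
```

The `- 1` mirrors the `COUNT(*) OVER(...) - 1` in the SQL below: the window frame includes the current row, so it must be subtracted to count only past connections.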

Using Spark SQL you can do something like this:

df.createOrReplaceTempView("connection_logs")

df1 = spark.sql("""
    SELECT  *,
            COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time 
                          RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
                          ) -1 AS count
    FROM    connection_logs
    ORDER BY ID
""")

df1.show()

#+---+----+-----------+-----+
#| ID|Time|   targetIP|count|
#+---+----+-----------+-----+
#|  1|   1|192.163.0.1|    0|
#|  2|   2|192.163.0.2|    0|
#|  3|   3|192.163.0.1|    1|
#|  4|   5|192.163.0.1|    1|
#|  5|   6|192.163.0.2|    0|
#|  6|   7|192.163.0.2|    1|
#|  7|   8|192.163.0.2|    2|
#+---+----+-----------+-----+

Or using the DataFrame API:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Time is already expressed in time units, so no conversion is needed
time_unit = lambda x: x

w = Window.partitionBy("targetIP").orderBy(F.col("Time").cast("int")).rangeBetween(-time_unit(2), 0)

df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")

df1.show()
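Note that `rangeBetween` bounds the window frame by values of the ordering column, not by a number of rows, so it handles gaps in Time correctly (e.g. there is no record at Time=4). A plain-Python comparison of the two framings (an illustrative sketch, not Spark API code):

```python
# Times of the 192.163.0.1 connections: there is a gap between 3 and 5.
times = [1, 3, 5]

def count_range(times, i, width=2):
    """Range-based frame: count earlier-or-equal times within
    [t - width, t], excluding the current row."""
    t = times[i]
    return sum(1 for t2 in times[: i + 1] if t - width <= t2 <= t) - 1

def count_rows(times, i, width=2):
    """Row-based frame: count up to `width` physically preceding
    rows, regardless of gaps in the Time values."""
    return min(i, width)

# For the row at Time=5 (index 2):
print(count_range(times, 2))  # 1 -> only Time=3 falls within [3, 5]
print(count_rows(times, 2))   # 2 -> both preceding rows are counted
```

This is why the answer uses `rangeBetween(-2, 0)` rather than `rowsBetween(-2, 0)`: the question asks for connections in the past 2 *time units*, not the past 2 *records*.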
