How to create a new column 'count' in a Spark DataFrame under some condition

Problem Description

I have a DataFrame of connection logs with columns ID, Time, and targetIP. Each record in this DataFrame is a connection event to one system: ID identifies the connection, targetIP is the target IP address of that connection, and Time is the connection time. With values:

ID  Time  targetIP
1   1     192.163.0.1
2   2     192.163.0.2
3   3     192.163.0.1
4   5     192.163.0.1
5   6     192.163.0.2
6   7     192.163.0.2
7   8     192.163.0.2
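For reproducibility, here is a minimal sketch that builds this sample data, assuming a local SparkSession bound to the name spark (the same name the answer below uses):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# sample connection log matching the table above
df = spark.createDataFrame(
    [(1, 1, "192.163.0.1"),
     (2, 2, "192.163.0.2"),
     (3, 3, "192.163.0.1"),
     (4, 5, "192.163.0.1"),
     (5, 6, "192.163.0.2"),
     (6, 7, "192.163.0.2"),
     (7, 8, "192.163.0.2")],
    ["ID", "Time", "targetIP"],
)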

I want to create a new column under a condition: for each record, the count of connections to that record's target IP address in the past 2 time units. So the resulting DataFrame should be:

ID  Time  targetIP     count
1   1     192.163.0.1  0
2   2     192.163.0.2  0
3   3     192.163.0.1  1
4   5     192.163.0.1  1
5   6     192.163.0.2  0
6   7     192.163.0.2  1
7   8     192.163.0.2  2

For example, for ID=7 the targetIP is 192.163.0.2. The connections to the system in the past 2 time units are ID=5 and ID=6, and their targetIP is also 192.163.0.2, so the count for ID=7 is 2.

Looking forward to your help.

Answer

You can use count over a Window partitioned by targetIP and bounded with a range between -2 and the current row, to get the number of connections to the same IP in the last 2 time units.

Using Spark SQL you can do something like this:

df.createOrReplaceTempView("connection_logs")

df1 = spark.sql("""
    SELECT  *,
            COUNT(*) OVER(PARTITION BY targetIP ORDER BY Time
                          RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
                          ) - 1 AS count -- COUNT(*) includes the current row, so subtract 1
    FROM    connection_logs
    ORDER BY ID
""")

df1.show()

#+---+----+-----------+-----+
#| ID|Time|   targetIP|count|
#+---+----+-----------+-----+
#|  1|   1|192.163.0.1|    0|
#|  2|   2|192.163.0.2|    0|
#|  3|   3|192.163.0.1|    1|
#|  4|   5|192.163.0.1|    1|
#|  5|   6|192.163.0.2|    0|
#|  6|   7|192.163.0.2|    1|
#|  7|   8|192.163.0.2|    2|
#+---+----+-----------+-----+

Or using DataFrame API:

from pyspark.sql import Window
from pyspark.sql import functions as F

# identity helper kept for readability: the window spans 2 time units
time_unit = lambda x: x

# value-based frame within each targetIP partition:
# rows whose Time falls within [current Time - 2, current Time]
w = Window.partitionBy("targetIP").orderBy(F.col("Time").cast("int")).rangeBetween(-time_unit(2), 0)

# the count includes the current row, so subtract 1
df1 = df.withColumn("count", F.count("*").over(w) - 1).orderBy("ID")

df1.show()
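Note that rangeBetween defines the frame on the values of the ordering column (Time), not on row positions, which is why the gap between Time=3 and Time=5 is handled correctly. For contrast, a row-based frame would not fit this problem; a quick sketch of the difference (the names w_range and w_rows are just for illustration):

# value-based frame: rows whose Time falls within [current Time - 2, current Time]
w_range = Window.partitionBy("targetIP").orderBy(F.col("Time").cast("int")).rangeBetween(-2, 0)

# row-based frame: the two physically preceding rows plus the current one,
# regardless of how far apart their Time values are -- not what this question asks for
w_rows = Window.partitionBy("targetIP").orderBy(F.col("Time").cast("int")).rowsBetween(-2, 0)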
