How to sum distances between data points in a dataset using (Py)Spark?


Question


I have a dataset of user locations (latitude/longitude) recorded over a period of time. I would like to calculate the distance these users traveled. Sample dataset:


| Timestamp | User | Latitude | Longitude |
|1462838468|49B4361512443A4DA...|39.777982|-7.054599|
|1462838512|49B4361512443A4DA...|39.777982|-7.054599|
|1462838389|49B4361512443A4DA...|39.777982|-7.054599|
|1462838497|49B4361512443A4DA...|39.777982|-7.054599|
|1465975885|6E9E0581E2A032FD8...|37.118362|-8.205041|
|1457723815|405C238E25FE0B9E7...|37.177322|-7.426781|
|1457897289|405C238E25FE0B9E7...|37.177922|-7.447443|
|1457899229|405C238E25FE0B9E7...|37.177922|-7.447443|
|1457972626|405C238E25FE0B9E7...| 37.18059| -7.46128|
|1458062553|405C238E25FE0B9E7...|37.177322|-7.426781|
|1458241825|405C238E25FE0B9E7...|37.178172|-7.444512|
|1458244457|405C238E25FE0B9E7...|37.178172|-7.444512|
|1458412513|405C238E25FE0B9E7...|37.177322|-7.426781|
|1458412292|405C238E25FE0B9E7...|37.177322|-7.426781|
|1465197963|6E9E0581E2A032FD8...|37.118362|-8.205041|
|1465202192|6E9E0581E2A032FD8...|37.118362|-8.205041|
|1465923817|6E9E0581E2A032FD8...|37.118362|-8.205041|
|1465923766|6E9E0581E2A032FD8...|37.118362|-8.205041|
|1465923748|6E9E0581E2A032FD8...|37.118362|-8.205041|
|1465923922|6E9E0581E2A032FD8...|37.118362|-8.205041|
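For reference, a minimal sketch of how a few of these rows could be loaded into a DataFrame for experimentation (the truncated user IDs are kept as placeholders; in practice the data would come from your own source):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A handful of the sample rows, with the same column names as above.
df = spark.createDataFrame(
    [
        (1462838468, "49B4361512443A4DA...", 39.777982, -7.054599),
        (1457723815, "405C238E25FE0B9E7...", 37.177322, -7.426781),
        (1457897289, "405C238E25FE0B9E7...", 37.177922, -7.447443),
        (1465975885, "6E9E0581E2A032FD8...", 37.118362, -8.205041),
    ],
    ["Timestamp", "User", "Latitude", "Longitude"],
)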


I have thought of using a custom aggregation function, but it seems there is no Python support for this. Moreover, the operation needs to be applied to adjacent points in a specific order, so I don't know whether a custom aggregator would work.


I have also looked at reduceByKey, but the distance function doesn't seem to meet its operator requirements (the operation is neither associative nor commutative).


Is there a way to perform this operation in an efficient manner in Spark?

Answer


It looks like a job for window functions. Assuming we define distance as:

from pyspark.sql.functions import acos, cos, sin, lit, toRadians

def dist(long_x, lat_x, long_y, lat_y):
    # Great-circle distance via the spherical law of cosines,
    # in kilometres (6371.0 is the mean Earth radius in km).
    return acos(
        sin(toRadians(lat_x)) * sin(toRadians(lat_y)) +
        cos(toRadians(lat_x)) * cos(toRadians(lat_y)) *
            cos(toRadians(long_x) - toRadians(long_y))
    ) * lit(6371.0)
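As a quick sanity check (assuming an active SparkSession named spark), the function can be evaluated on literal coordinates taken from two of the sample points; note the argument order is longitude first, then latitude:

from pyspark.sql.functions import lit

# Distance in km between two of the sample points of user 405C238E25FE0B9E7...
spark.range(1).select(
    dist(lit(-7.426781), lit(37.177322), lit(-7.447443), lit(37.177922)).alias("km")
).show()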

you can define a window as:

from pyspark.sql.window import Window

# One partition per user, ordered by time, so lag() sees each user's previous observation.
w = Window().partitionBy("User").orderBy("Timestamp")


and compute distances between consecutive observations using lag:

from pyspark.sql.functions import lag

# Distance from each row to the same user's previous observation.
df.withColumn("dist", dist(
    "longitude", "latitude",
    lag("longitude", 1).over(w), lag("latitude", 1).over(w)
))


After that you can perform standard aggregation.
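For example, a minimal sketch of that aggregation, assuming the result of the withColumn step above has been assigned to a DataFrame named df_with_dist (a name introduced here for illustration). lag() yields null on each user's first row, and sum() ignores nulls, so a per-user sum gives the total distance traveled:

from pyspark.sql.functions import sum as sum_

# Total kilometres traveled per user.
df_with_dist.groupBy("User").agg(sum_("dist").alias("total_km")).show()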

