pyspark: passing multiple dataframe fields to udf


Problem description

I am new to Spark and Python. Any help is appreciated.

I have a UDF and have created a Spark dataframe with US zip code, latitude and longitude.

UDF:

import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
    d = radius * c
    return d

Sample UDF output:

distance((101,121),(-121,-212))

15447.812243421227
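Since `distance` is plain Python, it can be sanity-checked locally before it ever touches Spark; this sketch simply re-runs the sample call above:

```python
import math

def distance(origin, destination):
    # Haversine great-circle distance in kilometres.
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    return radius * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# Same call as the sample above.
print(distance((101, 121), (-121, -212)))  # 15447.812243421227
```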

Dataframe:

zip=spark.read.option("sep", ",").csv('wasb://hdiazurepoc@dsazurepoc.blob.core.windows.net/main/zip.txt')
zip1=zip.select(zip._c0,zip._c1.cast("Double"),zip._c2.cast("Double"))

Sample zip1 data:

zip1.first()        

Row(_c0=u'00601', _c1=18.180555, _c2=-66.749961)

Now I am trying to pass the latitude and longitude from the df zip1 to the udf distance, but I am getting an error like "a float is required". I believe the udf is not getting the data from the df fields; instead it is reading the df column as a constant value, and hence I get the error below.

z=zip1.select(distance((zip1._c1,100.23),(zip1._c2,-99.21)))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 5, in distance
TypeError: a float is required
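The traceback points at the cause: `zip1._c1` is a pyspark `Column` object, so calling the plain Python `distance` function runs it eagerly on the driver and `math.radians` receives a `Column` instead of a number. The same `TypeError` can be reproduced with any non-numeric argument (a plain `object()` is used here as a stand-in for a `Column`):

```python
import math

# math.radians requires a real number; any other object raises TypeError,
# which is what happens when a pyspark Column is passed in directly.
try:
    math.radians(object())
    raised = False
except TypeError:
    raised = True
print(raised)  # True
```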

Please let me know the right way to pass the df fields to the udf.

Recommended answer

I'm not so sure what data schema you have, but the following example is the right way to use a udf to get the answer to your example.

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2) * math.sin(dlat/2) + math.cos(math.radians(lat1)) \
        * math.cos(math.radians(lat2)) * math.sin(dlon/2) * math.sin(dlon/2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    d = radius * c
    return d

# Wrap the plain function as a udf; origin and destination arrive as arrays.
filter_udf = udf(distance, DoubleType())
df = spark.createDataFrame([([101, 121], [-121, -212])], ["origin", "destination"])
df.withColumn("distance", filter_udf(df.origin, df.destination)).show()

+----------+------------+------------------+
|    origin| destination|          distance|
+----------+------------+------------------+
|[101, 121]|[-121, -212]|15447.812243421227|
+----------+------------+------------------+
