pyspark: passing multiple dataframe fields to udf
Question
I am new to Spark and Python. Any help is appreciated.
I have a UDF, and I have created a Spark dataframe with US zipcd, latitude and longitude.
UDF:
import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km

    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) * math.sin(dlat / 2)
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) * math.sin(dlon / 2))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    d = radius * c
    return d
Sample UDF output:

distance((101, 121), (-121, -212))
15447.812243421227
Dataframe:
zip = spark.read.option("sep", ",").csv('wasb://hdiazurepoc@dsazurepoc.blob.core.windows.net/main/zip.txt')
zip1 = zip.select(zip._c0, zip._c1.cast("Double"), zip._c2.cast("Double"))
Sample zip1 data:

zip1.first()
Row(_c0=u'00601', _c1=18.180555, _c2=-66.749961)
Now I am trying to pass the latitude and longitude from the df zip1 to the udf distance, but I am getting an error like "a float is required". I believe the udf is not getting the data from the df fields; instead it is reading the df columns as constant values, and hence I am getting the error below.
z = zip1.select(distance((zip1._c1, 100.23), (zip1._c2, -99.21)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 5, in distance
TypeError: a float is required
Please let me know the right way to pass the df fields to the udf.
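The diagnosis above is right: calling distance(...) inside select runs the plain Python function immediately, handing it Column expressions rather than per-row floats, and math.radians rejects the non-float argument. A minimal sketch (no Spark required; FakeColumn is a hypothetical stand-in for pyspark's Column) reproduces the same failure:

```python
import math

class FakeColumn:
    """Hypothetical stand-in for a pyspark Column: it supports the
    subtraction used in distance(), but it is not a number."""
    def __sub__(self, other):
        return self

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    # Fails when lat1/lat2 are Column expressions, because
    # math.radians only accepts real numbers.
    dlat = math.radians(lat2 - lat1)
    # (the rest of the haversine computation never runs)

# Same call shape as in the question: a Column paired with a constant.
caught = None
try:
    distance((FakeColumn(), 100.23), (FakeColumn(), -99.21))
except TypeError as err:
    caught = err

print(type(caught).__name__)  # → TypeError, matching the traceback above
```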
Answer
I'm not sure what data schema you have, but the following example is the right way to use a udf to get the answer to your example.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math

def distance(origin, destination):
    lat1, lon1 = origin
    lat2, lon2 = destination
    radius = 6371  # km

    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) * math.sin(dlat / 2)
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) * math.sin(dlon / 2))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    d = radius * c
    return d
df = spark.createDataFrame([([101, 121], [-121, -212])], ["origin", "destination"])
filter_udf = udf(distance, DoubleType())
df.withColumn("distance", filter_udf(df.origin, df.destination)).show()
+----------+------------+------------------+
| origin| destination| distance|
+----------+------------+------------------+
|[101, 121]|[-121, -212]|15447.812243421227|
+----------+------------+------------------+
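For the zip1 layout in the question, where latitude and longitude sit in separate columns (_c1, _c2) rather than in one array column, one option is a four-scalar variant of the haversine function that takes each field as its own argument. This is a sketch, not the accepted answer's code: distance4 is a hypothetical name, and the fixed point (100.23, -99.21) is taken from the question's attempted call.

```python
import math

def distance4(lat1, lon1, lat2, lon2):
    """Haversine distance in km, taking four scalars instead of two pairs."""
    radius = 6371  # km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(dlon / 2) ** 2)
    return radius * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# In Spark (sketch, assuming a live SparkSession and the zip1 dataframe
# from the question), wrap the constants with lit() so every argument
# reaching the udf is a Column:
# from pyspark.sql.functions import udf, lit
# from pyspark.sql.types import DoubleType
# dist_udf = udf(distance4, DoubleType())
# z = zip1.select(dist_udf(zip1._c1, zip1._c2, lit(100.23), lit(-99.21)))
```

The key point is the same as in the answer above: the raw Python function is never called with Column objects directly; udf() defers it so Spark calls it per row with plain floats.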