How to get postgres command 'nth_value' equivalent in pyspark Hive SQL?


Question


I was solving this example: https://www.windowfunctions.com/questions/grouping/5
Here, they use the Oracle/Postgres window function nth_value to get the answer, but it is not implemented in the Hive SQL dialect used by pyspark, and I was wondering how to obtain the same result in pyspark.

  • All weights from the 4th smallest onward are assigned the fourth smallest weight
  • The three lightest weights are assigned the value 99.9

select name, weight, 
coalesce(nth_value(weight, 4) over (order by weight), 99.9) as imagined_weight
from cats 
order by weight

Question: how can I obtain the following result with pyspark?

name    weight  imagined_weight
Tigger  3.8 99.9
Molly   4.2 99.9
Ashes   4.5 99.9
Charlie 4.8 4.8
Smudge  4.9 4.8
Felix   5.0 4.8
Puss    5.1 4.8
Millie  5.4 4.8
Alfie   5.5 4.8
Misty   5.7 4.8
Oscar   6.1 4.8
Smokey  6.1 4.8

Data

import pandas as pd

import pyspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark import SQLContext

spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

df = pd.DataFrame({
    'name': [
        'Molly', 'Ashes', 'Felix', 'Smudge', 'Tigger', 'Alfie', 'Oscar',
        'Millie', 'Misty', 'Puss', 'Smokey', 'Charlie'
    ],
    'breed': [
        'Persian', 'Persian', 'Persian', 'British Shorthair',
        'British Shorthair', 'Siamese', 'Siamese', 'Maine Coon', 'Maine Coon',
        'Maine Coon', 'Maine Coon', 'British Shorthair'
    ],
    'weight': [4.2, 4.5, 5.0, 4.9, 3.8, 5.5, 6.1, 5.4, 5.7, 5.1, 6.1, 4.8],
    'color': [
        'Black', 'Black', 'Tortoiseshell', 'Black', 'Tortoiseshell', 'Brown',
        'Black', 'Tortoiseshell', 'Brown', 'Tortoiseshell', 'Brown', 'Black'
    ],
    'age': [1, 5, 2, 4, 2, 5, 1, 5, 2, 2, 4, 4]
})

schema = StructType([
    StructField('name', StringType(), True),
    StructField('breed', StringType(), True),
    StructField('weight', DoubleType(), True),
    StructField('color', StringType(), True),
    StructField('age', IntegerType(), True),
])

sdf = sqlContext.createDataFrame(df, schema)
sdf.createOrReplaceTempView("cats")

spark.sql('select * from cats limit 2').show()

My current attempt

# My attempt
q = """
select weight from (
  select name,weight, 
         ROW_NUMBER() over (ORDER BY weight) as row_no
  from cats group by weight,name
  ) res 
where res.row_no = 4
"""
spark.sql(q).show()
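As a side note, newer Spark releases close this gap directly: Spark 3.1.0 added nth_value to Spark SQL (and pyspark.sql.functions.nth_value), so assuming Spark 3.1 or later, the original query should run unchanged via spark.sql:

```sql
select name, weight,
       coalesce(nth_value(weight, 4) over (order by weight), 99.9) as imagined_weight
from cats
order by weight
```

On older Spark versions, a workaround such as the answer below is needed.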

Answer

An alternative option is row_number() and a conditional window function:

select
    name,
    weight,
    coalesce(
        max(case when rn = 4 then weight end) over(order by rn),
        99.9
    ) imagined_weight
from (select c.*, row_number() over(order by weight) rn from cats c) c

