Filtering Spark DataFrame on new column


Question

Context: I have a dataset too large to fit in memory that I am training a Keras RNN on. I am using PySpark on an AWS EMR cluster to train the model in batches that are small enough to be stored in memory. I was not able to implement the model as distributed using elephas, and I suspect this is related to my model being stateful. I'm not entirely sure, though.

The dataframe has a row for every user and number of days elapsed since the day of install, from 0 to 29. After querying the database I do a number of operations on the dataframe:

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf, lit, col
from pyspark.sql.types import IntegerType
import numpy as np

query = """WITH max_days_elapsed AS (
        SELECT user_id,
            max(days_elapsed) as max_de
        FROM table
        GROUP BY user_id
        )
        SELECT table.*
        FROM table
            LEFT OUTER JOIN max_days_elapsed USING (user_id)
        WHERE max_de = 1
            AND days_elapsed < 1"""

df = read_from_db(query) #this is just a custom function to query our database

#Create features vector column
assembler = VectorAssembler(inputCols=features_list, outputCol="features")
df_vectorized = assembler.transform(df)

#Split users into train and test and assign batch number
udf_randint = udf(lambda x: np.random.randint(0, x), IntegerType())
training_users, testing_users = df_vectorized.select("user_id").distinct().randomSplit([0.8,0.2],123)
training_users = training_users.withColumn("batch_number", udf_randint(lit(N_BATCHES)))

#Create and sort train and test dataframes
train = df_vectorized.join(training_users, ["user_id"], "inner").select(["user_id", "days_elapsed","batch_number","features", "kpi1", "kpi2", "kpi3"])
train = train.sort(["user_id", "days_elapsed"])
test = df_vectorized.join(testing_users, ["user_id"], "inner").select(["user_id","days_elapsed","features", "kpi1", "kpi2", "kpi3"])
test = test.sort(["user_id", "days_elapsed"])

The problem I am having is that I cannot seem to filter on batch_number without caching train. I can filter on any of the columns that are in the original dataset in our database, but not on any column I have generated in PySpark after querying the database:

This: train.filter(train["days_elapsed"] == 0).select("days_elapsed").distinct().show() returns only 0.

But, all of these return all of the batch numbers between 0 and 9 without any filtering:

  • train.filter(train["batch_number"] == 0).select("batch_number").distinct().show()
  • train.filter(train.batch_number == 0).select("batch_number").distinct().show()
  • train.filter("batch_number = 0").select("batch_number").distinct().show()
  • train.filter(col("batch_number") == 0).select("batch_number").distinct().show()

This does not work either:

train.createOrReplaceTempView("train_table")
batch_df = spark.sql("SELECT * FROM train_table WHERE batch_number = 1")
batch_df.select("batch_number").distinct().show()

All of these work if I do train.cache() first. Is that absolutely necessary or is there a way to do this without caching?

Answer

Spark >= 2.3 (? - depending on the progress of SPARK-22629)

It should be possible to disable certain optimizations using the asNondeterministic method.
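For example, a minimal sketch (assuming Spark >= 2.3, and reusing udf_randint and N_BATCHES from the question) of marking the UDF as nondeterministic so the optimizer does not eliminate or duplicate its invocations:

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import IntegerType
import numpy as np

# Spark >= 2.3: declare the UDF nondeterministic so its calls are not
# optimized away or re-evaluated. The int() cast is a precaution, since
# numpy integers do not always map cleanly to IntegerType.
udf_randint = udf(lambda x: int(np.random.randint(0, x)), IntegerType()).asNondeterministic()

training_users = training_users.withColumn("batch_number", udf_randint(lit(N_BATCHES)))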

Spark < 2.3

Don't use a UDF to generate random numbers. First of all, to quote the docs:

The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.

Even if it weren't for the UDF, there are Spark subtleties which make it almost impossible to implement this correctly when processing single records.

Spark already provides rand:

Generates a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].

and randn:

Generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.

which can be used to build more complex generator functions.
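
For example, a minimal sketch of assigning batch numbers with the built-in rand instead of the UDF from the question (N_BATCHES as in the question; the seed value is illustrative):

from pyspark.sql.functions import rand, floor

# Map a uniform [0, 1) sample to an integer batch number in [0, N_BATCHES).
training_users = training_users.withColumn(
    "batch_number",
    floor(rand(seed=123) * N_BATCHES).cast("int")
)

With a fixed seed and stable partitioning, repeated evaluations of the plan should produce the same batch assignment, so filtering on batch_number behaves as expected without caching.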

Note:

There can be some other issues with your code, but this makes it unacceptable from the beginning (see Random numbers generation in PySpark and pyspark. Transformer that generates a random number generates always the same number).
