pyspark-sql相关内容
使用此代码查找模态: 将 numpy 导入为 npnp.random.seed(1)df2 = sc.parallelize([(int(x), ) for x in np.random.randint(50, size=10000)]).toDF(["x"])cnts = df2.groupBy("x").count()模式 = cnts.join(cnts.agg(max("count").
..
我有如下数据 -----------------------------地方 |关键|权重-----------------------------亚马逊 |狮子|[ 34, 23, 56 ]北|熊 |[ 90, 45]亚马逊 |狮子|[ 38, 30, 50 ]亚马逊 |熊 |[ 45 ]亚马逊 |熊 |[ 40 ] 我试图得到如下结果 --------------------------
..
使用此代码查找模态: 将 numpy 导入为 npnp.random.seed(1)df2 = sc.parallelize([(int(x), ) for x in np.random.randint(50, size=10000)]).toDF(["x"])cnts = df2.groupBy("x").count()模式 = cnts.join(cnts.agg(max("count").
..
我需要帮助来使用 PySpark 查找 Hive 表的唯一分区列名称.该表可能有多个分区列,最好输出应返回 Hive 表的分区列列表. 如果结果还包括分区列的数据类型,那就太好了. 任何建议都会有所帮助. 解决方案 可以使用desc完成,如下图: df=spark.sql("""desc test_dev_db.partition_date_table""")>>>df.sh
..
我目前正在根据 ID 列表生成数据帧 - 基于一个 ID 的每个查询都会返回一个非常大型 PostgreSQL 表的可管理子集.然后我根据需要写出的文件结构对该输出进行分区.问题是我达到了速度限制,而且我的执行者资源利用率严重不足. 我不确定这是否是重新思考我的架构的问题,或者是否有一些简单的方法可以解决这个问题,但基本上我想获得更多的任务并行化,但未能让我所有的 16 个执行者都忙同时尝试
..
我是 Spark 新手.我有一个简单的 pyspark 脚本.它读取一个 json 文件,将其展平并将其作为 parquet 压缩文件写入 S3 位置. 读取和转换步骤运行得非常快,并使用了 50 个执行程序(我在 conf 中设置).但写入阶段耗时较长,只写入一个大文件(480MB). 保存的文件数量是如何决定的?写操作可以以某种方式加速吗? 谢谢,拉姆. 解决方案 输
..
PySpark 有这个奇怪的问题.它似乎正在尝试将前一个字段的架构应用于下一个字段,因为它正在处理. 我能想到的最简单的测试用例: %pyspark从 pyspark.sql.types 导入(日期类型,结构类型,结构域,字符串类型,)从日期时间导入日期从 pyspark.sql 导入行架构 = 结构类型([StructField("date", DateType(), True),S
..
我已经开始在我的一个项目中使用 pyspark.我正在测试不同的命令来探索库的功能,但我发现了一些我不明白的东西. 拿这个代码: from pyspark import SparkContext从 pyspark.sql 导入 HiveContext从 pyspark.sql.dataframe 导入数据框sc = SparkContext(sc)hc = HiveContext(sc)h
..
我需要使用 pyspark 相对于其他数据帧 df_col 更改数据帧 df 的列名 df +----+---+----+----+|代码|身份证|姓名|工作|+----+---+----+----+|ASD|101|约翰|开发||KLJ|102|本|产品|+----+---+----+----+ df_col +-----------+-----------+|col_current|
..
我们可以使用pyspark中的window function找到时间序列数据的滚动/移动平均值. 我正在处理的数据没有任何timestamp 列,但它确实有一个严格递增 列frame_number.数据看起来像这样. d = [{'session_id':1,'frame_number':1,'rtd':11.0,'rtd2':11.0,},{'session_id':1,'frame_n
..
我正在使用 AWS Glue 中的 python3.6 环境在 pyspark 中工作.我有这张桌子: +----+-----+-----+-----+|年|月|总计|循环|+----+-----+-----+-----+|2012|1|20|loop1||2012|2|30|loop1||2012|1|10|loop2||2012|2|5|loop2||2012|1|50|loop3||201
..
我有两个嵌套数组,一个是字符串,另一个是浮点数.我想基本上把它压缩起来,每行有一个 (value, var) 组合.我试图只用一个数据框来做到这一点,而不必求助于 rdds 或 udfs,认为这会更干净、更快. 我可以将值数组、每行变量转换为一个值、变量、每行 1 个的结构,但是由于我的数组大小不同,我必须在不同的范围内运行我的数组理解.所以我想我可以在列中指定长度并使用它.但是因为我将使用
..
我想维护日期排序顺序,对多列使用 collect_list,所有列都具有相同的日期顺序.我将在同一个数据框中需要它们,以便我可以利用它们来创建时间序列模型输入.以下是“train_data"的示例: 我正在使用带有 PartitionBy 的窗口,以通过每个 Syscode_Stn 的 tuning_evnt_start_dt 来确保排序顺序.我可以使用以下代码创建一列: from pys
..
假设我有一个像这样的数据框: ID 媒体1 imgix.com/20830dk2 imgix.com/202398pwe3 imgix.com/lvw0923dk4 imgix.com/082kldcm4 imgix.com/lks032m4 imgix.com/903248 我想结束: ID 媒体1 imgix.com/20830dk2 imgix.com/202398pwe3 imgix
..
对于数据帧中的每个特征向量,我需要 k 个最近的邻居.我正在使用来自 pyspark 的 BucketedRandomProjectionLSHModel. 创建模型的代码 brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=n)模型 = brp
..
我有两个表,table_a 和 table_b,table_a 包含 216646500 行,7155998163 字节;table_b 包含 1462775 行,2096277141 字节 table_a 的 schema 是: c_1, c_2, c_3, c_4 ;table_b 的 schema 是:c_2, c_5, c_6, ...(大约 10 列) 我想做一个 left_
..
在读取不一致的模式编写的镶木地板文件组时,我们在模式合并方面遇到了问题.在切换到手动指定架构时,我收到以下错误.任何指针都会有所帮助. java.lang.UnsupportedOperationException:未实现的类型:StringType在 org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnRea
..
我在 schema = StructType([ \StructField("CountryName", StringType(), True), \StructField("CountryCode", StringType(), True), \StructField("IndicatorName", StringType(), True), \StructField("IndicatorCo
..
我从 Oracle 中提取了数据,并且该表中有一个带有 CLOB DataType 的列,我将其设为 String 以获取 HDFS 中的数据.现在我必须拆除 CLOB 数据并在 Hive 中为其创建一个单独的表. 我有 txt 格式的 HDFS 文件.我可以分离 CLOB 数据并希望为 CLOB 制作 DataFrame CLOB 采用以下格式: [name] Bob [年龄] 2
..
在读取不一致的模式编写的镶木地板文件组时,我们在模式合并方面遇到了问题.在切换到手动指定架构时,我收到以下错误.任何指针都会有所帮助. java.lang.UnsupportedOperationException:未实现的类型:StringType在 org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnRea
..