pyspark相关内容

如何使用 Python 类处理 RDD?

我正在 Spark 中实现一个模型作为 python 类,并且任何时候我尝试将类方法映射到 RDD 时它都会失败.我的实际代码更复杂,但这个简化版本是问题的核心: 类模型(对象):def __init__(self):self.data = sc.textFile('path/to/data.csv')# 其他杂项设置def run_model(self):self.data = self.da ..
发布时间:2021-12-22 21:19:38 Python

在pyspark中按行连接字符串

我有一个 pyspark 数据框 医生 |病人约翰 |山姆约翰 |彼得约翰 |罗宾本 |玫瑰本 |灰色的 并且需要按行连接患者姓名,以便得到如下输出: 医生 |病人约翰 |山姆、彼得、罗宾本 |玫瑰色、灰色 有人可以帮助我在 pyspark 中创建此数据框吗? 提前致谢. 解决方案 我能想到的最简单的方法就是使用collect_list import pyspark.sql ..
发布时间:2021-12-22 21:19:29 Python

如何在 PySpark 中处理数据之前在所有 Spark 工作器上运行函数?

我正在使用 YARN 在集群中运行 Spark Streaming 任务.集群中的每个节点都运行多个 spark 工作线程.在流开始之前,我想对集群中所有节点上的所有工作线程执行“设置"功能. 流任务将传入的消息分类为垃圾邮件或非垃圾邮件,但在此之前,它需要将最新的预训练模型从 HDFS 下载到本地磁盘,如以下伪代码示例: def fetch_models():如果 hadoop.vers ..
发布时间:2021-12-22 21:19:14 Python

如何从 pyspark 设置 hadoop 配置值

SparkContext 的 Scala 版本有这个属性 sc.hadoopConfiguration 我已经成功地使用它来设置 Hadoop 属性(在 Scala 中) 例如 sc.hadoopConfiguration.set("my.mapreduce.setting","someVal") 然而,SparkContext 的 python 版本缺少那个访问器.有没有办法将 H ..
发布时间:2021-12-22 21:19:05 其他开发

计算pyspark中数据帧所有行之间的余弦相似度

我有一个数据集,其中包含工人的人口统计信息,例如年龄性别、地址等以及他们的工作地点.我从数据集创建了一个 RDD 并将其转换为 DataFrame. 每个 ID 有多个条目.因此,我创建了一个 DataFrame,其中只包含工人的 ID 和他/她工作过的各个办公地点. |----------|----------------||**ID** **Office_Loc** ||------ ..
发布时间:2021-12-22 21:18:26 Python

如何在同一个 Spark 项目中同时使用 Scala 和 Python?

是否可以将 Spark RDD 通过管道传输到 Python? 因为我需要一个 python 库来对我的数据进行一些计算,但是我的主要 Spark 项目是基于 Scala 的.有没有办法将它们混合或让 python 访问相同的 spark 上下文? 解决方案 您确实可以使用 Scala 和 Spark 以及常规 Python 脚本输出到 Python 脚本. test.py ..
发布时间:2021-12-22 21:18:21 Python

连接两个 PySpark 数据帧

我正在尝试将两个 PySpark 数据帧与一些仅位于其中的列连接起来: from pyspark.sql.functions import randn, randdf_1 = sqlContext.range(0, 10)+--+|身份证|+--+|0||1||2||3||4||5||6||7||8||9|+--+df_2 = sqlContext.range(11, 20)+--+|身份证|+ ..
发布时间:2021-12-22 21:18:15 Python

如何在 spark 中设置驱动程序的 python 版本?

我使用的是 spark 1.4.0-rc2,所以我可以将 python 3 与 spark 一起使用.如果我将 export PYSPARK_PYTHON=python3 添加到我的 .bashrc 文件中,我可以与 python 3 交互运行 spark.但是,如果我想在本地模式下运行独立程序,我会收到一个错误: 异常:worker 中的 Python 版本 3.4 与驱动程序 2.7 中的版 ..
发布时间:2021-12-22 21:15:53 其他开发

pySpark 映射多列

我需要能够使用多列比较两个数据框. pySpark 尝试 # 从字典中的引用表中获取 PrimaryLookupAttributeValue 值,以将它们与 df1 进行比较.primaryAttributeValue_List = [ p.PrimaryLookupAttributeValue for p in AttributeLookup.select('PrimaryLookupA ..
发布时间:2021-12-22 21:15:39 其他开发

熊猫分组并找到所有列的第一个非空值

我有如下的熊猫 DF, id 年龄 性别 国家 sales_year1 无 M 印度 20162 23 F 印度 20161 20 M 印度 20152 25 F 印度 20153 30 M 印度 20194 36 无 印度 2019 我想根据 id 分组,根据 sales_date 取最新的 1 行,所有非空元素. 预期输出, id 年龄 性别 国家 sales_year1 20 M ..
发布时间:2021-12-22 21:13:11 Python

Pyspark 向数据帧添加顺序和确定性索引

我需要使用三个非常简单的约束将索引列添加到数据框: 从0开始 按顺序进行 确定性 我确定我遗漏了一些明显的东西,因为我发现的示例对于这样一个简单的任务来说看起来非常复杂,或者使用了非顺序、非确定性越来越单调的 id.我不想使用索引进行压缩,然后必须将以前分隔的列分开,这些列现在位于单个列中,因为我的数据帧以 TB 为单位,这似乎没有必要.我不需要按任何东西分区,也不需要按任 ..
发布时间:2021-12-21 23:22:09 其他开发

PySpark 中的随机数生成

让我们从一个总是返回随机整数的简单函数开始: 将 numpy 导入为 np定义 f(x):返回 np.random.randint(1000) 和一个用零填充并使用 f 映射的 RDD: rdd = sc.parallelize([0] * 10).map(f) 由于上面的 RDD 不是持久化的,我希望每次收集时都会得到不同的输出: >rdd.collect()[255, 512, 51 ..
发布时间:2021-12-21 16:10:27 Python

如何充分利用集群中的所有Spark节点?

我已经在 Spark 的独立模式下使用 ec2-script 启动了一个 10 节点集群.我正在从 PySpark shell 中访问 s3 存储桶中的数据,但是当我在 RDD 上执行转换时,只使用了一个节点.例如,下面将从 CommonCorpus 中读取数据: bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MA ..
发布时间:2021-12-21 11:16:11 其他开发

无法在 Google DataProc 的 jupyter 中添加 jars pyspark

我在 DataProc 上有一个 Jupyter 笔记本,我需要一个 jar 来运行一些作业.我知道编辑 spark-defaults.conf 并使用 --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar从命令行提交作业 - 它们都运行良好.但是,如果我想直接将jar添加到jupyter notebook,我尝试了以下方法,它们都失败了 ..