pyspark相关内容

如何在 PySpark 中运行脚本

我正在尝试在 pyspark 环境中运行脚本,但到目前为止我还没有. 如何在 pyspark 中运行像 python script.py 这样的脚本? 解决方案 你可以这样做:./bin/spark-submit mypythonfile.py 从 Spark 2.0 开始,不支持通过 pyspark 运行 python 应用程序. ..
发布时间:2021-12-22 21:34:58 其他开发

如何获得pyspark数据帧的相关矩阵?

我有一个很大的 pyspark 数据框.我想得到它的相关矩阵.我知道如何使用 Pandas 数据框获取它.但是我的数据太大而无法转换为 Pandas.所以我需要用 pyspark 数据框得到结果.我搜索了其他类似的问题,答案对我不起作用.有谁能够帮我?谢谢! 数据示例:数据示例 解决方案 欢迎来到 SO! 示例数据 我准备了一些虚拟数据以便于复制(也许下次你也可以提供一些 ..
发布时间:2021-12-22 21:34:46 其他开发

改进 PySpark DataFrame.show 输出以适合 Jupyter 笔记本

在 Jupyter notebook 中使用 PySpark,与 Pandas DataFrames 的显示方式相比,Spark 的 DataFrame.show 的输出技术含量较低.我想“嗯,它可以完成工作",直到我得到这个: 输出未调整到笔记本的宽度,因此线条以丑陋的方式环绕.有没有办法自定义这个?更好的是,有没有办法获得 Pandas 风格的输出(显然不转换为 pandas.DataF ..
发布时间:2021-12-22 21:34:28 Python

在 python main 中使用 spark-submit

阅读 this 和 这个 让我觉得可以让 执行一个 python 文件spark-submit 但是我无法让它工作. 我的设置有点复杂.我需要将几个不同的 jar 与我的 python 文件一起提交,以便一切正常运行.我的 pyspark 命令如下: IPYTHON=1 ./pyspark --jars jar1.jar,/home/local/ANT/bogoyche/dev/rhine ..
发布时间:2021-12-22 21:33:59 其他开发

pyspark:从现有列创建 MapType 列

我需要根据现有列创建一个新的 Spark DF MapType 列,其中列名是键,值是值. 例如 - 我有这个 DF: rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),('d23d', 1.5, 2.0, 2.2),('as3d', 2.2, 4.3, 9.0)])schema = StructType([StructField('key', S ..
发布时间:2021-12-22 21:33:49 Python

pyspark 和 HDFS 命令

我想在我的 Spark 程序 (Pyspark) 开始时做一些清理工作.例如,我想删除以前运行 HDFS 的数据.在 pig 中,这可以使用诸如 之类的命令来完成 fs -copyFromLocal ....rmf/path/to-/hdfs 或在本地使用 sh 命令. 我想知道如何用 Pyspark 做同样的事情. 解决方案 您可以使用表单示例执行任意 shell 命令 su ..
发布时间:2021-12-22 21:33:36 Python

将 Spark DataFrame 的内容保存为单个 CSV 文件

假设我有一个 Spark DataFrame,我想将其另存为 CSV 文件.Spark 2.0.0之后,DataFrameWriter类直接支持保存为CSV文件. 默认行为是将输出保存在提供的路径内的多个 part-*.csv 文件中. 我将如何保存 DF : 路径映射到确切的文件名而不是文件夹 标题在第一行 另存为单个文件而不是多个文件. 处理它的一种方法是合并 DF ..
发布时间:2021-12-22 21:33:24 其他开发

如何在pyspark中将DataFrame转换回正常的RDD?

我需要使用 (rdd.)partitionBy(npartitions, custom_partitioner) DataFrame 上不可用的方法.所有 DataFrame 方法都只引用 DataFrame 结果.那么如何从DataFrame数据创建一个RDD呢? 注意:这是对 1.2.0 的更改(在 1.3.0 中). 更新来自@dpangmao的回答:方法是.rdd.我有兴趣 ..
发布时间:2021-12-22 21:32:58 Python

如何对 PySpark 程序进行单元测试?

我当前的 Java/Spark 单元测试方法通过使用“本地"实例化 SparkContext 并运行使用 JUnit 进行单元测试. 必须组织代码以在一个函数中执行 I/O,然后使用多个 RDD 调用另一个函数. 这很好用.我有一个用 Java + Spark 编写的经过高度测试的数据转换. 我可以用 Python 做同样的事情吗? 我将如何使用 Python 运行 Spa ..
发布时间:2021-12-22 21:32:33 Python

未能找到话题的领导者;java.lang.NullPointerException NullPointerException 在 org.apache.kafka.common.utils.Utils.formatAddress

当我们尝试从启用 SSL 的 Kafka 主题流式传输数据时,我们面临以下错误.你能帮我们解决这个问题吗? 19/11/07 13:26:54 INFO ConsumerFetcherManager:[ConsumerFetcherManager-1573151189884] 为分区 ArrayBuffer() 添加了提取器19/11/07 13:26:54 WARN ConsumerFetch ..

如何使用 pyspark 管理跨集群的数据帧的物理数据放置?

假设我有一个 pyspark 数据框“数据",如下所示.我想按“期间"对数据进行分区.相反,我希望每个时期的数据都存储在它自己的分区上(请参阅下面“数据"数据框下方的示例). data = sc.parallelize([[1,1,0,14277.4,0], \[1,2,0,14277.4,0], \[2,1,0,4741.91,0], \[2,2,0,4693.03,0], \[3,1,2,9 ..
发布时间:2021-12-22 21:31:51 其他开发

Spark - 将具有不同架构(列名和序列)的数据帧合并/联合到具有主通用架构的数据帧

我尝试通过 df.schema() 将架构作为通用架构并将所有 CSV 文件加载到其中.但是分配的架构失败,其他 CSV 文件的标题不匹配 任何建议将不胜感激.如在函数或火花脚本中 解决方案 据我所知.您想联合/合并具有不同模式的文件(尽管是一个主模式的子集)..我写了这个函数 UnionPro,我认为它正好适合你的要求 - 编辑 - 添加了 Pyspark 版本 def u ..
发布时间:2021-12-22 21:31:27 Python

为什么 SparkContext.parallelize 使用驱动程序的内存?

现在我必须在 pyspark (Spark 2.1.0) 中使用 sc.parallelize() 创建一个并行化集合. 我的驱动程序中的集合很大.并行的时候发现在master节点上占用了很多内存. 似乎在我将它并行化到每个工作节点之后,集合仍然保存在在主节点的spark的内存中.这是我的代码示例: #我的python代码sc = SparkContext()a = [1.0] * ..
发布时间:2021-12-22 21:31:18 其他开发