pyspark相关内容
在我的DataFrame中,有分别包含null和NaN值的列,例如: df = spark.createDataFrame([(1, float('nan')), (None, 1.0)], ("a", "b"))df.show()+----+---+|一个|乙|+----+---+|1|NaN||空|1.0|+----+---+ 它们之间有什么区别吗?如何处理? 解决方案 null
..
我正在尝试在 pyspark 环境中运行脚本,但到目前为止我还没有. 如何在 pyspark 中运行像 python script.py 这样的脚本? 解决方案 你可以这样做:./bin/spark-submit mypythonfile.py 从 Spark 2.0 开始,不支持通过 pyspark 运行 python 应用程序.
..
我有一个很大的 pyspark 数据框.我想得到它的相关矩阵.我知道如何使用 Pandas 数据框获取它.但是我的数据太大而无法转换为 Pandas.所以我需要用 pyspark 数据框得到结果.我搜索了其他类似的问题,答案对我不起作用.有谁能够帮我?谢谢! 数据示例:数据示例 解决方案 欢迎来到 SO! 示例数据 我准备了一些虚拟数据以便于复制(也许下次你也可以提供一些
..
PySpark 文档描述了两个函数: mapPartitions(f,preservesPartitioning=False)通过对这个 RDD 的每个分区应用一个函数来返回一个新的 RDD.>>>rdd = sc.parallelize([1, 2, 3, 4], 2)>>>def f(iterator): 产量总和(iterator)>>>rdd.mapPartitions(f).colle
..
在 Jupyter notebook 中使用 PySpark,与 Pandas DataFrames 的显示方式相比,Spark 的 DataFrame.show 的输出技术含量较低.我想“嗯,它可以完成工作",直到我得到这个: 输出未调整到笔记本的宽度,因此线条以丑陋的方式环绕.有没有办法自定义这个?更好的是,有没有办法获得 Pandas 风格的输出(显然不转换为 pandas.DataF
..
我想知道是否有一种简洁的方法可以在 pyspark 中的 DataFrame 上运行 ML(例如 KMeans),如果我有多个数字列中的功能. 即如在 Iris 数据集中: (a1=5.1, a2=3.5, a3=1.4, a4=0.2, id=u'id_1', label=u'Iris-setosa', binomial_label=1) 我想使用 KMeans 而不用手动添加特征向量
..
阅读 this 和 这个 让我觉得可以让 执行一个 python 文件spark-submit 但是我无法让它工作. 我的设置有点复杂.我需要将几个不同的 jar 与我的 python 文件一起提交,以便一切正常运行.我的 pyspark 命令如下: IPYTHON=1 ./pyspark --jars jar1.jar,/home/local/ANT/bogoyche/dev/rhine
..
我需要根据现有列创建一个新的 Spark DF MapType 列,其中列名是键,值是值. 例如 - 我有这个 DF: rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),('d23d', 1.5, 2.0, 2.2),('as3d', 2.2, 4.3, 9.0)])schema = StructType([StructField('key', S
..
我想在我的 Spark 程序 (Pyspark) 开始时做一些清理工作.例如,我想删除以前运行 HDFS 的数据.在 pig 中,这可以使用诸如 之类的命令来完成 fs -copyFromLocal ....rmf/path/to-/hdfs 或在本地使用 sh 命令. 我想知道如何用 Pyspark 做同样的事情. 解决方案 您可以使用表单示例执行任意 shell 命令 su
..
假设我有一个 Spark DataFrame,我想将其另存为 CSV 文件.Spark 2.0.0之后,DataFrameWriter类直接支持保存为CSV文件. 默认行为是将输出保存在提供的路径内的多个 part-*.csv 文件中. 我将如何保存 DF : 路径映射到确切的文件名而不是文件夹 标题在第一行 另存为单个文件而不是多个文件. 处理它的一种方法是合并 DF
..
我需要使用 (rdd.)partitionBy(npartitions, custom_partitioner) DataFrame 上不可用的方法.所有 DataFrame 方法都只引用 DataFrame 结果.那么如何从DataFrame数据创建一个RDD呢? 注意:这是对 1.2.0 的更改(在 1.3.0 中). 更新来自@dpangmao的回答:方法是.rdd.我有兴趣
..
我当前的 Java/Spark 单元测试方法通过使用“本地"实例化 SparkContext 并运行使用 JUnit 进行单元测试. 必须组织代码以在一个函数中执行 I/O,然后使用多个 RDD 调用另一个函数. 这很好用.我有一个用 Java + Spark 编写的经过高度测试的数据转换. 我可以用 Python 做同样的事情吗? 我将如何使用 Python 运行 Spa
..
我有一个 PySpark 数据框 +--------------+--------------+----+----+|地址|日期|姓名|食物|+-------+--------------+----+----+|1111111|20151122045510|阴|gre ||1111111|20151122045501|阴|gre ||1111111|20151122045500|Yln|gra
..
当我们尝试从启用 SSL 的 Kafka 主题流式传输数据时,我们面临以下错误.你能帮我们解决这个问题吗? 19/11/07 13:26:54 INFO ConsumerFetcherManager:[ConsumerFetcherManager-1573151189884] 为分区 ArrayBuffer() 添加了提取器19/11/07 13:26:54 WARN ConsumerFetch
..
我在 Azure Databricks 的 jupyter notebook 文件中的 %python 下创建了 python 变量.如何访问相同的变量以在 %sql 下进行比较.示例如下: %pythonRunID_Goal = sqlContext.sql("SELECT CONCAT(SUBSTRING(RunID,1,6),SUBSTRING(RunID,1,6),'01_')FROM
..
假设我有一个 pyspark 数据框“数据",如下所示.我想按“期间"对数据进行分区.相反,我希望每个时期的数据都存储在它自己的分区上(请参阅下面“数据"数据框下方的示例). data = sc.parallelize([[1,1,0,14277.4,0], \[1,2,0,14277.4,0], \[2,1,0,4741.91,0], \[2,2,0,4693.03,0], \[3,1,2,9
..
我尝试通过 df.schema() 将架构作为通用架构并将所有 CSV 文件加载到其中.但是分配的架构失败,其他 CSV 文件的标题不匹配 任何建议将不胜感激.如在函数或火花脚本中 解决方案 据我所知.您想联合/合并具有不同模式的文件(尽管是一个主模式的子集)..我写了这个函数 UnionPro,我认为它正好适合你的要求 - 编辑 - 添加了 Pyspark 版本 def u
..
现在我必须在 pyspark (Spark 2.1.0) 中使用 sc.parallelize() 创建一个并行化集合. 我的驱动程序中的集合很大.并行的时候发现在master节点上占用了很多内存. 似乎在我将它并行化到每个工作节点之后,集合仍然保存在在主节点的spark的内存中.这是我的代码示例: #我的python代码sc = SparkContext()a = [1.0] *
..
我有一个 LIBSVM 缩放模型(使用 svm-scale 生成),我想将其移植到 PySpark.我天真地尝试了以下内容: scaler_path = "模型路径"a = MinMaxScaler().load(scaler_path) 但是我抛出了一个错误,需要一个元数据目录: Py4JJavaErrorTraceback(最近一次调用)
..
包含点(例如“id.orig_h")的pyspark数据帧将不允许groupby,除非首先由withColumnRenamed重命名.有解决方法吗?"`a.b`" 似乎没有解决. 解决方案 在我的 pyspark shell 中,以下代码段有效: from pyspark.sql.functions import *myCol = col("`id.orig_h`")结果 = df.gr
..