pyspark相关内容
我使用AWS EC2指南安装了Spark,并且我可以使用 bin / pyspark 脚本启动程序以获得spark提示,并且还可以执行快速启动但是,我不能为我的生活找出如何停止所有详细的 INFO 在每条命令后面进行日志记录。 在我的 log4j.properties中,我尝试了几乎所有可能的场景(注释掉,设置为OFF)在 conf 文件夹中的文件中,我从哪里启动应用程序以及在每个节点上执行
..
虽然问题是基于连续行的两个或多个列的值创建多个组,但我只是通过这种方式简化了问题。假设有像这样的pyspark数据框 >>> df = sqlContext.createDataFrame([ ... Row(SN = 1,age = 45,gender ='M',name ='Bob'), ... Row(SN = 2,年龄= 28,性别='M',姓名='Albert'), ...行
..
如何在 groupby collect_set 或 collect_list C $ C>。例如: df.groupby('key')。collect_set('values')。我得到一个错误: AttributeError:'GroupedData'对象没有属性'collect_set' 解决方案 您需要使用agg。示例:pyspark import 导入HiveContext
..
我创建了一个具有900万行和85K列的坐标矩阵cmat。我想执行cmat.T * cmat操作。 我首先将cmat转换为块矩阵bmat: pre $ b $ cmat.toBlockMatrix(1000,1000) 然而,我在执行multiply()时遇到错误: $ b mtm = bmat.transpose.multiply(bmat) Traceback(最近的最后一次
..
我有一个在bigquery中加载的数据表,我想通过一个pyspark .py文件将它导入到我的Spark集群中。 我在 Dataproc + BigQuery的例子 - 任何可用?,有一种方法来加载一个bigquery表的火花cluster with scala,但是有没有办法在pyspark脚本中做到这一点? 解决方案 这来自@MattJ in 这个问题。这是一个连接到Spark中
..
sqlContext = SQLContext(sc) sample = sqlContext.sql(“select ) sample.show() 上面的语句打印终端上的整个表,但我想用 for或while 访问该表中的每一行来执行进一步的计算。 解决方案你可以定义一个自定义函数并使用map。 $ $ $ $ c $ def defFunction(row
..
如何在pyspark中使用 filter()删除空的tweets?我已经完成了以下工作: $ b $ pre $ t $ c $ tweets = 结果给我13995.然而当我从mongodb导入数据时,它显示了11186 我似乎无法应用 filter()命令来删除空的tweets。如果你的数据像这样 解决方案 > tweets = sc.parallelize([“tit
..
问 题 spark-submit指令,比如我要提交的python文件是几个互相import的文件,并且有的是在文件夹里的 解决方案 http://stackoverflow.com/questions/29485175/spark-submit-failed-with-spark-streaming-workdcount-python-code
..
我遵循这个文章将一些数据发送到AWS ES,我使用了jar弹性搜索。这是我的脚本: from pyspark import SparkContext,SparkConf from pyspark.sql import SQLContext if __name_ ==“__main__”: conf = SparkConf()。setAppName(“WriteToES”) sc
..
我在Mac上使用码头图像 sequenceiq / spark 研究这些 spark示例,在学习过程中,我将该图像内的火花升级到1.6.1根据此答案,当我开始发生错误简单数据操作示例,以下是发生了什么: 当我运行 df = sqlContext.read.format(“jdbc”)。option(“url”,url).option(“dbtable”,“people”)。load()它引发
..
我正在使用PySpark。数据帧('canon_evt')中有一列('dt'),这是一个时间戳。我试图从DateTime值中删除秒。它最初是作为一个字符串从镶木地板读取的。然后我尝试通过 canon_evt = canon_evt.withColumn('dt',to_date(canon_evt.dt))将其转换为Timestamp canon_evt = canon_evt.wit
..
我有一个Spark SQL DataFrame 与数据,我想要得到的是在给定日期范围内的当前行之前的所有行。所以例如我想让所有的行从7天以前返回给定行。我想到,我需要使用窗口函数,如: 窗口\ .partitionBy('id')\ .orderBy('start') 这里有问题。我想要有一个 rangeBetween 7天,但Spark文档中没有什么可以找到。 S
..
假设我有以下数据集: a | b 1 | 0.4 1 | 0.8 1 | 0.5 2 | 0.4 2 | 0.1 我想添加一个名为“label”的新列,其中每个 a 中的值组。 a 组中 b 的最高值标记为1,所有其他标签为0。 输出将如下所示: a | b |标签 1 | 0.4 | 0 1 | 0.8 | 1 1 |
..
我有一个具有1亿行和5000多列的DF。我试图在colx和剩余的5000+列之间找到corr。 aggList1 = [mean(col).alias +'_m')for col in df.columns] #exclude keys df21 = df.groupBy('key1','key2','key3','key4')。agg(* aggList1) df = df.joi
..
我有这个JSON文件 { “a”:1, “ b“:2 } 这是使用Python json.dump方法获得的。 现在,我想使用pyspark将此文件读入Spark中的DataFrame。以下文档,我正在这样做 sc = SparkContext() sqlc = SQLContext(sc) df = sqlc.read.j
..
我有一个 Spark 1.5.0 DataFrame ,混合使用 null 和同一列中的空字符串。我想将所有列中的所有空字符串转换为Python中的 null ( None )。 DataFrame可能有数百列,所以我试图避免对每一列进行硬编码操作。 看到我下面的尝试,这会导致错误。 / p> 从pyspark.sql导入SQLContext sqlContext = SQLCon
..
我正在使用pyspark数据框分析一些数据,假设我有一个数据框 df ,我正在汇总: df.groupBy(“group”)\ .agg({“money”:“sum”})\ .show(100) 这将给我: group SUM(money#2L) A 137461285853 B 172185566943 C 271179590646
..
我在 linode.com 上有一个远程Ubuntu服务器,具有4个内核和8G RAM 我的远程Ubuntu服务器上有一个包含1个主控和1个从机的Spark-2集群。 我已经在我的MacBook上启动了PySpark shell ,通过以下方式连接到远程服务器上的我的主节点: $ PYSPARK_PYTHON = python3 /vagrant/spark-2.0 .0-bin-had
..
Q1。我试图从Spark数据框(13行)中获取一个简单的随机样本,使用带有参数的示例函数,其中包括:Replacement:false,fraction:0.6,但是它每次运行时都会提供不同大小的样本,尽管它可以正常工作我设置了第三个参数(seed)。为什么这样? Q2。随机数生成后的样本如何获取? 提前感谢 解决方案 随机数生成后的样本如何获取? 根据您要样本有两种
..
我试图使用 SQLContext.subtract()在Spark 1.6.1中从基于另一个数据帧的列从数据帧中删除行。让我们用一个例子: from pyspark.sql import Row df1 = sqlContext.createDataFrame([ Row(name ='Alice',age = 2), Row(name ='Bob',age = 1), ])
..