pyspark相关内容

在Cloudera VM中阅读教程CSV文件时出现异常

我正在尝试使用Cloudera虚拟机附带的Spark教程。但即使我使用正确的行结束编码,我不能执行脚本,因为我得到吨的错误。 本教程是Coursera 大数据分析简介课程的一部分。 可在此处找到。 这是我做的。安装IPython shell(如果尚未完成): sudo easy_install ipython == 1.2.1 打开/启动shell(使用1.2.0 ..
发布时间:2017-02-26 15:51:14 Python

将spark数据帧写为带分区的CSV

我试图在spark中写一个数据帧到hdfs位置,我希望如果我添加了'partitionBy'记号spark将创建分区 (类似于写镶木地板格式) 文件夹,形式为“partition_column_name = partition_value” (即 partition_date = 2016-05-03 )。 这样做,我运行以下命令: df.write.partitionBy('par ..
发布时间:2017-02-24 15:52:59 Python

如何在IPython Notebook中加载jar依赖

此网页鼓励我试用spark-csv在.Pypark中阅读.csv文件 我发现了一些帖子,如这说明如何使用 spark-csv 但是我不能通过包含.jar文件或包扩展来初始化ipython实例 也就是说,而不是 ipython notebook --profile = pyspark ,我尝试了 ipython notebook --profile = pyspark --packag ..
发布时间:2017-02-24 15:25:22 Office

使用Spark加载CSV文件

我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据。 这是我在做什么: sc.textFile('file.csv') .map (lambda line:(line.split(',')[0],line.split(',')[1])$ ​​b $ b .collect() / pre> 我希望这个调用给我一个文件的两个第一列的列表,但我得到这个错误: ..
发布时间:2017-02-24 15:13:49 Python

提高火花应用程序的速度

这是我的python-spark代码的一部分,它的一部分对我的需要运行太慢。 特别是这部分代码,我真的想提高它的速度,但不知道如何。 sqlContext.read(6百万行数据行)现在需要大约1分钟才能完成。 .format(“org.apache.spark.sql.cassandra”)。options(table =“axes”,keyspace = source).load() ..
发布时间:2016-11-13 16:05:35 Python

为什么我的Spark流媒体应用这么慢?

我有一个有4个节点的集群:3个Spark节点和1个Solr节点。我的CPU是8核,我的内存是32 GB,磁盘空间是SSD。我使用cassandra作为我的数据库。我的数据量是22GB后6小时,我现在有大约3,400万行,这应该在5分钟内阅读。 但它已经无法在这段时间内完成任务。我的未来计划是在5分钟内读取 100百万行。我不知道我可以增加或做得更好,以实现这个结果,以及实现我的未来目标。这是 ..

在Apache的星火多个字段排序

我在火花RDD。的RDD的每个元素是一个列表。 此外,所有的元素是类似的图案的列表,因此它有点像一个表。 我需要通过一些列排序的RDD,在一个特定的优先级顺序。 我怎样才能做到这一点? PS:这是我试过 我试图领域具有最高优先级,然后按它来进行排序,然后每个结果由现场与第二最高优先级排序。我这样做递归,并加入了结果。 但是,使用RDD.groupBy这么多次讲得非常非常缓慢。 解 ..
发布时间:2016-05-22 16:50:52 Python

如何合并元素明智的2 RDDS

假设我有两个RDDS像 第一个 1 2 3 4 五 第二 6 7 8 9 10 新RDD是 1 6 2 7 3 8 4 9 5 10 所以,这是基本元素明智合并......我们假定两个RDDS是大小相同的。 解决方案 您可以使用星火公司的拉链功能。按照Doc:>>> X = sc.parallelize(范围(0,5)) >>> Y = s ..
发布时间:2016-05-22 16:50:34 Python

pyspark:用船JAR依赖火花提交

我写了一个pyspark脚本读取两个JSON文件,协同组他们并将结果发送到elasticsearch集群;一切正常(大部分)如预期,当我在本地运行它,我下载了 elasticsearch-的Hadoop为 org.elasticsearch.hadoop.mr jar文件。 EsOutputFormat 和 org.elasticsearch.hadoop.mr.LinkedMapWritable ..
发布时间:2016-05-22 16:48:35 Python

什么是spark.python.worker.memory?

谁能给我这个星火参数的更多precise描述,以及它如何影响程序执行?我不能告诉正是这个参数的意义与文档“引擎盖下。” 解决方案 该参数会影响为Python工人的内存限制。如果一个Python工作进程的RSS比内存限制的,那么它会从内存到磁盘的数据溢出,这将减少内存利用率,但一般是昂贵的操作。 请注意,该值每Python的劳动者申请,并且会有多个工人每执行人。 如果你想采取引擎盖下看看 ..
发布时间:2016-05-22 16:46:37 其他开发

星火RDD groupByKey +加盟VS连接性能

我使用的,我与他人用户共享群集上的火花。因此,它是不可靠的,告诉我哪个code之一正是基​​于运行时间运行更加高效。因为当我运行更高效的code,别人可能运行大量数据的作品,使我的code执行时间较长。 所以我可以在这里问两个问题: 我用加入功能的加入2 RDDS ,我试图用 groupByKey()使用前加入,就像这样: rdd1.groupByKey()。加入(RDD2) 似乎 ..
发布时间:2016-05-22 16:45:19 其他开发

flatMap抛出错误-uni code项没有属性flatMap

给定的输入RDD或形式 1:6 7 2:5 我怎样才能形式的另一个RDD 1 6 1 7 2 5 等.. 失败消息UNI code项没有属性flatMap 高清get_str(X,Y): .. code到flatmap 返回运 文字= sc.textFile(输入) RES = text.map(拉姆达L:l.split(“:”))的地图。(拉姆达(X,Y):get ..
发布时间:2016-05-22 16:45:09 Python

如何指定分区mapPartition火花

我想要做的是计算每个列表分别所以举例来说,如果我有5名单([1,2,3,4,5,6],[2,3,4, 5,6],[3,4,5,6],[4,5,6],[5,6]),我想拿到5列出而不6我会做这样的事情: 数据= [1,2,3,4,5,6] + [2,3,4,5,6,7] + [3,4,5,6 ,7,8] + [4,5,6,7,8,9] + [5,6,7,8,9,10]高清function_1(it ..
发布时间:2016-05-22 16:44:59 Python

究竟是什么在K均值的initializationSteps参数++在星火MLLib?

我知道什么是k均值是,我也明白k均值++算法。我相信,唯一的变化是初始ķ中心被发现的方式。 在++版本,我们最初选择中心并使用我们选择剩余的K-1中心的概率分布。 在MLLib算法的k-means是什么 initializationSteps 参数? 解决方案 要为precise k均值++对于选择初始中心的算法,并没有描述整个培训过程。 使用 k均值|| MLLib k均值对于初始 ..

RDD保存在pyspark序列文件

我能够运行此脚本保存在文本格式文件,但是当我尝试运行saveAsSequenceFile它示数出来。如果任何一个有关于如何将RDD保存为序列文件的想法,请让我知道这个过程。我试图寻找“学习星火”的解决方案,以及官方星火文档。 这成功运行 dataRDD = sc.textFile(“/用户/ Cloudera的/ sqoop_import /部门”) dataRDD.saveAsTextFi ..
发布时间:2016-05-22 16:43:00 Python

火花地图只有一个任务,同时它应该是平行(PySpark)

我有一个RDD与每10标准坐标7M左右的条目。我也有一些中心,我想每个条目到最近的(欧氏距离)中心地图。的问题是,这仅产生一个任务,这意味着它不是并行。这是形式: 高清DoSomething的(点,中心): 在centers.value中心: 如果(距离(点,中心)。1): 返回(中心) 收益率(无)preppedData.map。(波长 ..
发布时间:2016-05-22 16:42:33 其他开发

pyspark:在一个map函数语法错误多操作

我在我的pyspark地图功能添加额外的操作。 最初的功能是: =结果input.map(拉姆达行:process_myData(行)) 这工作正常。然后我试图增加一个额外的操作象下面这样: =结果{input.map行拉姆达: ROW1 = row.replace(“ABC”,“高清”) process_myData(ROW1)} 然 ..
发布时间:2016-05-22 16:42:00 Python