pyspark相关内容
我正在尝试使用Cloudera虚拟机附带的Spark教程。但即使我使用正确的行结束编码,我不能执行脚本,因为我得到吨的错误。 本教程是Coursera 大数据分析简介课程的一部分。 可在此处找到。 这是我做的。安装IPython shell(如果尚未完成): sudo easy_install ipython == 1.2.1 打开/启动shell(使用1.2.0
..
我试图在spark中写一个数据帧到hdfs位置,我希望如果我添加了'partitionBy'记号spark将创建分区 (类似于写镶木地板格式) 文件夹,形式为“partition_column_name = partition_value” (即 partition_date = 2016-05-03 )。 这样做,我运行以下命令: df.write.partitionBy('par
..
此网页鼓励我试用spark-csv在.Pypark中阅读.csv文件 我发现了一些帖子,如这说明如何使用 spark-csv 但是我不能通过包含.jar文件或包扩展来初始化ipython实例 也就是说,而不是 ipython notebook --profile = pyspark ,我尝试了 ipython notebook --profile = pyspark --packag
..
我是Spark的新手,我正在尝试使用Spark从文件中读取CSV数据。 这是我在做什么: sc.textFile('file.csv') .map (lambda line:(line.split(',')[0],line.split(',')[1])$ b $ b .collect() / pre> 我希望这个调用给我一个文件的两个第一列的列表,但我得到这个错误:
..
这是我的python-spark代码的一部分,它的一部分对我的需要运行太慢。 特别是这部分代码,我真的想提高它的速度,但不知道如何。 sqlContext.read(6百万行数据行)现在需要大约1分钟才能完成。 .format(“org.apache.spark.sql.cassandra”)。options(table =“axes”,keyspace = source).load()
..
我在cassandra有一些测试数据。我试图从spark中获取这个数据,但我得到一个错误,如: py4j.protocol.Py4JJavaError:调用o25.load。 java.io.IOException:无法在{127.0.1.1}打开与Cassandra的本地连接:9042 这是我到目前为止所做的: 开始 ./ bin / test>
..
我有一个有4个节点的集群:3个Spark节点和1个Solr节点。我的CPU是8核,我的内存是32 GB,磁盘空间是SSD。我使用cassandra作为我的数据库。我的数据量是22GB后6小时,我现在有大约3,400万行,这应该在5分钟内阅读。 但它已经无法在这段时间内完成任务。我的未来计划是在5分钟内读取 100百万行。我不知道我可以增加或做得更好,以实现这个结果,以及实现我的未来目标。这是
..
我在火花RDD。的RDD的每个元素是一个列表。 此外,所有的元素是类似的图案的列表,因此它有点像一个表。 我需要通过一些列排序的RDD,在一个特定的优先级顺序。 我怎样才能做到这一点? PS:这是我试过 我试图领域具有最高优先级,然后按它来进行排序,然后每个结果由现场与第二最高优先级排序。我这样做递归,并加入了结果。 但是,使用RDD.groupBy这么多次讲得非常非常缓慢。 解
..
假设我有两个RDDS像 第一个 1 2 3 4 五 第二 6 7 8 9 10 新RDD是 1 6 2 7 3 8 4 9 5 10 所以,这是基本元素明智合并......我们假定两个RDDS是大小相同的。 解决方案 您可以使用星火公司的拉链功能。按照Doc:>>> X = sc.parallelize(范围(0,5)) >>> Y = s
..
我写了一个pyspark脚本读取两个JSON文件,协同组他们并将结果发送到elasticsearch集群;一切正常(大部分)如预期,当我在本地运行它,我下载了 elasticsearch-的Hadoop为 org.elasticsearch.hadoop.mr jar文件。 EsOutputFormat 和 org.elasticsearch.hadoop.mr.LinkedMapWritable
..
谁能给我这个星火参数的更多precise描述,以及它如何影响程序执行?我不能告诉正是这个参数的意义与文档“引擎盖下。” 解决方案 该参数会影响为Python工人的内存限制。如果一个Python工作进程的RSS比内存限制的,那么它会从内存到磁盘的数据溢出,这将减少内存利用率,但一般是昂贵的操作。 请注意,该值每Python的劳动者申请,并且会有多个工人每执行人。 如果你想采取引擎盖下看看
..
我使用的,我与他人用户共享群集上的火花。因此,它是不可靠的,告诉我哪个code之一正是基于运行时间运行更加高效。因为当我运行更高效的code,别人可能运行大量数据的作品,使我的code执行时间较长。 所以我可以在这里问两个问题: 我用加入功能的加入2 RDDS ,我试图用 groupByKey()使用前加入,就像这样: rdd1.groupByKey()。加入(RDD2) 似乎
..
给定的输入RDD或形式 1:6 7 2:5 我怎样才能形式的另一个RDD 1 6 1 7 2 5 等.. 失败消息UNI code项没有属性flatMap 高清get_str(X,Y): .. code到flatmap 返回运 文字= sc.textFile(输入) RES = text.map(拉姆达L:l.split(“:”))的地图。(拉姆达(X,Y):get
..
我想要做的是计算每个列表分别所以举例来说,如果我有5名单([1,2,3,4,5,6],[2,3,4, 5,6],[3,4,5,6],[4,5,6],[5,6]),我想拿到5列出而不6我会做这样的事情: 数据= [1,2,3,4,5,6] + [2,3,4,5,6,7] + [3,4,5,6 ,7,8] + [4,5,6,7,8,9] + [5,6,7,8,9,10]高清function_1(it
..
我知道什么是k均值是,我也明白k均值++算法。我相信,唯一的变化是初始ķ中心被发现的方式。 在++版本,我们最初选择中心并使用我们选择剩余的K-1中心的概率分布。 在MLLib算法的k-means是什么 initializationSteps 参数? 解决方案 要为precise k均值++对于选择初始中心的算法,并没有描述整个培训过程。 使用 k均值|| MLLib k均值对于初始
..
我能够运行此脚本保存在文本格式文件,但是当我尝试运行saveAsSequenceFile它示数出来。如果任何一个有关于如何将RDD保存为序列文件的想法,请让我知道这个过程。我试图寻找“学习星火”的解决方案,以及官方星火文档。 这成功运行 dataRDD = sc.textFile(“/用户/ Cloudera的/ sqoop_import /部门”) dataRDD.saveAsTextFi
..
我有一个RDD与每10标准坐标7M左右的条目。我也有一些中心,我想每个条目到最近的(欧氏距离)中心地图。的问题是,这仅产生一个任务,这意味着它不是并行。这是形式: 高清DoSomething的(点,中心): 在centers.value中心: 如果(距离(点,中心)。1): 返回(中心) 收益率(无)preppedData.map。(波长
..
我在我的pyspark地图功能添加额外的操作。 最初的功能是: =结果input.map(拉姆达行:process_myData(行)) 这工作正常。然后我试图增加一个额外的操作象下面这样: =结果{input.map行拉姆达: ROW1 = row.replace(“ABC”,“高清”) process_myData(ROW1)} 然
..
我想创建一个用户定义的聚合函数,我可以从蟒蛇调用。我试图按照答案
..
我想筛选Pyspark数据框用类似SQL的在条款,如 SC = SparkContext() SQLC = SQLContext(SC) DF = sqlc.sql(“SELECT * FROM my_df WHERE field1的在一个') 其中, A 是元组(1,2,3)。我得到这个错误: 了java.lang.RuntimeException:[1.67]失败:``('',
..