Convert from clojure.lang.LazySeq to type org.apache.spark.api.java.JavaRDD


Question


I developed a function in Clojure to fill in an empty column from the last non-empty value. I'm assuming this works, given

(:require [flambo.api :as f])

(defn replicate-val
  [rdd input]
  (let [{:keys [col]} input
        result (reductions (fn [a b]
                             (if (empty? (nth b col))
                               (assoc b col (nth a col))
                               b))
                           rdd)]
    (println "Result type is: " (type result))))

and got this:

;=> "Result type is:  clojure.lang.LazySeq"


The question is how do I convert this back to type JavaRDD, using flambo (a Spark wrapper)?


I tried (f/map result #(.toJavaRDD %)) in the let form to attempt the conversion to the JavaRDD type,

but I got this error:

"No matching method found: map for class clojure.lang.LazySeq"

This is to be expected, since the result is of type clojure.lang.LazySeq.


The question is how I make this conversion, or how I can refactor the code to accommodate this.
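
For the literal conversion, a local seq can be turned back into an RDD by parallelizing it again. A minimal sketch, assuming sc is the SparkContext already in scope and that the fully realized seq fits on the driver:

(f/parallelize sc (vec result)) ;; yields a JavaRDD backed by the local data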


Here is a sample input RDD:

(type rdd) ;=> "org.apache.spark.api.java.JavaRDD"

but it looks like:

[["04" "2" "3"] ["04" "" "5"] ["5" "16" ""] ["07" "" "36"] ["07" "" "34"] ["07" "25" "34"]]

The desired output is:

[["04" "2" "3"] ["04" "2" "5"] ["5" "16" ""] ["07" "16" "36"] ["07" "16" "34"] ["07" "25" "34"]]

Thanks.

Answer


First of all, RDDs are not iterable (they don't implement ISeq), so you cannot use reductions. Ignoring that, the whole idea of accessing the previous record is rather tricky: you cannot directly access values from another partition, and only transformations which don't require shuffling preserve order.


The simplest approach here would be to use DataFrames and window functions with an explicit ordering, but as far as I know Flambo doesn't implement the required methods. It is always possible to use raw SQL or to access the Java/Scala API directly, but if you want to avoid that, you can try the following pipeline.


First, let's create a broadcast variable with the last value per partition:

(require '[flambo.broadcast :as bd])
(import org.apache.spark.TaskContext)

(def last-per-part
  (f/fn [it]
    (let [context (TaskContext/get)
          xs (iterator-seq it)]
      [[(.partitionId context) (last xs)]])))

(def last-vals-bd
  (bd/broadcast sc
    (into {} (-> rdd (f/map-partitions last-per-part) (f/collect)))))


Next, some helpers for the actual job:

(defn fill-pair [col]
  (fn [x] (let [[a b] x] (if (empty? (nth b col)) (assoc b col (nth a col)) b))))

(def fill-pairs
  (f/fn [it] (let [part-id (.partitionId (TaskContext/get)) ;; Get partition ID
                   xs (iterator-seq it) ;; Convert input to seq
                   prev (if (zero? part-id) ;; Find the previous element
                          (first xs)
                          ((bd/value last-vals-bd) (dec part-id)))
                   ;; Create seq of pairs (prev, current)
                   pairs (partition 2 1 (cons prev xs))
                   ;; Same as before; input is the {:col ...} map from the question
                   {:keys [col]} input
                   ;; Prepare mapping function
                   mapper (fill-pair col)]
               (map mapper pairs))))
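
The pairing itself is plain Clojure:

(partition 2 1 (cons :prev [:a :b :c]))
;; ;=> ((:prev :a) (:a :b) (:b :c))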

Finally, you can use fill-pairs with map-partitions:

(-> rdd (f/map-partitions fill-pairs) (f/collect))


A hidden assumption here is that the order of the partitions follows the order of the values. It may or may not hold in the general case, but without an explicit ordering it is probably the best you can get.


An alternative approach is to zipWithIndex, swap the order of the values, and perform a join with an offset.

(require '[flambo.tuple :as tp])

;; Key each row by its index.
(def rdd-idx (f/map-to-pair (.zipWithIndex rdd) #(.swap %)))

;; Re-key each row from index i to i - 1.
(def rdd-idx-offset
  (f/map-to-pair rdd-idx
    (fn [t] (let [p (f/untuple t)] (tp/tuple (dec' (first p)) (second p))))))

;; Join on the shifted index so adjacent rows meet, then unwrap the tuples.
(f/map (f/values (.rightOuterJoin rdd-idx-offset rdd-idx)) f/untuple)
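
A sketch of the intermediate key/value pairs for the sample rows (indices assumed to start at 0; in Spark 1.x the left side of a right outer join comes wrapped in an Optional):

;; rdd-idx:        (0, ["04" "2" "3"]) (1, ["04" "" "5"]) (2, ["5" "16" ""]) ...
;; rdd-idx-offset: (-1, ["04" "2" "3"]) (0, ["04" "" "5"]) (1, ["5" "16" ""]) ...
;; Joining on the key lines each row up with its neighbour, e.g. key 0 ->
;; (["04" "" "5"], ["04" "2" "3"]), across partition boundaries.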


Next, you can map using a similar approach as before.

Edit


A quick note on using atoms. The problem there is the lack of referential transparency: you're leveraging incidental properties of a given implementation, not a contract. There is nothing in the map semantics that requires elements to be processed in a given order, so if the internal implementation changes it may no longer be valid. Using Clojure:

(def a (atom 0))

(defn foo [x] (let [aa @a] (swap! a (fn [& args] x)) aa))

(map foo (range 1 20))

compared to:

(def a (atom 0))
(pmap foo (range 1 20))
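
With the sequential map each call observes the value written by the previous one, while pmap gives no such ordering guarantee (illustrative; the pmap result is nondeterministic):

;; (map foo (range 1 20))  ;=> (0 1 2 3 ... 18)
;; (pmap foo (range 1 20)) ;=> varies from run to run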
