Add a new calculated column from 2 values in RDD
I have 2 paired RDDs that I joined together on the same key, and now I want to add a new calculated column using 2 columns from the values part. The type of the joined RDD is:
RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]
I want to add another field to the new RDD which shows the delta between the 2 DateTime fields.
How can I do this?
You should be able to do this using map to extend the 2-tuples into 3-tuples, roughly as follows:
joined.map { case (key, values) =>
  val delta = computeDelta(values)
  (key, values, delta)
}
Or, more concisely:
joined.map { case (k, vs) => (k, vs, computeDelta(vs)) }
Then your computeDelta function can just extract the first and second values of type (String, DateTime, Int, Int), get the second item (DateTime) from each, and compute the delta using whatever DateTime functions are convenient.
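A minimal sketch of what such a computeDelta could look like, under some loudly stated assumptions: the values are treated as a flat collection of (String, DateTime, Int, Int) records, as the description above reads; java.time.LocalDateTime stands in for the DateTime type so the example runs without extra dependencies; the delta is returned in milliseconds as a Long; and 0L is returned when fewer than two records are present. The Record alias and the DeltaSketch object are hypothetical names introduced only for illustration.

```scala
import java.time.{Duration, LocalDateTime}

object DeltaSketch {
  // Assumption: LocalDateTime stands in for the DateTime type in the question.
  type Record = (String, LocalDateTime, Int, Int)

  // Hypothetical helper: delta in milliseconds between the date fields
  // of the first two records. Returns 0L if there are fewer than two
  // records (an arbitrary choice made for this sketch).
  def computeDelta(values: Iterable[Record]): Long =
    values.take(2).toList match {
      case (_, first, _, _) :: (_, second, _, _) :: Nil =>
        Duration.between(first, second).toMillis
      case _ => 0L
    }
}
```

If each element is instead a pair of records, as in the joined type shown in the question, the same idea applies after destructuring the pair and taking the date field from each side. Returning Option[Long] rather than a 0L default may be a better fit if missing records need to be distinguished from a zero delta.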
If you want your output RDD to still be a paired RDD, then you will need to wrap the new delta field into a tuple, roughly as follows:
joined.mapValues { values =>
  val delta = computeDelta(values)
  (values, delta)
}
which will preserve the original PairedRDD keys and give you values of type (Iterable[(String, DateTime, Int, Int)], Long) (assuming you are calculating deltas of type Long).