Spark Feature Vector Transformation on Pre-Sorted Input

Question
I have some data in a tab-delimited file on HDFS that looks like this:
label | user_id | feature
------|---------|----------------
pos   | 111     | www.abc.com
pos   | 111     | www.xyz.com
pos   | 111     | Firefox
pos   | 222     | www.example.com
pos   | 222     | www.xyz.com
pos   | 222     | IE
neg   | 333     | www.jkl.com
neg   | 333     | www.xyz.com
neg   | 333     | Chrome
I need to transform it to create a feature vector for each user_id to train an org.apache.spark.ml.classification.NaiveBayes model.

My current approach is essentially the following:
- Load the raw data into a DataFrame
- Index the features with StringIndexer
- Go down to the RDD, group by user_id, and map the feature indices into a sparse Vector.
The kicker is this... the data is already pre-sorted by user_id. What's the best way to take advantage of that? It pains me to think about how much needless work may be occurring.
In case a little code is helpful to understand my current approach, here is the essence of the map:

    val featurization = (vals: (String, Iterable[Row])) => {
      // create a Seq of all the feature indices
      // Note: the indexing was done in a previous step not shown
      val seq = vals._2.map(x => (x.getDouble(1).toInt, 1.0D)).toSeq
      // create the sparse vector
      val featureVector = Vectors.sparse(maxIndex, seq)
      // convert the string label into a Double
      val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0
      (label, vals._1, featureVector)
    }

    d.rdd
      .groupBy(_.getString(1))
      .map(featurization)
      .toDF("label", "user_id", "features")
Solution

Let's start with your other question:
If my data on disk is guaranteed to be pre-sorted by the key which will be used for a group aggregation or reduce, is there any way for Spark to take advantage of that?
It depends. If the operation you apply can benefit from map-side aggregation, then you can gain quite a lot from pre-sorted data without any further intervention in your code. Data sharing the same key should be located on the same partition and can be aggregated locally before the shuffle.
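To illustrate what "benefit from map-side aggregation" means here, a minimal sketch (assuming d is the indexed DataFrame from the question, with the feature index in column 1 and user_id in column 2):

    // reduceByKey (unlike groupBy) combines values locally on each partition
    // before the shuffle, so a partition holding all rows for a user sends
    // a single pre-merged record per user over the network
    val indicesPerUser = d.rdd
      .map(row => (row.getString(1), Set(row.getDouble(1).toInt)))
      .reduceByKey(_ ++ _)

With pre-sorted input, each user's rows sit on one partition, so the local merge already produces the final per-user set and the shuffle moves one record per user instead of one per feature.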
Unfortunately it won't help much in this particular scenario. Even if you enable map-side aggregation (groupBy(key) doesn't use it, so you'd need a custom implementation) or aggregate over feature vectors (you'll find some examples in my answer to SPARK DataFrame: custom aggregation function to sum a column of Vectors), there is not much to gain. You can save some work here and there, but you still have to transfer all the indices between nodes. If you want to gain more, you'll have to do a little more work. I can see two basic ways to leverage the existing order:
Use a custom Hadoop input format to yield complete records (label, id, all features) instead of reading the data line by line. If your data has a fixed number of lines per id, you could even try to use NLineInputFormat and apply mapPartitions to aggregate records afterwards. This is definitely the more verbose solution, but it requires no additional shuffling in Spark.
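A rough sketch of this approach, assuming exactly three lines per user_id as in the sample data (the path and the linespermap value are illustrative, not from the question):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat

    // each input split receives exactly 3 lines, i.e. one complete user record
    val conf = new org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 3)

    val lines = sc.newAPIHadoopFile(
      "hdfs:///path/to/data",
      classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf)

    val records = lines
      .map(_._2.toString.split("\t"))
      .mapPartitions { it =>
        // every group of 3 consecutive lines shares one user_id, so it can be
        // folded into a single (label, id, features) record without a shuffle
        it.grouped(3).map { rows =>
          (rows.head(0), rows.head(1), rows.map(_(2)))
        }
      }

Note this produces one tiny split per user, so in practice you would coalesce afterwards; the point of the sketch is only that no row ever crosses a record boundary.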
Read the data as usual but use a custom partitioner for groupBy. As far as I can tell, a RangePartitioner should work just fine, but to be sure you can try the following procedure:
- use mapPartitionsWithIndex to find the minimum / maximum id per partition
- create a partitioner which keeps minimum <= ids < maximum on the current (i-th) partition and pushes the maximum to partition i + 1
- use this partitioner for groupBy(key)
It is probably the more friendly solution, but it requires at least some shuffling. If the expected number of records to move is low (<< #records-per-partition), you can even handle this without a shuffle using mapPartitions and broadcast*, although having a partitioner can be more useful and cheaper to get in practice.
* You can use an approach similar to this: http://stackoverflow.com/a/33072089/1560062