Spark Feature Vector Transformation on Pre-Sorted Input

This article looks at how to build per-user feature vectors in Spark when the input data is already pre-sorted, and how to take advantage of that ordering.

Problem Description


I have some data in a tab-delimited file on HDFS that looks like this:

label | user_id | feature
------------------------------
  pos | 111     | www.abc.com
  pos | 111     | www.xyz.com
  pos | 111     | Firefox
  pos | 222     | www.example.com
  pos | 222     | www.xyz.com
  pos | 222     | IE
  neg | 333     | www.jkl.com
  neg | 333     | www.xyz.com
  neg | 333     | Chrome

I need to transform it to create a feature vector for each user_id to train a org.apache.spark.ml.classification.NaiveBayes model.

My current approach is essentially the following:

  1. Load the raw data into a DataFrame
  2. Index the features with StringIndexer
  3. Drop down to the RDD, group by user_id, and map the feature indices into a sparse Vector.

The kicker is this... the data is already pre-sorted by user_id. What's the best way to take advantage of that? It pains me to think about how much needless work may be occurring.

In case a little code is helpful to understand my current approach, here is the essence of the map (a sketch of steps 1 and 2, which are not shown, follows it):

val featurization = (vals: (String,Iterable[Row])) => {
  // create a Seq of all the feature indices
  // Note: the indexing was done in a previous step not shown
  val seq = vals._2.map(x => (x.getDouble(1).toInt,1.0D)).toSeq

  // create the sparse vector
  val featureVector = Vectors.sparse(maxIndex, seq)

  // convert the string label into a Double
  val label = if (vals._2.head.getString(2) == "pos") 1.0 else 0.0

  (label, vals._1, featureVector)
}

d.rdd
  .groupBy(_.getString(1))
  .map(featurization)
  .toDF("label","user_id","features")

Solution

Let's start with your other question:

If my data on disk is guaranteed to be pre-sorted by the key which will be used for a group aggregation or reduce, is there any way for Spark to take advantage of that?

It depends. If the operation you apply can benefit from map-side aggregation, then you can gain quite a lot from pre-sorted data without any further intervention in your code. Data sharing the same key should be located on the same partitions and can be aggregated locally before the shuffle.

Unfortunately, it won't help much in this particular scenario. Even if you enable map-side aggregation (groupBy(Key) doesn't use it, so you'd need a custom implementation) or aggregate over feature vectors (you'll find some examples in my answer to SPARK DataFrame: custom aggregation function to sum a column of Vectors), there is not much to gain. You can save some work here and there, but you still have to transfer all the indices between nodes.
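
For illustration only, such a custom map-side aggregation could be sketched with aggregateByKey, which (unlike groupBy / groupByKey) combines values within each partition before the shuffle. The column names, the Vectors import, and the reuse of maxIndex are assumptions matching the sketch in the question above:

// Vectors import is an assumption; use whichever linalg package (ml or mllib)
// the rest of the pipeline uses.
import org.apache.spark.ml.linalg.Vectors

val byUser = d.rdd
  .map(r => (r.getAs[String]("user_id"),
             (r.getAs[String]("label"), r.getAs[Double]("feature_index").toInt)))
  .aggregateByKey((Option.empty[String], Set.empty[Int]))(
    // seqOp: runs inside each partition, i.e. the map-side combine
    (acc, v) => (acc._1.orElse(Some(v._1)), acc._2 + v._2),
    // combOp: merges the partial results after the shuffle
    (a, b) => (a._1.orElse(b._1), a._2 ++ b._2)
  )

val aggregated = byUser.map { case (userId, (label, indices)) =>
  val labelD = if (label.exists(_ == "pos")) 1.0 else 0.0
  // the Set already deduplicates repeated features per user
  val vector = Vectors.sparse(maxIndex, indices.toSeq.sorted.map(i => (i, 1.0)))
  (labelD, userId, vector)
}.toDF("label", "user_id", "features")

The output has the same shape as the DataFrame produced by the original groupBy / featurization, so it could be fed to NaiveBayes unchanged; but, as noted, every distinct index still has to cross the network, so the saving is limited.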

If you want to gain more, you'll have to do a little more work. I can see two basic ways you can leverage the existing order:

  1. Use a custom Hadoop input format to yield only complete records (label, id, all features) instead of reading the data line by line. If your data has a fixed number of lines per id, you could even try NLineInputFormat and apply mapPartitions to aggregate the records afterwards.

    This is definitely a more verbose solution, but it requires no additional shuffling in Spark.

  2. Read the data as usual, but use a custom partitioner for groupBy. As far as I can tell, a RangePartitioner should work just fine, but to be sure you can try the following procedure (a rough sketch is given after the footnote below):

    • use mapPartitionsWithIndex to find the minimum / maximum id per partition
    • create a partitioner which keeps minimum <= ids < maximum on the current (i-th) partition and pushes the maximum to partition i + 1
    • use this partitioner for groupBy(Key)

    It is probably a friendlier solution, but it requires at least some shuffling. If the expected number of records to move is low (<< #records-per-partition), you can even handle this without a shuffle, using mapPartitions and broadcast*, although having a partitioner can be more effective and cheaper to get in practice.


* You can use an approach similar to this: http://stackoverflow.com/a/33072089/1560062
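
To make the second option a bit more concrete, here is a rough sketch of the procedure described above: compute each partition's id range, then use a custom Partitioner so that a user_id crossing a partition boundary ends up whole on a single partition. It assumes user_id is the grouping key (read as a string from column 1, as in the question's groupBy), that string comparison of the ids matches the sort order of the file, and it glosses over empty partitions and other edge cases:

import org.apache.spark.Partitioner

// key the rows by user_id (column position as in the question's groupBy)
val keyed = d.rdd.map(r => (r.getString(1), r))

// step 1: minimum / maximum id per partition
val bounds = keyed
  .mapPartitionsWithIndex { (idx, iter) =>
    if (iter.isEmpty) Iterator.empty
    else {
      val keys = iter.map(_._1)
      var min = keys.next()
      var max = min
      keys.foreach { k =>
        if (k < min) min = k
        if (k > max) max = k
      }
      Iterator((idx, (min, max)))
    }
  }
  .collect()
  .sortBy(_._1)

// step 2: keep min <= id < max on partition i and push ids equal to a
// partition's max to partition i + 1, so the id that spans a boundary
// is grouped on a single partition
class PresortedPartitioner(maxima: Array[String]) extends Partitioner {
  override def numPartitions: Int = maxima.length
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[String]
    val i = maxima.indexWhere(k < _)
    if (i >= 0) i else maxima.length - 1
  }
}

// step 3: group with the custom partitioner and reuse the featurization
// function from the question (its row accessors may need adjusting)
val partitioner = new PresortedPartitioner(bounds.map(_._2._2))
val result = keyed
  .groupByKey(partitioner)
  .map(featurization)
  .toDF("label", "user_id", "features")

As noted above, this still runs a shuffle; the alternative, for very few boundary records, is to fix them up with mapPartitions plus a broadcast, as in the linked answer.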
