FPGrowth: computing association rules in PySpark vs Scala


Question

Using:

http://spark.apache.org/docs/1.6.1/mllib-frequent-pattern-mining.html

Python code:

from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(dataframe,0.01,10)

Scala:

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val data = sc.textFile("data/mllib/sample_fpgrowth.txt")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent.mkString("[", ",", "]")
      + ", " + rule.confidence)
}

The Scala code that the Python wrapper calls into does not take a minimum confidence parameter:

def trainFPGrowthModel(
      data: JavaRDD[java.lang.Iterable[Any]],
      minSupport: Double,
      numPartitions: Int): FPGrowthModel[Any] = {
    val fpg = new FPGrowth()
      .setMinSupport(minSupport)
      .setNumPartitions(numPartitions)

    val model = fpg.run(data.rdd.map(_.asScala.toArray))
    new FPGrowthModelWrapper(model)
  }

How can I pass minConfidence to generate association rules in PySpark? The documentation has a Scala example for this but no Python example.

Recommended answer

Spark >= 2.2

The DataFrame-based ml API provides association rules directly:

from pyspark.ml.fpm import FPGrowth

data = ...

fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules
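
To make the role of minConfidence concrete, here is a minimal plain-Python sketch with no Spark involved. It assumes the usual definition, confidence(X => y) = freq(X ∪ {y}) / freq(X), and restricts rules to a single-item consequent. The transactions and both function names are made up for illustration, and the itemset search is brute force rather than FP-growth, so treat it as an explanation of the threshold, not as what Spark runs internally.

```python
from itertools import combinations

# Toy transactions, invented for this illustration.
transactions = [
    {"a", "b", "c"},
    {"a", "b"},
    {"a", "c"},
    {"b", "c"},
    {"a", "b", "c"},
]


def frequent_itemsets(transactions, min_support):
    """Brute-force search: map each frequent itemset to its count."""
    n = len(transactions)
    items = sorted(set().union(*transactions))
    result = {}
    for size in range(1, len(items) + 1):
        for candidate in combinations(items, size):
            cset = frozenset(candidate)
            count = sum(1 for t in transactions if cset <= t)
            if count / n >= min_support:
                result[cset] = count
    return result


def association_rules(freq, min_confidence):
    """Keep rules X => y whose confidence reaches min_confidence."""
    rules = []
    for itemset, count in freq.items():
        if len(itemset) < 2:
            continue
        for y in itemset:
            antecedent = itemset - {y}
            # Every subset of a frequent itemset is frequent, so the lookup succeeds.
            confidence = count / freq[antecedent]
            if confidence >= min_confidence:
                rules.append((sorted(antecedent), y, confidence))
    return rules


freq = frequent_itemsets(transactions, min_support=0.3)
for antecedent, consequent, confidence in association_rules(freq, 0.7):
    print(antecedent, "=>", consequent, confidence)
```

Raising min_confidence only removes rules. It never changes which itemsets count as frequent, because that is controlled by the support threshold alone.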

Spark < 2.2

As of now, PySpark doesn't support extracting association rules (a DataFrame-based FPGrowth API with Python support is a work in progress, SPARK-1450), but we can easily work around that.

First, install SBT (go to the downloads page and follow the instructions for your operating system).

Next, create a simple Scala project with only two files:

.
├── AssociationRulesExtractor.scala
└── build.sbt

You can adjust it later to follow the established directory structure.

Then add the following to build.sbt (adjust the Scala and Spark versions to match the ones you use):

name := "fpm"

version := "1.0"

scalaVersion := "2.10.6"

val sparkVersion = "1.6.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

and the following to AssociationRulesExtractor.scala:

package com.example.fpm

import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD

object AssociationRulesExtractor {
  def apply(rdd: RDD[Rule[String]]) = {
    rdd.map(rule => Array(
      rule.confidence, rule.javaAntecedent, rule.javaConsequent
    ))
  }
}

Open a terminal emulator of your choice, go to the root directory of the project, and run:

sbt package

This generates a jar file in the target directory. For example, with Scala 2.10 it will be:

target/scala-2.10/fpm_2.10-1.0.jar

Start the PySpark shell or use spark-submit, passing the path to the generated jar file via --driver-class-path:

bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar

In non-local mode:

bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar

In cluster mode the jar should be present on all nodes.

Add some convenience wrappers:

from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple


rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])

def generateAssociationRules(model, minConfidence):
    # Get active context
    sc = SparkContext.getOrCreate()

    # Retrieve extractor object
    extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor

    # Compute rules
    java_rules = model._java_model.generateAssociationRules(minConfidence)

    # Convert rules to Python RDD
    return _java2py(sc, extractor.apply(java_rules)).map(lambda x:rule(*x))

Finally, you can use this helper as a function:

generateAssociationRules(model, 0.9)

or as a method:

FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)
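
Once the rules are collected to the driver, they are ordinary namedtuples and can be sorted or printed like any Python data. The sketch below assumes each record has the confidence, antecedent, consequent fields produced by the wrapper above. The sample rules are hypothetical stand-ins for generateAssociationRules(model, 0.9).collect(), and format_rule and top_rules are illustrative helper names, not part of PySpark.

```python
from collections import namedtuple

# Same field layout as the wrapper above (capitalised here for readability).
Rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])

# Hypothetical rules standing in for the collected output of the helper.
rules = [
    Rule(0.95, ["bread", "butter"], ["milk"]),
    Rule(1.0, ["tea"], ["sugar"]),
    Rule(0.9, ["milk"], ["bread"]),
]


def format_rule(rule):
    """Render a rule in the same layout as the Scala println example."""
    return "[{}] => [{}], {}".format(
        ",".join(rule.antecedent), ",".join(rule.consequent), rule.confidence
    )


def top_rules(rules, n):
    """Return the n rules with the highest confidence."""
    return sorted(rules, key=lambda r: r.confidence, reverse=True)[:n]


for r in top_rules(rules, 2):
    print(format_rule(r))
```

For large rule sets it is probably better to filter or sort on the RDD before calling collect(), so that only the rules you need reach the driver.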

This solution depends on internal PySpark methods, so it is not guaranteed to be portable between versions.
