How to pass params to a ML Pipeline.fit method?


Question

I am trying to use:

  • Google Dataproc + Spark
  • Google Bigquery
  • Create a job using Spark ML KMeans + Pipeline

As follows:

  1. Create a user-level feature table in BigQuery
    Example: what the feature table looks like

userid | x1   | x2 | x3 | x4 | x5 | x6 | x7 | x8   | x9   | x10
00013  | 0.01 | 0  | 0  | 0  | 0  | 0  | 0  | 0.06 | 0.09 | 0.001

  2. Launch a cluster with the default settings, using the gcloud command-line interface to create the cluster and run jobs, as shown here
  3. Using the provided starter code, I read the BQ table, convert the RDD into a DataFrame, and pass it to the KMeans model/Pipeline:

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors, _convert_to_vector
from pyspark.sql.types import Row
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
# This assumes the Google Cloud Storage connector for Hadoop is configured.

bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'my-project',
    'mapred.bq.input.dataset.id': 'tempData',
    'mapred.bq.input.table.id': 'userFeatureInBQ'}

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Transform the userid-feature table into a feature_data RDD
feature_data = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['x0'], x['x1'], x['x2'], x['x3'], x['x4'],
                    x['x5'], x['x6'], x['x7'], x['x8'],
                    x['x9'], x['x10'])))

# Function to convert each line in the RDD into an array, return the vector
def parseVector(values):
    array = np.array([float(v) for v in values])
    return _convert_to_vector(array)

# Convert the RDD into a row-wise RDD
data = feature_data.map(parseVector)
row_rdd = data.map(lambda x: Row(x))

sqlContext = SQLContext(sc)

# cache the RDD to improve performance
row_rdd.cache()

# Create a Dataframe
df = sqlContext.createDataFrame(row_rdd, ["features"])

# cache the Dataframe
df.cache()

Here is the Schema and head() which I print to the console:

|-- features: vector (nullable = true)
[Row(features=DenseVector([0.01,0,0,0,0,0,0,0.06,0.09,0.001]))]


  4. Run the KMeans clustering algorithm in the following manner:
    • Run the model multiple times
    • With different parameters (namely, change the #clusters and init_mode)
    • Calculate the error or cost metric
    • Choose the best model-parameter combination
    • Create a pipeline with KMeans as an estimator
    • Pass multiple parameters using a paramMap

#Define the paramMap & model
paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
  {'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
  {'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
  {'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
  {'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
  {'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
  {'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
  {'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})

km = KMeans()

# Create a Pipeline with a single estimator stage
pipeline = Pipeline(stages=[km])

# Call & fit the pipeline with the paramMap
models = pipeline.fit(df, paramMap)
print models


I get the following output with a warning

7:03:24 WARN org.apache.spark.mllib.clustering.KMeans: The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached. [PipelineModel_443dbf939b7bd3bf7bfc, PipelineModel_4b64bb761f4efe51da50, PipelineModel_4f858411ac19beacc1a4, PipelineModel_4f58b894f1d14d79b936, PipelineModel_4b8194f7a5e6be6eaf33, PipelineModel_4fc5b6370bff1b4d7dba, PipelineModel_43e0a196f16cfd3dae57, PipelineModel_47318a54000b6826b20e, PipelineModel_411bbe1c32db6bf0a92b, PipelineModel_421ea1364d8c4c9968c8, PipelineModel_4acf9cdbfda184b00328, PipelineModel_42d1a0c61c5e45cdb3cd, PipelineModel_4f0db3c394bcc2bb9352, PipelineModel_441697f2748328de251c, PipelineModel_4a64ae517d270a1e0d5a, PipelineModel_4372bc8db92b184c05b0]

#Print the cluster centers:
for model in models:
    print vars(model)
    print model.stages[0].clusterCenters()
    print model.extractParamMap()

Output: [array([7.64676638e-07, 3.58531391e-01, 1.68879698e-03, 0.00000000e+00, 1.53477043e-02, 1.25822915e-02, 0.00000000e+00, 6.93060772e-07, 1.41766847e-03, 1.60941306e-02]), array([2.36494105e-06, 1.87719732e-02, 3.73829379e-03, 0.00000000e+00, 4.20724542e-02, 2.28675684e-02, 0.00000000e+00, 5.45002249e-06, 1.17331153e-02, 1.24364600e-02])]

Here is the list of questions I need help with:

  • I get a list with only 2 cluster centers as arrays for all the models:
    • The KMeans model seems to default to k=2 when I try to access the pipeline. Why does this happen?
    • The last loop is supposed to access the PipelineModel and the 0th stage and run the clusterCenters() method. Is this the right approach?
    • Why do I get the warning that the data is uncached?
    • This defeats the purpose of using the pipeline to run the KMeans models and model selection in parallel, but I tried the following code:

      # computeError
      def computeCost(model, rdd):
          """Return the K-means cost (sum of squared distances of
          points to their nearest center) for this model on the given data."""
          cost = callMLlibFunc("computeCostKmeansModel",
                               rdd.map(_convert_to_vector),
                               [_convert_to_vector(c) for c in model.clusterCenters()])
          return cost

      cost = np.zeros(len(paramMap))

      for i in range(len(paramMap)):
          cost[i] = cost[i] + computeCost(models[i].stages[0], feature_data)
      print cost
      

      This prints out the following at the end of the loop:

      [ 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687]

      • Is the cost/error computed the same for every model? Again, I am unable to access the PipelineModel with the right parameters.

Any help/guidance is much appreciated! Thanks!

Answer

Your params are not properly defined. A parameter map should map from the specific Param objects to values, not from arbitrary names. You get k equal to 2 because the parameters you pass are not used, and every model falls back to exactly the same default parameters.

Let's start with example data:

import numpy as np
from pyspark.mllib.linalg import Vectors

df = (sc.textFile("data/mllib/kmeans_data.txt")
    .map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
    .zipWithIndex()
    .toDF(["features", "id"]))

Pipeline:

from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline

km = KMeans()

pipeline = Pipeline(stages=[km])

As mentioned above, the parameter map should use specific Param objects as the keys. For example:

params = [
    {km.k: 2, km.initMode: "k-means||"},
    {km.k: 3, km.initMode: "k-means||"},
    {km.k: 4, km.initMode: "k-means||"}
]

models = pipeline.fit(df, params=params)

assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]
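
To confirm that each model actually picked up its parameters (the question's loop used extractParamMap() for the same purpose), here is a quick check reusing the names defined above; each fitted KMeansModel should expose as many cluster centers as the k it was fit with:

# Sanity check: the number of cluster centers should match the k each model was fit with.
for m, p in zip(models, params):
    print("k=%d -> %d centers" % (p[km.k], len(m.stages[0].clusterCenters())))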
      

Notes:

• The correct initMode for K-means|| is k-means||, not kmeans||.
• Using a parameter map in a Pipeline doesn't mean that the models are trained in parallel. Spark parallelizes the training process over the data, not over the params. It is nothing more than a convenience method.
• You get the warning about uncached data because the actual input to K-Means is not the DataFrame but a transformed RDD.
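
To close the loop on the cost question, here is a minimal sketch of per-model cost computation and model selection. It assumes Spark 2.0+, where pyspark.ml.clustering.KMeansModel exposes a computeCost(dataset) method (within-set sum of squared errors); on earlier versions you would fall back to the callMLlibFunc helper from the question:

# Minimal sketch, assuming Spark 2.0+ where the ml KMeansModel has
# computeCost(dataset). `df`, `params`, and `models` are defined above.
costs = [m.stages[0].computeCost(df) for m in models]

# Choose the parameter combination whose model yields the lowest cost.
best_params, best_cost = min(zip(params, costs), key=lambda pc: pc[1])
print("best cost: %s" % best_cost)

With the params fixed as above, the costs should now differ across values of k, unlike the identical values in the question.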
