How to print the decision path / rules used to predict sample of a specific row in PySpark?


Problem description

How to print the decision path of a specific sample in a Spark DataFrame?

Spark Version: '2.3.1'

The code below prints the decision path of the whole model. How can I make it print the decision path of a specific sample, for example the row where tagvalue equals 2?

# findspark must be initialized before the pyspark imports
import findspark
findspark.init()

import pandas as pd

import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import monotonically_increasing_id, col, row_number
from pyspark.sql.window import Window
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler

spark = (SparkSession.builder
         .appName('demo')
         .master('local[*]')
         .getOrCreate())

data = pd.DataFrame({
    'ball': [0, 1, 2, 3],
    'keep': [4, 5, 6, 7],
    'hall': [8, 9, 10, 11],
    'fall': [12, 13, 14, 15],
    'mall': [16, 17, 18, 10],
    'label': [21, 31, 41, 51]
})

df = spark.createDataFrame(data)

df = df.withColumn("mono_ID", monotonically_increasing_id())
w = Window().orderBy("mono_ID")
df = df.select(row_number().over(w).alias("tagvalue"), col("*"))

assembler = VectorAssembler(
    inputCols=['ball', 'keep', 'hall', 'fall'], outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[assembler, dtc]).fit(df)
transformed_pipeline = pipeline.transform(df)

#ml_pipeline = pipeline.stages[1]

result = transformed_pipeline.filter(transformed_pipeline.tagvalue == 2)
result.select('tagvalue', 'prediction').show()


+--------+----------+
|tagvalue|prediction|
+--------+----------+
|       2|      31.0|
+--------+----------+

The above prints the prediction for tagvalue 2. Now I would like the decision path in the algorithm that led to that answer for that tag value, rather than the whole model.

I am aware of the following, but it prints the whole model's decision path rather than the path for a specific sample.

ml_pipeline = pipeline.stages[1]
ml_pipeline.toDebugString
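
For reference, that debug string renders the whole tree as nested If/Else lines, roughly of this shape (an illustrative sketch only, not the actual output for this data):

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_...) of depth 2 with 5 nodes
  If (feature 3 <= 13.5)
   If (feature 0 <= 0.5)
    Predict: 21.0
   Else (feature 0 > 0.5)
    Predict: 31.0
  Else (feature 3 > 13.5)
   Predict: 51.0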

The equivalent of that exists in scikit-learn; what is the equivalent in Spark?

If you run the following code in scikit-learn, it prints the decision path for that specific sample; here is a snippet taken straight from the scikit-learn documentation.

import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier

iris = load_iris()
X = iris.data
y = iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

estimator = DecisionTreeClassifier(max_leaf_nodes=3, random_state=0)
estimator.fit(X_train, y_train)

n_nodes = estimator.tree_.node_count
children_left = estimator.tree_.children_left
children_right = estimator.tree_.children_right
feature = estimator.tree_.feature
threshold = estimator.tree_.threshold

# First let's retrieve the decision path of each sample. The decision_path
# method allows to retrieve the node indicator functions. A non zero element of
# indicator matrix at the position (i, j) indicates that the sample i goes
# through the node j.

node_indicator = estimator.decision_path(X_test)

# Similarly, we can also have the leaves ids reached by each sample.

leave_id = estimator.apply(X_test)

# Now, it's possible to get the tests that were used to predict a sample or
# a group of samples. First, let's make it for the sample.

sample_id = 0
node_index = node_indicator.indices[node_indicator.indptr[sample_id]:
                                    node_indicator.indptr[sample_id + 1]]

print('Rules used to predict sample %s: ' % sample_id)
for node_id in node_index:
    # NOTE: kept verbatim from the old scikit-learn docs example. This condition
    # skips every node EXCEPT the leaf, so only the leaf's "test" is printed;
    # that is why the output below shows feature index -2 (the leaf marker).
    # The corrected version of the example uses `==` here, so that each
    # internal split is printed and the leaf is skipped instead.
    if leave_id[sample_id] != node_id:
        continue

    if (X_test[sample_id, feature[node_id]] <= threshold[node_id]):
        threshold_sign = "<="
    else:
        threshold_sign = ">"

    print("decision id node %s : (X_test[%s, %s] (= %s) %s %s)" %
          (node_id, 
           sample_id, 
           feature[node_id],
           X_test[sample_id, feature[node_id]], 
           threshold_sign,
           threshold[node_id]))

The output looks like this:

Rules used to predict sample 0: 
decision id node 4 : (X_test[0, -2] (= 5.1) > -2.0)

Answer

I changed your dataframe just slightly so that we can see different features in the explanations, and I changed the VectorAssembler to use a feature list, so we have easy access to those indices later. Changes below:

#change1: ball goes from [0,1,2,3] -> [0,1,1,3] so we can see other features in the explanations
#change2: added multiple paths to the same prediction
#change3: added a categorical variable
#change4: added a feature list so we can reuse those indices easily later
data = pd.DataFrame({
    'ball': [0, 1, 1, 3, 1, 0, 1, 3],
    'keep': [4, 5, 6, 7, 7, 4, 6, 7],
    'hall': [8, 9, 10, 11, 2, 6, 10, 11],
    'fall': [12, 13, 14, 15, 15, 12, 14, 15],
    'mall': [16, 17, 18, 10, 10, 16, 18, 10],
    'wall': ['a','a','a','a','a','a','c','e'],
    'label': [21, 31, 41, 51, 51, 51, 21, 31]
})

df = spark.createDataFrame(data)

df = df.withColumn("mono_ID", monotonically_increasing_id())
w = Window().orderBy("mono_ID")
df = df.select(row_number().over(w).alias("tagvalue"), col("*"))

# StringIndexer/OneHotEncoder need an extra import (not shown in the question)
from pyspark.ml.feature import StringIndexer, OneHotEncoder

indexer = StringIndexer(inputCol='wall', outputCol='wallIndex')
encoder = OneHotEncoder(inputCol='wallIndex', outputCol='wallVec')

#I added this list so that feature lookups by index are easy later
features = ['ball','keep','wallVec','hall','fall']
assembler = VectorAssembler(
    inputCols=features, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='label')

pipeline = Pipeline(stages=[indexer, encoder, assembler, dtc]).fit(df)
transformed_pipeline = pipeline.transform(df)
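
As before, you can sanity-check the fitted pipeline's predictions before digging into the tree internals (same pattern as in the question):

transformed_pipeline.select('tagvalue', 'prediction').show()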

Below is a method I've found to be able to work with the decision tree itself:

#get the pipeline back out, as you've done earlier, this changed to [3] because of the categorical encoders
ml_pipeline = pipeline.stages[3]

#saves the model so we can get at the internals that the scala code keeps private
ml_pipeline.save("mymodel_test")

#read back in the model parameters
modeldf = spark.read.parquet("mymodel_test/data/*")
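
# optional sanity check: inspect the fields the saved model exposes; you should
# see columns like id, prediction, impurity, leftChild, rightChild and split
modeldf.printSchema()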

import networkx as nx
import numpy as np  # used below in decision_path to compare one-hot vectors


#select only the columns that we NEED and collect into a list
noderows = modeldf.select("id","prediction","leftChild","rightChild","split").collect()


#create a graph for the decision tree; you could use a simpler tree structure here instead of a full 'graph'
G = nx.Graph()

#first pass to add the nodes
for rw in noderows:
    if rw['leftChild'] < 0 and rw['rightChild'] < 0:
        G.add_node(rw['id'], cat="Prediction", predval=rw['prediction'])
    else: 
        G.add_node(rw['id'], cat="splitter", featureIndex=rw['split']['featureIndex'], thresh=rw['split']['leftCategoriesOrThreshold'], leftChild=rw['leftChild'], rightChild=rw['rightChild'], numCat=rw['split']['numCategories'])

#second pass to add the relationships, now with additional information
for rw in modeldf.where("leftChild > 0 and rightChild > 0").collect():
    tempnode = G.nodes()[rw['id']]
    G.add_edge(rw['id'], rw['leftChild'], reason="{0} less than {1}".format(features[tempnode['featureIndex']],tempnode['thresh']))
    G.add_edge(rw['id'], rw['rightChild'], reason="{0} greater than {1}".format(features[tempnode['featureIndex']],tempnode['thresh']))
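
Before tracing any paths, it can help to verify that the graph matches the model; a quick sketch using plain networkx calls:

# dump every node with its attributes, and every edge with the
# human-readable 'reason' we attached above
for node_id, attrs in G.nodes(data=True):
    print(node_id, attrs)
for parent, child, edge_attrs in G.edges(data=True):
    print(parent, '->', child, edge_attrs['reason'])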

Now let's build a function to work with all of this.
Note: this could be written more cleanly.

#function to trace the path based on the tagvalue and its corresponding features
def decision_path(tag2search):
    wanted_row = transformed_pipeline.where("tagvalue = "+str(tag2search)).collect()[0]
    wanted_features = wanted_row['features']
    start_node = G.nodes()[0]
    while start_node['cat'] != 'Prediction':
        #do stuff with categorical variables
        if start_node['numCat'] > 0:
            feature_value = wanted_features[start_node['featureIndex']:start_node['featureIndex'] + start_node['numCat']]
            #this assumes that you'll name all your cat variables with the following syntax 'ball' -> 'ballVec' or 'wall' -> 'wallVec'
            feature_column = features[start_node['featureIndex']]
            original_column = feature_column[:-3]
            valToCheck = [x[original_column] for x in transformed_pipeline.select(feature_column, original_column).distinct().collect() if np.all(x[feature_column].toArray()==feature_value)][0]

            #NOTE: this check compares the sample's category to itself, so it always
            #takes the left branch; a stricter version would test whether the category
            #index is in start_node['thresh'] (the split's leftCategoriesOrThreshold)
            if (valToCheck == wanted_row[original_column]) :
                print("'{0}' value of {1} in [{2}]; ".format(original_column, wanted_row[original_column], valToCheck))
                start_node = G.nodes()[start_node['leftChild']]
            else:
                print("'{0}' value of {1} in [{2}]; ".format(original_column, wanted_row[original_column], valToCheck))
                start_node = G.nodes()[start_node['rightChild']]

        #path to do stuff with non-categorical variables
        else:
            feature_value = wanted_features[start_node['featureIndex']]
            if feature_value > start_node['thresh'][0]:
                print("'{0}' value of {1} was greater than {2}; ".format(features[start_node['featureIndex']], feature_value, start_node['thresh'][0]))
                start_node = G.nodes()[start_node['rightChild']]
            else:
                print("'{0}' value of {1} was less than or equal to {2}; ".format(features[start_node['featureIndex']], feature_value, start_node['thresh'][0]))
                start_node = G.nodes()[start_node['leftChild']]

    print("leads to prediction of {0}".format(start_node['predval']))

The results take this form:

[decision_path(X) for X in range(1,8)]
    'fall' value of 8.0 was greater than 6.0; 
    'ball' value of 0.0 was less than or equal to 1.0; 
    'ball' value of 0.0 was less than or equal to 0.0; 
        leads to prediction of 21.0

    'fall' value of 9.0 was greater than 6.0; 
    'ball' value of 1.0 was less than or equal to 1.0; 
    'ball' value of 1.0 was greater than 0.0; 
    'keep' value of 5.0 was less than or equal to 5.0; 
        leads to prediction of 31.0

    'fall' value of 10.0 was greater than 6.0; 
    'ball' value of 1.0 was less than or equal to 1.0; 
    'ball' value of 1.0 was greater than 0.0; 
    'keep' value of 6.0 was greater than 5.0; 
    'wall' value of a in [a]; 
        leads to prediction of 21.0

    'fall' value of 11.0 was greater than 6.0; 
    'ball' value of 3.0 was greater than 1.0; 
    'wall' value of a in [a]; 
        leads to prediction of 31.0

    'fall' value of 2.0 was less than or equal to 6.0; 
        leads to prediction of 51.0

    'fall' value of 6.0 was less than or equal to 6.0; 
        leads to prediction of 51.0

    'fall' value of 10.0 was greater than 6.0; 
    'ball' value of 1.0 was less than or equal to 1.0; 
    'ball' value of 1.0 was greater than 0.0; 
    'keep' value of 6.0 was greater than 5.0; 
    'wall' value of c in [c]; 
        leads to prediction of 21.0

Notes:

  • If you want to stay exclusively in Spark-world, you could use GraphFrames instead of networkx (I don't have that luxury :( )
  • You can modify the phrasing as you wish
  • If you need the impurity, impurityStats, or gain, those are all in the saved model-information dataframe (see the sketch below)
  • I chose to work with the tree instead of parsing .toDebugString, because having access to the tree seemed more foundationally important (and expandable)
    • That said, looking at both the .toDebugString and the sklearn.decision_path outputs, I find those more easily understandable/readable
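
As a quick illustration of the impurity/gain point above, a hedged sketch (the column names are assumptions based on what the saved model exposes; confirm with modeldf.printSchema()):

# assumed columns; verify against modeldf.printSchema() for your Spark version
modeldf.select("id", "prediction", "impurity", "gain").show(truncate=False)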

