0%

GBDT

GBDT算法是近年来被提及比较多的一个算法,这主要得益于其算法的性能,以及该算法在各类数据挖掘以及机器学习比赛中的卓越表现,有很多人对GBDT算法进行了开源代码的开发,比较火的是陈天奇的XGBoost和微软的LightGBM。GBDT在BAT等大厂中应用也比较多。

GBDT理论推导

GBDT公式推导文章

这边建议把公式自己推几次,能非常好的理解GBDT

代码实现与详解

首先需要一个CART回归树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# -*- coding: utf-8 -*-
import numpy as np
import copy

class node:
'''树的节点的类
'''
def __init__(self, fea=-1, value=None, results=None, right=None, left=None):
self.fea = fea # 用于切分数据集的属性的列索引值
self.value = value # 设置划分的值
self.results = results # 存储叶节点的值
self.right = right # 右子树
self.left = left # 左子树

class CART_RT(object):
'''CART算法的类
'''
def __init__(self,data_X,data_Y,min_sample, min_err):
'''初始化CART类参数
'''
self.data_X = data_X #待回归样本数据的特征
self.data_Y = data_Y #待回归样本数据的标签
self.min_sample = min_sample # 每个叶节点最多的样本数
self.min_err = min_err #最小方差

def fit(self):
'''构建树
input: data(list):训练样本
min_sample(int):叶子节点中最少的样本数
min_err(float):最小的error
output: node:树的根结点
'''
# 将样本特征与样本标签合成完整的样本
data = combine(self.data_X,self.data_Y)
# 构建决策树,函数返回该决策树的根节点
if len(data) <= self.min_sample:
return node(results=leaf(data))

# 1、初始化
best_err = err_cnt(data)
bestCriteria = None # 存储最佳切分属性以及最佳切分点
bestSets = None # 存储切分后的两个数据集

# 2、开始构建CART回归树
feature_num = len(data[0]) - 1
for fea in range(0, feature_num):
feature_values = {}
for sample in data:
feature_values[sample[fea]] = 1
for value in feature_values.keys():
# 2.1、尝试划分
(set_1, set_2) = split_tree(data, fea, value)
combine_set_1 = combine(set_1[0],set_1[1])
combine_set_2 = combine(set_2[0],set_2[1])
if len(combine_set_1) < 2 or len(combine_set_2) < 2:
continue
# 2.2、计算划分后的error值
now_err = err_cnt(combine_set_1) + err_cnt(combine_set_2)
# 2.3、更新最优划分
if now_err < best_err and len(combine_set_1) > 0 and len(combine_set_2) > 0:
best_err = now_err
bestCriteria = (fea, value)
bestSets = (set_1, set_2)

# 3、判断划分是否结束
if best_err > self.min_err:
right = CART_RT(bestSets[0][0],bestSets[0][1], self.min_sample, self.min_err).fit()
left = CART_RT(bestSets[1][0],bestSets[1][1], self.min_sample, self.min_err).fit()
return node(fea=bestCriteria[0], value=bestCriteria[1], right=right, left=left)
else:
return node(results=leaf(data)) # 返回当前的类别标签作为最终的类别标签

def combine(data_X,data_Y):
'''样本特征与标签合并
input:data_X(list):样本特征
data_Y(list):样本标签
output:data(list):样本数据
'''
m = len(data_X)
data = copy.deepcopy(data_X)
for i in range(m):
data[i].append(data_Y[i])
return data

def err_cnt(data):
'''回归树的划分指标
input: data(list):训练数据
output: m*s^2(float):总方差
'''
data = np.mat(data)
return np.var(data[:, -1]) * np.shape(data)[0]

def split_tree(data, fea, value):
'''根据特征fea中的值value将数据集data划分成左右子树
input: data(list):训练样本
fea(float):需要划分的特征index
value(float):指定的划分的值
output: (set_1, set_2)(tuple):左右子树的聚合
'''
set_1 = [] # 右子树的集合
set_2 = [] # 左子树的集合
tmp_11 = []
tmp_12 = []
tmp_21 = []
tmp_22 = []
for x in data:
if x[fea] >= value:
tmp_11.append(x[0:-1])
tmp_12.append(x[-1])
else:
tmp_21.append(x[0:-1])
tmp_22.append(x[-1])
set_1.append(tmp_11)
set_1.append(tmp_12)
set_2.append(tmp_21)
set_2.append(tmp_22)
return (set_1, set_2)

def leaf(dataSet):
'''计算叶节点的值
input: dataSet(list):训练样本
output: mean(data[:, -1])(float):均值
'''
data = np.mat(dataSet)
return np.mean(data[:, -1])

def predict(sample, tree):
'''对每一个样本sample进行预测
input: sample(list):样本
tree:训练好的CART回归树模型
output: results(float):预测值
'''
# 1、只是树根
if tree.results != None:
return tree.results
else:
# 2、有左右子树
val_sample = sample[tree.fea] # fea处的值
branch = None
# 2.1、选择右子树
if val_sample >= tree.value:
branch = tree.right
# 2.2、选择左子树
else:
branch = tree.left
return predict(sample, branch)

def cal_error(data_X,data_Y, tree):
''' 评估CART回归树模型
input: data(list):
tree:训练好的CART回归树模型
output: err/m(float):均方误差
'''
m = len(data_X) # 样本的个数
n = len(data_X[0]) # 样本中特征的个数
err = 0.0
for i in range(m):
tmp = []
for j in range(n):
tmp.append(data_X[i][j])
pre = predict(tmp, tree) # 对样本计算其预测值
# 计算残差
err += (data_Y[i] - pre) * (data_Y[i] - pre)
return err / m

把上面的代码文件命名为CART_regression_tree.py

然后进行回归算法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# -*- coding: utf-8 -*-
import CART_regression_tree
import numpy as np

def load_data(data_file):
'''导入训练数据
input: data_file(string):保存训练数据的文件
output: data(list):训练数据
'''
data_X = []
data_Y = []
f = open(data_file)
for line in f.readlines():
sample = []
lines = line.strip().split("\t")
data_Y.append(float(lines[-1]))
for i in range(len(lines) - 1):
sample.append(float(lines[i])) # 转换成float格式
data_X.append(sample)
f.close()
return data_X,data_Y

class GBDT_RT(object):
'''GBDT回归算法类
'''
def __init__(self):
self.trees = None ##用于存放GBDT的树
self.learn_rate = learn_rate ## 学习率,防止过拟合
self.init_value = None ##初始数值
self.fn = lambda x: x

def get_init_value(self,y):
'''计算初始数值为平均值
input:y(list):样本标签列表
output:average(float):样本标签的平均值
'''
average = sum(y)/len(y)
return average

def get_residuals(self,y,y_hat):
'''计算样本标签标签与预测列表的残差
input:y(list):样本标签列表
y_hat(list):预测标签列表
output:y_residuals(list):样本标签标签与预测列表的残差
'''
y_residuals = []
for i in range(len(y)):
y_residuals.append(y[i] - y_hat[i])
return y_residuals

def fit(self,data_X,data_Y,n_estimators,learn_rate,min_sample, min_err):
'''训练GBDT模型
input:self(object):GBDT_RT类
data_X(list):样本特征
data_Y(list):样本标签
n_estimators(int):GBDT中CART树的个数
learn_rate(float):学习率
min_sample(int):学习CART时叶节点的最小样本数
min_err(float):学习CART时最小方差
'''
## 初始化预测标签和残差
self.init_value = self.get_init_value(data_Y)

n = len(data_Y)
y_hat = [self.init_value] * n ##初始化预测标签
y_residuals = self.get_residuals(data_Y,y_hat)

self.trees = []
self.learn_rate = learn_rate
## 迭代训练GBDT
for j in range(n_estimators):
idx = range(n)
X_sub = [data_X[i] for i in idx] ## 样本特征列表
residuals_sub = [y_residuals[i] for i in idx] ## 标签残差列表

tree = CART_regression_tree.CART_RT(X_sub,residuals_sub, min_sample, min_err).fit()
res_hat = [] ##残差的预测值
for m in range(n):
res_hat.append(CART_regression_tree.predict(data_X[m],tree))
## 计算此时的预测值等于原预测值加残差预测值
y_hat = [y_hat[i] + self.learn_rate * res_hat[i] for i in idx]
y_residuals = self.get_residuals(data_Y,y_hat)
self.trees.append(tree)

def GBDT_predict(self,xi):
'''预测一个样本
'''
return self.fn(self.init_value + sum(self.learn_rate * CART_regression_tree.predict(xi,tree) for tree in self.trees))

def GBDT_predicts(self,X):
'''预测多个样本
'''
return [self.GBDT_predict(xi) for xi in X]

def error(Y_test,predict_results):
'''计算预测误差
input:Y_test(list):测试样本标签
predict_results(list):测试样本预测值
output:error(float):均方误差
'''
Y = np.mat(Y_test)
results = np.mat(predict_results)
error = np.square(Y - results).sum() / len(Y_test)
return error


if __name__ == '__main__':
print ("------------- 1.load data ----------------")
X_data,Y_data = load_data("sine.txt")
X_train = X_data[0:150]
Y_train = Y_data[0:150]
X_test = X_data[150:200]
Y_test = Y_data[150:200]
print('------------2.Parameters Setting-----------')
n_estimators = 4
learn_rate = 0.5
min_sample = 30
min_err = 0.3
print ("--------------3.build GBDT ---------------")
gbdt_rt = GBDT_RT()
gbdt_rt.fit(X_train,Y_train,n_estimators,learn_rate,min_sample, min_err)
print('-------------4.Predict Result--------------')
predict_results = gbdt_rt.GBDT_predicts(X_test)
print('--------------5.Predict Error--------------')
error = error(Y_test,predict_results)
print('Predict error is: ',error)

代码地址

Sklearn库实现

本文以波士顿房价预测任务为例来给出GBDT回归算法的sklearn实现过程。👇数据集解读

均方根误差

直接看公式不太容易读懂,我们把这个公式拆开来看

这很简单,就是真实值与预测值之间的差值,当然这样不行,有正有负,所以我们平方。平方后有一个特性,它对更大的误差的“惩罚更多”。

在训练模型时,批次中有许多样本,我们需要计算每一个的误差并求和

如果要比较不同大小批次的误差,则需要对样本数量进行归一化,即取平均值。

MSE 是 ML 回归模型(例如线性回归)中常用的统计度量和损失函数。当然,平均绝对误差(MAE)可以更好地处理异常值。

值得一提的是,MAE 相比 MSE 有个优点: MAE 对离群点不那么敏感,更有包容性。因为 MAE 计算的是误差 的绝对值,无论误差>1 还是误差<1,没有平方项的作用,惩罚力度都是一样的,所占权重一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import numpy as np
import seaborn as sns
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

#1.加载数据集
from sklearn.datasets import load_boston
boston = load_boston()

#2.获取特征值和目标值
X , y = boston.data,boston.target

#3.获取特征名称
feature_name = boston.feature_names

#4.划分数据集
from sklearn.model_selection import train_test_split
X_train , X_test , y_train , y_test = train_test_split(X,y,test_size=0.2,random_state=0)

#5.设置GBDT模型参数
params = {'n_estimators': 500, # 弱分类器的个数
'max_depth': 3, # 弱分类器(CART回归树)的最大深度
'min_samples_split': 5, # 分裂内部节点所需的最小样本数
'learning_rate': 0.05, # 学习率
'loss': 'ls'} # 损失函数:均方误差损失函数

#6.模型实例化并训练
from sklearn.ensemble import GradientBoostingRegressor
GBDTreg = GradientBoostingRegressor(**params)
GBDTreg.fit(X_train,y_train)

#7.预测输出并可视化
y_predict = GBDTreg.predict(X_test)
mpl.rcParams['font.sans-serif'] = ['KaiTi', 'SimHei', 'FangSong']
# 汉字字体,优先使用楷体,如果找不到楷体,则使用黑体
mpl.rcParams['font.size'] = 12 # 字体大小

plt.plot(y_predict,label='预测房价')
plt.plot(y_test,label='真实房价')
plt.legend()
plt.show()

#8.计算预测房价与真实房价之间的均方误差
mse = mean_squared_error(y_test,y_predict)
print(mse)

#9.可视化随着弱学习器的增多时偏差的变化
test_score = np.zeros((params['n_estimators'],), dtype=np.float64)
for i, y_pred in enumerate(GBDTreg.staged_predict(X_test)):
test_score[i] = GBDTreg.loss_(y_test, y_predict)

fig = plt.figure(figsize=(6, 6))
plt.subplot(1, 1, 1)
plt.title('偏差随弱学习器个数的变化')
plt.plot(np.arange(params['n_estimators']) + 1, GBDTreg.train_score_, 'b-',
label='训练集偏差')
plt.plot(np.arange(params['n_estimators']) + 1, test_score, 'r-',
label='测试集偏差')
plt.legend(loc='upper right')
plt.xlabel('弱学习器的个数')
plt.ylabel('偏差')
fig.tight_layout()

#10.可视化特征的重要性
from sklearn.inspection import permutation_importance
feature_importance = GBDTreg.feature_importances_
sorted_idx = np.argsort(feature_importance)
pos = np.arange(sorted_idx.shape[0]) + .5
fig = plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.barh(pos, feature_importance[sorted_idx], align='center')
plt.yticks(pos, np.array(boston.feature_names)[sorted_idx])
plt.title('Feature Importance (MDI)')

result = permutation_importance(GBDTreg, X_test, y_test, n_repeats=10,
random_state=42, n_jobs=2)
sorted_idx = result.importances_mean.argsort()
plt.subplot(1, 2, 2)
plt.boxplot(result.importances[sorted_idx].T,
vert=False, labels=np.array(boston.feature_names)[sorted_idx])
plt.title("Permutation Importance (test set)")
fig.tight_layout()
plt.show()

-------------本文结束感谢您的阅读-------------
请作者喝一杯蜜雪冰城吧!