# 统计学习方法读书笔记7-K近邻习题

1147-柳同学

## 热门标签

（标签信息在抓取时丢失）

### 1.课本课后习题

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from sklearn.neighbors import KNeighborsClassifier

# Instance points (habit exercise data, two features per point)
x = np.array(
    [[0.5, 0.9], [0.7, 2.8], [1.3, 4.6], [1.4, 2.8], [1.7, 0.8], [1.9, 1.4], [2, 4], [2.3, 3], [2.5, 2.5], [2.9, 2],
     [2.9, 3], [3, 4.5], [3.3, 1.1], [4, 3.7], [4, 2.2], [4.5, 2.5], [4.6, 1], [5, 4]])

# Class labels for each instance point
y = np.array([0, 1, 0, 1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0])

# Bounds of the 2-D grid
x_min, x_max = 0, 6
y_min, y_max = 0, 6

# Colours for the two class regions
cmap_light = ListedColormap(['#FFFFFF', '#BDBDBD'])

# Build the 2-D grid (step h controls the resolution of the boundary plot)
h = .01
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))

# n_neighbors is the k of the k-nearest-neighbour method
knn = KNeighborsClassifier(n_neighbors=2)
knn.fit(x, y)

# ravel() flattens, e.g. a 3x4 array becomes length-12
# np.c_ concatenates along columns, so each grid cell becomes one (x1, x2) sample
Z = knn.predict(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)

plt.figure()

# Axis tick positions (a plain range suffices; no need to build a tuple
# from a comprehension as the original did)
plt.xticks(range(6))
plt.yticks(range(1, 6))  # same ticks as [y for y in range(6) if y != 0]

# Fill the class regions; Z values map onto cmap colours
plt.pcolormesh(xx, yy, Z, cmap=cmap_light)

# Axis labels
plt.xlabel('$x^{(1)}$')
plt.ylabel('$x^{(2)}$')

# Scatter the instance points, coloured by class
plt.scatter(x[:, 0], x[:, 1], c=y)

plt.show()

k=2

k=1

### 2.视频课后习题

k近邻算法的模型复杂度主要体现在k值上
k值比较小时，容易造成过拟合
k值比较大时，容易造成欠拟合

#### 1.KNN自编程

#!usr/bin/env python
# -*- coding:utf-8 _*-
"""
@author: liujie
@software: PyCharm
@file: KNN自编程.py
@time: 2020/10/21 12:58
"""
# KNN-线性扫描

import numpy as np
# 计数函数
from collections import Counter
import matplotlib.pyplot as plt

class KNN:
    """K-nearest-neighbour classifier using a linear scan over the training set."""

    def __init__(self, x_train, y_train, k=3):
        # k: number of neighbours that vote
        # x_train: (n, d) array of training points; y_train: (n,) array of labels
        self.k = k
        self.x_train = x_train
        self.y_train = y_train

    def predict(self, x_test):
        """Return the majority label among the k training points nearest x_test."""
        # Euclidean (L2) distance from x_test to every training point
        dist_list = [(np.linalg.norm(x_test - self.x_train[i], 2), self.y_train[i])
                     for i in range(self.x_train.shape[0])]
        # Full sort is wasteful (only the smallest k are needed) but n is tiny here
        dist_list.sort(key=lambda pair: pair[0])
        # Labels of the k nearest points
        y_list = [dist_list[i][-1] for i in range(self.k)]
        # Majority vote: Counter.most_common() orders labels by frequency
        y_count = Counter(y_list).most_common()
        return y_count[0][0]

def draw(X_train, y_train, X_new):
    """Scatter-plot positive/negative training points and the test points."""
    # Seed each pile with a row of zeros so vstack always has a base;
    # the seed row is skipped with [1:] when plotting
    X_po = np.zeros(X_train.shape[1])
    X_ne = np.zeros(X_train.shape[1])
    # Split the training points by label; np.vstack stacks arrays row-wise
    for i in range(y_train.shape[0]):
        if y_train[i] == 1:       # positive example
            X_po = np.vstack((X_po, X_train[i]))
        else:                     # negative example
            X_ne = np.vstack((X_ne, X_train[i]))

    # Plot the instance points
    plt.plot(X_po[1:, 0], X_po[1:, 1], "g*", label="1")
    plt.plot(X_ne[1:, 0], X_ne[1:, 1], "rx", label="-1")
    plt.plot(X_new[:, 0], X_new[:, 1], "bo", label="test_points")

    # Annotate each test point with its coordinates
    for xy in zip(X_new[:, 0], X_new[:, 1]):
        plt.annotate("test{}".format(xy), xy)
    # Axis limits
    plt.axis([0, 10, 0, 10])
    plt.xlabel("x1")
    plt.ylabel("x2")
    # Show the legend
    plt.legend()
    # Show the figure
    plt.show()

def main():
    """Linear-scan KNN demo: plot the data, then classify a test point for k=1,3,5."""
    # Training data
    x_train = np.array([[5, 4],
                        [9, 6],
                        [4, 7],
                        [2, 3],
                        [8, 1],
                        [7, 2]])
    y_train = np.array([1, 1, 1, -1, -1, -1])
    # Test data
    x_test = np.array([[5, 3]])
    # Visualise the data set
    draw(x_train, y_train, x_test)
    # Effect of different (odd) k on the classification result
    for k in range(1, 6, 2):
        # Build a KNN instance with this k
        clf = KNN(x_train, y_train, k=k)
        # Predict the class of the test data
        y_predict = clf.predict(x_test)
        print('k={},被分类为: {}'.format(k, y_predict))

if __name__ == '__main__':
    main()

k=1,被分类为: 1
k=3,被分类为: -1
k=5,被分类为: -1

- 多数据处理
for循环
多进程

- 距离用列表排序，浪费时间与空间
用堆这个数据结构
# 线性扫描优化版本:
# 1.多数据处理：for循环，多进程
# 2.距离改用堆这个结构

import numpy as np
# Counter是一个计数函数
from collections import Counter
# 多进程
from concurrent import futures
import matplotlib.pyplot as plt
# 导入堆模块
import heapq
import time

class KNN:
    """KNN via linear scan keeping the k nearest in a heap; parallel over queries."""

    def __init__(self, X_train, y_train, k=3):
        # Parameter initialisation
        self.k = k  # chosen k
        self.X_train = X_train
        self.y_train = y_train

    def predict_single(self, x_new):
        """Classify one point by majority vote of its k nearest training points."""
        # Distances are negated so that a min-heap over them keeps the k
        # SMALLEST true distances: the heap root is the largest of the current k
        dist_list = [(-np.linalg.norm(x_new - self.X_train[i], ord=2), self.y_train[i], i)
                     for i in range(self.k)]
        # Build the heap from the first k distances
        heapq.heapify(dist_list)
        # Scan the remaining training points
        for i in range(self.k, self.X_train.shape[0]):
            dist_i = (-np.linalg.norm(x_new - self.X_train[i], ord=2), self.y_train[i], i)
            # If point i is closer than the current k-th nearest, replace the root;
            # otherwise the heap is unchanged
            if dist_i[0] > dist_list[0][0]:
                heapq.heappushpop(dist_list, dist_i)
        # Labels of the k nearest points, e.g. [-1, 1, 1, -1, ...]
        y_list = [dist_list[i][1] for i in range(self.k)]
        # Majority vote over those k labels
        y_count = Counter(y_list).most_common()
        return y_count[0][0]

    def predict_many(self, X_new):
        """Classify each row of X_new in parallel using a process pool."""
        with futures.ProcessPoolExecutor(max_workers=4) as executor:
            # Submit one task per query point
            tasks = [executor.submit(self.predict_single, X_new[i])
                     for i in range(X_new.shape[0])]
            # BUG FIX: the original iterated an undefined name `done_iter`
            # (NameError at runtime); iterating the submitted tasks also
            # keeps the results in input order
            res = [task.result() for task in tasks]
        return res

def draw(X_train, y_train, X_new):
    """Scatter-plot positive/negative training points and the test points."""
    # Seed each pile with a zero row so vstack has a base; dropped via [1:] below
    X_po = np.zeros(X_train.shape[1])
    X_ne = np.zeros(X_train.shape[1])
    # Split the training points by label; np.vstack stacks arrays row-wise
    for i in range(y_train.shape[0]):
        if y_train[i] == 1:  # positive example
            X_po = np.vstack((X_po, X_train[i]))
        else:  # negative example
            X_ne = np.vstack((X_ne, X_train[i]))

    # Plot the instance points
    plt.plot(X_po[1:, 0], X_po[1:, 1], "g*", label="1")
    plt.plot(X_ne[1:, 0], X_ne[1:, 1], "rx", label="-1")
    plt.plot(X_new[:, 0], X_new[:, 1], "bo", label="test_points")

    # Annotate each test point with its coordinates
    for xy in zip(X_new[:, 0], X_new[:, 1]):
        plt.annotate("test{}".format(xy), xy)
    # Axis limits
    plt.axis([0, 10, 0, 10])
    plt.xlabel("x1")
    plt.ylabel("x2")
    # Show the legend
    plt.legend()
    # Show the figure
    plt.show()

def main():
    """Heap-based KNN demo: plot the data, classify for k=1,3,5, and time it."""
    start = time.time()
    # Training data
    X_train = np.array([[5, 4],
                        [9, 6],
                        [4, 7],
                        [2, 3],
                        [8, 1],
                        [7, 2]])
    y_train = np.array([1, 1, 1, -1, -1, -1])
    # Test data (two query points)
    X_new = np.array([[5, 3], [9, 2]])

    # Visualise the data set
    draw(X_train, y_train, X_new)
    # Effect of different (odd) k on the classification result
    for k in range(1, 6, 2):
        # Build a KNN instance with this k
        clf = KNN(X_train, y_train, k=k)
        # Predict the classes of the test data in parallel
        y_predict = clf.predict_many(X_new)
        print("k={},被分类为：{}".format(k, y_predict))

    end = time.time()
    # BUG FIX: the original wrote round(end - start), 2 — the 2 was passed to
    # format() (and ignored) instead of to round(), so the time was never rounded
    print("用时:{}s".format(round(end - start, 2)))

if __name__ == "__main__":
    main()
# 难理解-对于树的构造不熟
# 构建kd树，搜索待预测点所属区域
from collections import namedtuple
import numpy as np

# 建立节点类
class Node(namedtuple("Node", "location left_child right_child")):
    """A kd-tree node: the split point plus its left and right subtrees."""

    def __repr__(self):
        # Render the node as a plain tuple for compact printing
        return str(tuple(self))

# kd tree类
class KdTree:
    """kd tree: built by median split; searched by descending to the leaf region.

    NOTE(review): this is the simplified search from the book exercise — it only
    walks down to the query's region without backtracking, so it does not
    guarantee the true nearest neighbour.
    """

    def __init__(self, k=1, p=2):
        # BUG FIX: the constructor was misspelled `__init`, so it was never
        # invoked and k/p/kdtree were never initialised on construction.
        self.k = k  # number of neighbours (unused by this simplified search)
        self.p = p  # distance order (unused by this simplified search)
        self.kdtree = None

    def _fit(self, X, depth=0):
        """Recursively build the tree, splitting on axis = depth mod #dimensions."""
        try:
            k = X.shape[1]
        except IndexError:
            return None
        # The axis could instead be chosen by largest variance
        axis = depth % k
        # Sort the points along the split axis and take the median as the node
        X = X[X[:, axis].argsort()]
        median = X.shape[0] // 2
        try:
            X[median]
        except IndexError:
            # Empty subset: no node here
            return None
        return Node(
            location=X[median],
            left_child=self._fit(X[:median], depth + 1),
            right_child=self._fit(X[median + 1:], depth + 1)
        )

    def _search(self, point, tree=None, depth=0, best=None):
        """Descend toward the region containing `point`, tracking the last node seen."""
        if tree is None:
            return best
        k = point.shape[1]
        # Pick the branch on the current split axis
        if point[0][depth % k] < tree.location[depth % k]:
            next_branch = tree.left_child
        else:
            next_branch = tree.right_child
        if next_branch is not None:
            best = next_branch.location
        return self._search(point, tree=next_branch, depth=depth + 1, best=best)

    def fit(self, X):
        """Build the kd tree from the rows of X and return its root node."""
        self.kdtree = self._fit(X)
        return self.kdtree

    def predict(self, X):
        """Return the location of the deepest node on the path for the (1, d) query X."""
        res = self._search(X, self.kdtree)
        return res

def main():
    """Build a kd tree from the sample points and locate the region of a query point."""
    KNN = KdTree()
    # Training data
    X_train = np.array([[5, 4],
                        [9, 6],
                        [4, 7],
                        [2, 3],
                        [8, 1],
                        [7, 2]])
    KNN.fit(X_train)
    # Query point
    X_new = np.array([[5, 3]])
    res = KNN.predict(X_new)
    print(res)

if __name__ == "__main__":
    main()

#### 2.sklearn实现

import numpy as np
from sklearn.neighbors import KNeighborsClassifier

def main():
    """Classify a test point with sklearn's KNeighborsClassifier for k=1,3,5."""
    # Training data
    x_train = np.array([[5, 4],
                        [9, 6],
                        [4, 7],
                        [2, 3],
                        [8, 1],
                        [7, 2]])
    y_train = np.array([1, 1, 1, -1, -1, -1])
    # Test data
    x_test = np.array([[5, 3]])
    # Visualise the data (disabled here)
    # draw(x_train, y_train, x_test)
    # Effect of different (odd) k on the classification result
    for k in range(1, 6, 2):
        # weights='distance': closer neighbours get larger votes;
        # n_jobs=-1: use all CPU cores
        clf = KNeighborsClassifier(n_neighbors=k, weights='distance', n_jobs=-1)
        # Train the model
        clf.fit(x_train, y_train)
        # Predict on the 2-D test array
        y_predict = clf.predict(x_test)
        # Per-class probabilities for the test point
        print(clf.predict_proba(x_test))
        print('k={},被分类为: {}'.format(k, y_predict))

if __name__ == '__main__':
    main()
[[0. 1.]]
k=1,被分类为: [1]
[[0.43837481 0.56162519]]
k=3,被分类为: [1]
[[0.45986872 0.54013128]]
k=5,被分类为: [1]

kd tree：基于二叉树的搜索，在平衡情况下平均查找时间复杂度为 O(log N)，优于线性扫描的 O(N)

Vieu3.3主题

Q Q 登 录