
Commit fc0e30a

Merge pull request #155 from jiangzhonglian/master
Update Chapter 15: theory and project examples
2 parents 3a8513b + a901e7f commit fc0e30a

4 files changed: 125 additions & 56 deletions


docs/15.大数据与MapReduce.md

Lines changed: 41 additions & 4 deletions
@@ -143,10 +143,47 @@ Pegasos stands for the Primal Estimated sub-GrAdient SOlver
 
 > Training algorithm
 
-[Full code](https://github.com/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py): <https://github.com/apachecn/MachineLearning/blob/master/src/python/2.KNN/kNN.py>
-
-
-Let's go on to the Python implementation of the code.
+```python
+def batchPegasos(dataSet, labels, lam, T, k):
+    """batchPegasos()
+
+    Args:
+        dataMat   feature set
+        labels    class labels
+        lam       fixed value; the margin left for fine-tuning
+        T         number of iterations
+        k         size of the batch to process
+    Returns:
+        w         weight vector
+    """
+    m, n = shape(dataSet)
+    w = zeros(n)
+    dataIndex = range(m)
+    for t in range(1, T+1):
+        wDelta = mat(zeros(n))  # reset wDelta
+
+        # eta is the learning rate: it controls how large each weight adjustment is
+        # (it can also be read as the step size of the stochastic gradient).
+        # The inputs T and k set the number of iterations and the batch size; eta is recomputed on every one of the T iterations.
+        eta = 1.0/(lam*t)
+        random.shuffle(dataIndex)
+        for j in range(k):  # inner loop over the batch: accumulate all misclassified samples, then update the weight vector
+            i = dataIndex[j]
+            p = predict(w, dataSet[i, :])  # mapper code
+
+            # If the prediction is correct and the margin is >= 1, nothing needs to change.
+            # Otherwise it counts as an error, and the misclassified sample is accumulated into the update of w.
+            if labels[i]*p < 1:  # mapper code
+                wDelta += labels[i]*dataSet[i, :].A  # accumulate the change
+        # w is refined step by step through these stochastic-gradient updates
+        w = (1.0 - 1/t)*w + (eta/k)*wDelta  # apply the accumulated change once per iteration t
+        # print '-----', w
+    # print '++++++', w
+    return w
+```
+
+[Full code](https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/pegasos.py): <https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/pegasos.py>
+
+[MapReduce version of the code](https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/mrSVM.py): <https://github.com/apachecn/MachineLearning/blob/master/src/python/15.BigData_MapReduce/mrSVM.py>
 
 * * *
 
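For readers following along, the sketch below shows how `batchPegasos()` can be driven end to end. It simply restates the test code at the bottom of `pegasos.py` in this commit, so the data path, the hyperparameters (`lam=2`, `T=50`, `k=100`) and the NumPy star import come from that file rather than being invented here; like the rest of the repo it is Python 2 code (print statement, `range()` returning a list).

```python
# Minimal driver for batchPegasos(), mirroring src/python/15.BigData_MapReduce/pegasos.py
from numpy import *   # batchPegasos() relies on shape/zeros/mat/random from a star import

# loadDataSet(), predict() and batchPegasos() are assumed to be defined as shown above.

datArr, labelList = loadDataSet('input/15.BigData_MapReduce/testSet.txt')
datMat = mat(datArr)                                   # matrix form so dataSet[i, :] indexing works
finalWs = batchPegasos(datMat, labelList, 2, 50, 100)  # lam=2, T=50 iterations, batch size k=100
print finalWs                                          # learned weight vector w
```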

src/python/15.BigData_MapReduce/mrSVM.py

Lines changed: 13 additions & 11 deletions
@@ -2,9 +2,11 @@
 # coding:utf8
 '''
 Created on 2017-04-07
+Update on 2017-06-20
 MapReduce version of Pegasos SVM
 Using mrjob to automate job flow
-@author: Peter/ApacheCN-xy
+@author: Peter/ApacheCN-xy/片刻
+"Machine Learning in Action" update repository: https://github.com/apachecn/MachineLearning
 '''
 from mrjob.job import MRJob
 
@@ -17,14 +19,14 @@ class MRsvm(MRJob):
 
     def __init__(self, *args, **kwargs):
         super(MRsvm, self).__init__(*args, **kwargs)
-        self.data = pickle.load(open('input/15.BigData_MapReduce/svmDat27'))
+        self.data = pickle.load(open('/opt/git/MachineLearning/input/15.BigData_MapReduce/svmDat27'))
         self.w = 0
         self.eta = 0.69
         self.dataList = []
         self.k = self.options.batchsize
         self.numMappers = 1
         self.t = 1  # iteration number
-
+
     def configure_options(self):
         super(MRsvm, self).configure_options()
         self.add_passthrough_option(
@@ -42,20 +44,20 @@ def map(self, mapperId, inVals):  # takes 2 arguments
             self.w = inVals[1]
         elif inVals[0] == 'x':
             self.dataList.append(inVals[1])  # accumulate data points for the computation
-        elif inVals[0] == 't':
+        elif inVals[0] == 't':  # iteration number
             self.t = inVals[1]
         else:
-            self.eta = inVals  # used for debugging; eta is not used in map
+            self.eta = inVals   # used for debugging; eta is not used in map
 
     def map_fin(self):
-        labels = self.data[:,-1]
-        X = self.data[:, 0:-1]  # reshape the data back into X and Y
-        if self.w == 0:
+        labels = self.data[:, -1]
+        X = self.data[:, :-1]  # reshape the data back into X and Y
+        if self.w == 0:
             self.w = [0.001] * shape(X)[1]  # initialize w on the first iteration
         for index in self.dataList:
-            p = mat(self.w)*X[index, :].T  # calc p=w*dataSet[key].T
+            p = mat(self.w)*X[index, :].T   # calc p=w*dataSet[key].T
             if labels[index]*p < 1.0:
-                yield (1, ['u', index])  # make sure every record carries the same key
+                yield (1, ['u', index])     # make sure every record carries the same key
         yield (1, ['w', self.w])  # so they all end up in the same reducer
         yield (1, ['t', self.t])
 
@@ -66,7 +68,7 @@ def reduce(self, _, packedVals):
         elif valArr[0] == 'w':
             self.w = valArr[1]
         elif valArr[0] == 't':
-            self.t = valArr[1]
+            self.t = valArr[1]
 
         labels = self.data[:, -1]
         X = self.data[:, 0:-1]
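As a usage note, the sketch below shows one way the `MRsvm` job might be launched programmatically with mrjob's inline runner. The diff only exposes `self.k = self.options.batchsize` and a truncated `self.add_passthrough_option(` call, so the exact `--batchsize` option string and the input file name `kickStart.txt` are assumptions, not something confirmed by this commit; `make_runner()` and `run()` are standard mrjob calls.

```python
# Hypothetical launcher for the MRsvm job (Python 2, like the rest of the repo).
# '--batchsize' and 'kickStart.txt' are assumptions; adjust them to the option
# actually defined in configure_options() and to a real input file.
from mrSVM import MRsvm

if __name__ == '__main__':
    job = MRsvm(args=['--batchsize', '100', 'kickStart.txt'])
    with job.make_runner() as runner:   # defaults to mrjob's inline runner
        runner.run()                    # executes the map / map_final / reduce steps
```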

src/python/15.BigData_MapReduce/pegasos.py

Lines changed: 54 additions & 27 deletions
@@ -10,75 +10,102 @@
 
 
 def loadDataSet(fileName):
-    dataMat = []; labelMat = []
+    dataMat = []
+    labelMat = []
     fr = open(fileName)
     for line in fr.readlines():
         lineArr = line.strip().split('\t')
-        #dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
+        # dataMat.append([float(lineArr[0]), float(lineArr[1]), float(lineArr[2])])
         dataMat.append([float(lineArr[0]), float(lineArr[1])])
         labelMat.append(float(lineArr[2]))
-    return dataMat,labelMat
+    return dataMat, labelMat
 
 
 def seqPegasos(dataSet, labels, lam, T):
-    m,n = shape(dataSet); w = zeros(n)
+    m, n = shape(dataSet)
+    w = zeros(n)
     for t in range(1, T+1):
         i = random.randint(m)
         eta = 1.0/(lam*t)
-        p = predict(w, dataSet[i,:])
+        p = predict(w, dataSet[i, :])
         if labels[i]*p < 1:
-            w = (1.0 - 1/t)*w + eta*labels[i]*dataSet[i,:]
+            w = (1.0 - 1/t)*w + eta*labels[i]*dataSet[i, :]
         else:
             w = (1.0 - 1/t)*w
         print w
     return w
 
 
 def predict(w, x):
-    return w*x.T
+    return w*x.T  # this is simply the predicted value of y
 
 
 def batchPegasos(dataSet, labels, lam, T, k):
-    m,n = shape(dataSet); w = zeros(n);
+    """batchPegasos()
+
+    Args:
+        dataMat   feature set
+        labels    class labels
+        lam       fixed value; the margin left for fine-tuning
+        T         number of iterations
+        k         size of the batch to process
+    Returns:
+        w         weight vector
+    """
+    m, n = shape(dataSet)
+    w = zeros(n)
     dataIndex = range(m)
     for t in range(1, T+1):
-        wDelta = mat(zeros(n)) # reset wDelta
+        wDelta = mat(zeros(n))  # reset wDelta
+
+        # eta is the learning rate: it controls how large each weight adjustment is
+        # (it can also be read as the step size of the stochastic gradient).
+        # The inputs T and k set the number of iterations and the batch size; eta is recomputed on every one of the T iterations.
         eta = 1.0/(lam*t)
         random.shuffle(dataIndex)
-        for j in range(k):# the whole training set
+        for j in range(k):  # inner loop over the batch: accumulate all misclassified samples, then update the weight vector
             i = dataIndex[j]
-            p = predict(w, dataSet[i,:]) # mapper code
-            if labels[i]*p < 1: # mapper code
-                wDelta += labels[i]*dataSet[i,:].A # accumulate the change
-        w = (1.0 - 1/t)*w + (eta/k)*wDelta # apply the change once per iteration t
-    return w
+            p = predict(w, dataSet[i, :])  # mapper code
 
+            # If the prediction is correct and the margin is >= 1, nothing needs to change.
+            # Otherwise it counts as an error, and the misclassified sample is accumulated into the update of w.
+            if labels[i]*p < 1:  # mapper code
+                wDelta += labels[i]*dataSet[i, :].A  # accumulate the change
+        # w is refined step by step through these stochastic-gradient updates
+        w = (1.0 - 1/t)*w + (eta/k)*wDelta  # apply the accumulated change once per iteration t
+        # print '-----', w
+    # print '++++++', w
+    return w
 
 
-datArr,labelList = loadDataSet('testSet.txt')
+datArr, labelList = loadDataSet('input/15.BigData_MapReduce/testSet.txt')
 datMat = mat(datArr)
-#finalWs = seqPegasos(datMat, labelList, 2, 5000)
+# finalWs = seqPegasos(datMat, labelList, 2, 5000)
 finalWs = batchPegasos(datMat, labelList, 2, 50, 100)
 print finalWs
 
 import matplotlib
 import matplotlib.pyplot as plt
 fig = plt.figure()
 ax = fig.add_subplot(111)
-x1=[]; y1=[]; xm1=[]; ym1=[]
+x1 = []
+y1 = []
+xm1 = []
+ym1 = []
 for i in range(len(labelList)):
     if labelList[i] == 1.0:
-        x1.append(datMat[i,0]); y1.append(datMat[i,1])
+        x1.append(datMat[i, 0])
+        y1.append(datMat[i, 1])
     else:
-        xm1.append(datMat[i,0]); ym1.append(datMat[i,1])
+        xm1.append(datMat[i, 0])
+        ym1.append(datMat[i, 1])
 ax.scatter(x1, y1, marker='s', s=90)
 ax.scatter(xm1, ym1, marker='o', s=50, c='red')
 x = arange(-6.0, 8.0, 0.1)
-y = (-finalWs[0,0]*x - 0)/finalWs[0,1]
-#y2 = (0.43799*x)/0.12316
-y2 = (0.498442*x)/0.092387 #2 iterations
-ax.plot(x,y)
-ax.plot(x,y2,'g-.')
-ax.axis([-6,8,-4,5])
-ax.legend(('50 Iterations', '2 Iterations') )
+y = (-finalWs[0, 0]*x - 0)/finalWs[0, 1]
+# y2 = (0.43799*x)/0.12316
+y2 = (0.498442*x)/0.092387  # 2 iterations
+ax.plot(x, y)
+ax.plot(x, y2, 'g-.')
+ax.axis([-6, 8, -4, 5])
+ax.legend(('50 Iterations', '2 Iterations'))
 plt.show()
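Restated as a formula, the update that `batchPegasos()` performs on every outer iteration t is the standard mini-batch Pegasos step; this is only a transcription of the code above (λ is `lam`, k is the batch size, and the sum runs over the sampled points that violate the margin), not something added by the commit.

```latex
% Mini-batch Pegasos step implemented by batchPegasos():
%   A_t = { i in the sampled batch : y_i (w_t \cdot x_i) < 1 }
w_{t+1} = \left(1 - \frac{1}{t}\right) w_t
        + \frac{\eta_t}{k} \sum_{i \in A_t} y_i \, x_i,
\qquad \eta_t = \frac{1}{\lambda t}
```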

src/python/15.BigData_MapReduce/proximalSVM.py

Lines changed: 17 additions & 14 deletions
@@ -1,36 +1,39 @@
+#!/usr/bin/python
+# coding:utf8
 '''
-Created on Feb 25, 2011
-
-@author: Peter
+Created on 2011-02-25
+Update on 2017-06-20
+@author: Peter/ApacheCN-xy/片刻
+"Machine Learning in Action" update repository: https://github.com/apachecn/MachineLearning
 '''
 import numpy
 
 def map(key, value):
     # input key= class for one training example, e.g. "-1.0"
     classes = [float(item) for item in key.split(",")]  # e.g. [-1.0]
     D = numpy.diag(classes)
-
+
     # input value = feature vector for one training example, e.g. "3.0, 7.0, 2.0"
     featurematrix = [float(item) for item in value.split(",")]
     A = numpy.matrix(featurematrix)
-
+
     # create matrix E and vector e
-    e = numpy.matrix(numpy.ones(len(A)).reshape(len(A),1))
-    E = numpy.matrix(numpy.append(A,-e,axis=1))
-
+    e = numpy.matrix(numpy.ones(len(A)).reshape(len(A), 1))
+    E = numpy.matrix(numpy.append(A, -e, axis=1))
+
     # create a tuple with the values to be used by reducer
     # and encode it with base64 to avoid potential trouble with '\t' and '\n' used
     # as default separators in Hadoop Streaming
-    producedvalue = base64.b64encode(pickle.dumps( (E.T*E, E.T*D*e) )
-
+    producedvalue = base64.b64encode(pickle.dumps((E.T*E, E.T*D*e)))
+
     # note: a single constant key "producedkey" sends to only one reducer
     # somewhat "atypical" due to low degree of parallism on reducer side
     print "producedkey\t%s" % (producedvalue)
-
+
 def reduce(key, values, mu=0.1):
     sumETE = None
     sumETDe = None
-
+
     # key isn't used, so ignoring it with _ (underscore).
     for _, value in values:
         # unpickle values
@@ -39,13 +42,13 @@ def reduce(key, values, mu=0.1):
             # create the I/mu with correct dimensions
             sumETE = numpy.matrix(numpy.eye(ETE.shape[1])/mu)
         sumETE += ETE
-
+
         if sumETDe == None:
             # create sumETDe with correct dimensions
             sumETDe = ETDe
         else:
             sumETDe += ETDe
-
+
     # note: omega = result[:-1] and gamma = result[-1]
     # but printing entire vector as output
     result = sumETE.I*sumETDe
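Written out, the closed-form solve that `reduce()` performs is the proximal SVM solution below; this restates what the code already does (each mapper contributes the pair EᵀE and EᵀDe for its block of examples, the reducer sums those pieces, and ω and γ are read off the result vector as the trailing comment notes).

```latex
% Closed-form proximal SVM solve performed by reduce():
\begin{bmatrix} \omega \\ \gamma \end{bmatrix}
  = \left( \frac{I}{\mu} + \sum_j E_j^{\top} E_j \right)^{-1} \sum_j E_j^{\top} D_j \, e_j,
\qquad E_j = \begin{bmatrix} A_j & -e \end{bmatrix}
```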
