1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
class Network(object):
    """A feed-forward neural network trained with mini-batch SGD.

    Weights are initialized from a Gaussian scaled by 1/sqrt(fan-in);
    biases are standard normal, one column vector per non-input layer.
    """

    def __init__(self, sizes=None, cost=CrossEntropyCost, eta=3.0,
                 mini_batch_size=25, epochs=20, lmbda=0.02):
        """Build the network.

        sizes           -- layer sizes, e.g. [784, 30, 10]; input layer first.
        cost            -- cost class providing a delta(z, a, y) rule.
        eta             -- learning rate.
        mini_batch_size -- examples per gradient step.
        epochs          -- passes over the training data.
        lmbda           -- L2 regularization strength.
        """
        # `sizes=None` instead of the original `sizes=list()` avoids the
        # shared-mutable-default-argument pitfall; behavior is unchanged.
        sizes = [] if sizes is None else sizes
        self.num_layers = len(sizes)
        self.sizes = sizes
        self.cost = cost
        # One bias column vector per layer after the input layer.
        self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
        # Weight matrix for layer l has shape (sizes[l+1], sizes[l]),
        # scaled by 1/sqrt(fan-in) to avoid saturating early activations.
        self.weights = [np.random.randn(y, x) / np.sqrt(x)
                        for x, y in zip(sizes[:-1], sizes[1:])]
        self.eta = eta
        self.mini_batch_size = mini_batch_size
        self.epochs = epochs
        self.lmbda = lmbda
def fit(self, train_data, test_data=None): self.sgd(train_data, test_data)
def predict(self, x_predictions): predictions = [np.argmax(self.feed_forward(x.reshape(784, 1))) for x in x_predictions] return predictions
def score(self, test_data): """评估算法:通过前向传播算法获取测试数据集的网络的输出值, 将输出值与测试数据集的标签进行比对,获取准确率""" n_test = len(test_data) test_results = [(np.argmax(self.feed_forward(x.reshape(784, 1))), np.argmax(y)) for (x, y) in test_data]
return sum(int(x == y) for (x, y) in test_results) / float(n_test)
def feed_forward(self, x): """前向传播算法:对每⼀层计算神经元的激活值:σ(wx + b)""" for b, w in zip(self.biases, self.weights): x = sigmoid(np.dot(w, x) + b)
return x
def sgd(self, train_data, test_data): """实现小批量随机梯度下降算法:train_data是训练数据集列表,每个元素为(x, y);test_data用来评估 每次小批量迭代后的准确率;""" n = len(train_data) for j in xrange(self.epochs): import random random.shuffle(train_data) mini_batches = [train_data[k:(k + self.mini_batch_size)] for k in xrange(0, n, self.mini_batch_size)]
for mini_batch in mini_batches: self.update_mini_batch(mini_batch, n)
if test_data: print "Epoch %s/%s accuracy: %s" % (j, self.epochs, self.score(test_data))
def update_mini_batch(self, mini_batch, train_dataset_length): """对每个mini_batch应用一次梯度下降,更新网络权重和偏置""" nabla_b = [np.zeros(b.shape) for b in self.biases] nabla_w = [np.zeros(w.shape) for w in self.weights]
for x, y in mini_batch: delta_nabla_b, delta_nabla_w = self.back_prop(x.reshape(784, 1), y.reshape(10, 1)) nabla_b = [nb + dnb for nb, dnb in zip(nabla_b, delta_nabla_b)] nabla_w = [nw + dnw for nw, dnw in zip(nabla_w, delta_nabla_w)]
self.weights = [(1 - self.eta * self.lmbda / train_dataset_length) * w - (self.eta / len(mini_batch)) * nw for w, nw in zip(self.weights, nabla_w)] self.biases = [b - (self.eta / len(mini_batch)) * nb for b, nb in zip(self.biases, nabla_b)]
def back_prop(self, x, y): """反向传播算法:逐层获取代价函数关于权重和偏置的偏导数。""" nabla_b = [np.zeros(b.shape) for b in self.biases] nabla_w = [np.zeros(w.shape) for w in self.weights]
activation = x activations = [x] zs = [] for b, w in zip(self.biases, self.weights): z = np.dot(w, activation) + b zs.append(z) activation = sigmoid(z) activations.append(activation)
delta = self.cost.delta(zs[-1], activations[-1], y) nabla_b[-1] = delta nabla_w[-1] = np.dot(delta, activations[-2].transpose())
for l in xrange(2, self.num_layers): z = zs[-l] delta = np.dot(self.weights[-l + 1].transpose(), delta) * sigmoid_prime(z) nabla_b[-l] = delta nabla_w[-l] = np.dot(delta, activations[-l - 1].transpose()) return nabla_b, nabla_w
def load(self, filename='NeuralNetwork.pickle'): with open(filename, 'rb') as f: data = pickle.load(f) self.weights = data["weights"] self.biases = data["biaes"] self.sizes = data["sizes"] self.cost = data["cost"] self.eta = data["eta"] self.mini_batch_size = data["mini_batch_size"] self.epochs = data["epochs"] print "Load model successfully!"
def save(self, filename='NeuralNetwork.pickle'): data = { "weights": self.weights, "biaes": self.biases, "sizes": self.sizes, "cost": self.cost, "eta": self.eta, "mini_batch_size": self.mini_batch_size, "epochs": self.epochs, } import pickle with open(filename, 'wb') as f: pickle.dump(data, f) print "Save mode successfully!"
|