google tutorials cifar10/cifar10_multi_gpu_train.py
1. Task [通过该源码的阅读将掌握什么?]
- [O] TF多GPU运行计算图的方式.To train CIFAR-10 using multiple GPUs with synchronous updates.
- [O] TF底层方法、高级特性实现神经网络的方式深入理解掌握
Overview
CIFAR-10 classification is a common benchmark problem in machine learning. The problem is to classify RGB 32x32 pixel images across 10 categories:
airplane, automobile, bird, cat, deer, dog, frog, horse, ship, and truck.
A binary to train CIFAR-10 using multiple GPUs with synchronous updates.
Accuracy: cifar10_multi_gpu_train.py achieves ~86% accuracy after 100K steps (256 epochs of data) as judged by cifar10_eval.py.
Speed: With batch_size 128.
System | Step Time (sec/batch) | Accuracy
1 Tesla K20m | 0.35-0.60 | ~86% at 60K steps (5 hours) 1 Tesla K40m | 0.25-0.35 | ~86% at 100K steps (4 hours) 2 Tesla K20m | 0.13-0.20 | ~84% at 30K steps (2.5 hours) 3 Tesla K20m | 0.13-0.18 | ~84% at 30K steps 4 Tesla K20m | ~0.10 | ~84% at 30K steps
Usage: Please see the tutorial and website for how to download the CIFAR-10 data set, compile the program and train the model.
http://tensorflow.org/tutorials/deep_cnn/
src code
mark | File | Purpose |
---|---|---|
[x] | cifar10_input.py | Reads the native CIFAR-10 binary file format. |
[x] | cifar10.py | Builds the CIFAR-10 model. |
[x] | cifar10_train.py | Trains a CIFAR-10 model on a CPU or GPU. |
[O] | cifar10_multi_gpu_train.py | Trains a CIFAR-10 model on multiple GPUs. |
[x] | cifar10_eval.py | Evaluates the predictive performance of a CIFAR-10 model. |
下面来分析cifar10_train.py
源码:cifar10_train.py link
src_0 - 命令行参数解析设定:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from datetime import datetime
import os.path
import re
import time
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
import cifar10
parser = cifar10.parser
parser.add_argument('--train_dir', type=str, default='/tmp/cifar10_train',
help='Directory where to write event logs and checkpoint.')
parser.add_argument('--max_steps', type=int, default=1000000,
help='Number of batches to run.')
parser.add_argument('--num_gpus', type=int, default=1,
help='How many GPUs to use.')
parser.add_argument('--log_device_placement', type=bool, default=False,
help='Whether to log device placement.')
只需关注多设定的一个参数--num_gpus
,顺藤摸瓜
src_1 - tower_loss:
def tower_loss(scope, images, labels):
"""Calculate the total loss on a single tower running the CIFAR model.
Args:
scope: unique prefix string identifying the CIFAR tower, e.g. 'tower_0'
images: Images. 4D tensor of shape [batch_size, height, width, 3].
labels: Labels. 1D tensor of shape [batch_size].
Returns:
Tensor of shape [] containing the total loss for a batch of data
"""
# Build inference Graph.
logits = cifar10.inference(images)
# Build the portion of the Graph calculating the losses. Note that we will
# assemble the total_loss using a custom function below.
_ = cifar10.loss(logits, labels)
# Assemble all of the losses for the current tower only.
losses = tf.get_collection('losses', scope)
# Calculate the total loss for the current tower.
total_loss = tf.add_n(losses, name='total_loss')
# Attach a scalar summary to all individual losses and the total loss; do the
# same for the averaged version of the losses.
for l in losses + [total_loss]:
# Remove 'tower_[0-9]/' from the name in case this is a multi-GPU training
# session. This helps the clarity of presentation on tensorboard.
loss_name = re.sub('%s_[0-9]*/' % cifar10.TOWER_NAME, '', l.op.name)
tf.summary.scalar(loss_name, l)
return total_loss
scope命名空间区分了不同的tower,cifar10.loss
已在前一篇博文中讲述过了,它会将一个批次计算好的损失值存放在key值为 losses
的集合中,将 losses
集合中所有元素通过 tf.add_n
加起来就是当前这个tower的总损失值(注意:这个losses集合中可不单单只有计算交叉熵所得loss,还有其他地方如权重衰减值也加入了这个集合中,所有才要使用tf.add_n求和,具体请回看前篇cifar10.py的解析),最后的for循环是将 losses
的每一项 和 total_loss
这一项 输出到记录文件中。
src_2 - average_gradients:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def average_gradients(tower_grads):
"""Calculate the average gradient for each shared variable across all towers.
Note that this function provides a synchronization point across all towers.
Args:
tower_grads: List of lists of (gradient, variable) tuples. The outer list
is over individual gradients. The inner list is over the gradient
calculation for each tower.
Returns:
List of pairs of (gradient, variable) where the gradient has been averaged
across all towers.
"""
average_grads = []
for grad_and_vars in zip(*tower_grads):
# Note that each grad_and_vars looks like the following:
# ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
grads = []
for g, _ in grad_and_vars:
# Add 0 dimension to the gradients to represent the tower.
expanded_g = tf.expand_dims(g, 0)
# Append on a 'tower' dimension which we will average over below.
grads.append(expanded_g)
# Average over the 'tower' dimension.
grad = tf.concat(axis=0, values=grads)
grad = tf.reduce_mean(grad, 0)
# Keep in mind that the Variables are redundant because they are shared
# across towers. So .. we will just return the first tower's pointer to
# the Variable.
v = grad_and_vars[0][1]
grad_and_var = (grad, v)
average_grads.append(grad_and_var)
return average_grads
tf.expand_dims
Inserts a dimension of 1 into a tensor’s shape. Given a tensor input, this operation inserts a dimension of 1 at the dimension index axis of input’s shape. The dimension index axis starts at zero; if you specify a negative number for axis it is counted backward from the end.
This operation is useful if you want to add a batch dimension to a single element. For example, if you have a single image of shape [height, width, channels], you can make it a batch of 1 image with expand_dims(image, 0), which will make the shape [1, height, width, channels].
Other examples:
# 't' is a tensor of shape [2] shape(expand_dims(t, 0)) ==> [1, 2] shape(expand_dims(t, 1)) ==> [2, 1] shape(expand_dims(t, -1)) ==> [2, 1] # 't2' is a tensor of shape [2, 3, 5] shape(expand_dims(t2, 0)) ==> [1, 2, 3, 5] shape(expand_dims(t2, 2)) ==> [2, 3, 1, 5] shape(expand_dims(t2, 3)) ==> [2, 3, 5, 1]
这段函数,读取了每一个tower结构训练完成一个批次时返回的梯度、变量,每一个grad_and_vars其shape都形如[[grad0_gpu0,var0_gpu0]...[grad0_gpuN,var0_gpuN]]
(视为一个 Nx2 的矩阵),可以看到,只有第0
列具有可加性,所以,通过解压赋值,以 g
获取第 0
列的值(‘g’ is a tensor of shape [grad0_gpuN]),在使用tf.expand_dims(g,0)
扩展维度后变为[1,grad0_gpuN]
,扩展维度的目的只是为了能够表明这是一个tower输出的,然后通过tf.concat
联结共享变量在各个tower上的梯度值,并求平均值,grad is a tensor of shape [1,grad_avg]
——————-咳咳,敲黑板划重点!!—————–
【注意】tensorflow 官方例程中实现的多GPU并行计算是采取的数据并行
模式。这意味着每个tower都是复用了同一个model,变量是在多个tower之间共享的,也就是说,变量是有冗余的!如同我们的命名中表现出来的,var0_gpu0、var0_gpu1、var0_gpuN,变量var0在N个GPU设备上共享。本例中,v
取了第一个tower(第一个GPU设备)的variable作为新的变量名,将前面求出的梯度平均值 grad
与 v
先构成元组grad_and_var
,谨记,这个元组的形如 ([1, grad_avg], var0_gpu0)
,这样就完成了一个批次的平均梯度计算。
src_3 - train():
def train():
"""Train CIFAR-10 for a number of steps."""
with tf.Graph().as_default(), tf.device('/cpu:0'):
# Create a variable to count the number of train() calls. This equals the
# number of batches processed * FLAGS.num_gpus.
global_step = tf.get_variable(
'global_step', [],
initializer=tf.constant_initializer(0), trainable=False)
# Calculate the learning rate schedule.
num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
FLAGS.batch_size)
decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)
# Decay the learning rate exponentially based on the number of steps.
lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
global_step,
decay_steps,
cifar10.LEARNING_RATE_DECAY_FACTOR,
staircase=True)
# Create an optimizer that performs gradient descent.
opt = tf.train.GradientDescentOptimizer(lr)
# Get images and labels for CIFAR-10.
images, labels = cifar10.distorted_inputs()
batch_queue = tf.contrib.slim.prefetch_queue.prefetch_queue(
[images, labels], capacity=2 * FLAGS.num_gpus)
# Calculate the gradients for each model tower.
tower_grads = []
with tf.variable_scope(tf.get_variable_scope()):
for i in xrange(FLAGS.num_gpus):
with tf.device('/gpu:%d' % i):
with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
# Dequeues one batch for the GPU
image_batch, label_batch = batch_queue.dequeue()
# Calculate the loss for one tower of the CIFAR model. This function
# constructs the entire CIFAR model but shares the variables across
# all towers.
loss = tower_loss(scope, image_batch, label_batch)
# Reuse variables for the next tower.
tf.get_variable_scope().reuse_variables()
# Retain the summaries from the final tower.
summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope)
# Calculate the gradients for the batch of data on this CIFAR tower.
grads = opt.compute_gradients(loss)
# Keep track of the gradients across all towers.
tower_grads.append(grads)
# We must calculate the mean of each gradient. Note that this is the
# synchronization point across all towers.
grads = average_gradients(tower_grads)
# Add a summary to track the learning rate.
summaries.append(tf.summary.scalar('learning_rate', lr))
# Add histograms for gradients.
for grad, var in grads:
if grad is not None:
summaries.append(tf.summary.histogram(var.op.name + '/gradients', grad))
# Apply the gradients to adjust the shared variables.
apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
# Add histograms for trainable variables.
for var in tf.trainable_variables():
summaries.append(tf.summary.histogram(var.op.name, var))
# Track the moving averages of all trainable variables.
variable_averages = tf.train.ExponentialMovingAverage(
cifar10.MOVING_AVERAGE_DECAY, global_step)
variables_averages_op = variable_averages.apply(tf.trainable_variables())
# Group all updates to into a single train op.
train_op = tf.group(apply_gradient_op, variables_averages_op)
# Create a saver.
saver = tf.train.Saver(tf.global_variables())
# Build the summary operation from the last tower summaries.
summary_op = tf.summary.merge(summaries)
# Build an initialization operation to run below.
init = tf.global_variables_initializer()
# Start running operations on the Graph. allow_soft_placement must be set to
# True to build towers on GPU, as some of the ops do not have GPU
# implementations.
sess = tf.Session(config=tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=FLAGS.log_device_placement))
sess.run(init)
# Start the queue runners.
tf.train.start_queue_runners(sess=sess)
summary_writer = tf.summary.FileWriter(FLAGS.train_dir, sess.graph)
for step in xrange(FLAGS.max_steps):
start_time = time.time()
_, loss_value = sess.run([train_op, loss])
duration = time.time() - start_time
assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
if step % 10 == 0:
num_examples_per_step = FLAGS.batch_size * FLAGS.num_gpus
examples_per_sec = num_examples_per_step / duration
sec_per_batch = duration / FLAGS.num_gpus
format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
'sec/batch)')
print (format_str % (datetime.now(), step, loss_value,
examples_per_sec, sec_per_batch))
if step % 100 == 0:
summary_str = sess.run(summary_op)
summary_writer.add_summary(summary_str, step)
# Save the model checkpoint periodically.
if step % 1000 == 0 or (step + 1) == FLAGS.max_steps:
checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
saver.save(sess, checkpoint_path, global_step=step)
我们先只看从 Calculate the gradients for each model tower.部分开始的代码,先观察数据如何分配到实际设备上的,其中语句tf.get_variable_scope().reuse_variables()
注释说是为下一个tower重用变量做准备,然而我注释掉这行语句并没有观察到VariableScope object在各个tower上不一致的现象(暂标记未理解,哪位大神知道的话可以告诉我啊,点击右上方About查看交流方式联系我奥,嘻嘻)。
grads=tf.train.GradientDescentOptimizer(lr).compute_gradients(loss)
,这个,好像做了什么不言而喻啊。but,too young too naive,我们重点关注 compute_gradients
的返回值,
A list of (gradient, variable) pairs. Variable is always present, but gradient can be None.
看见没,看见没,它返回来元组对 (gradient, variable)
构成的 list !也许在看到这之前,你还对前一个定义的函数 def average_gradients(tower_grads)
中tower_grads的元素构成感到狐疑,现在想必是豁然开朗了。
src_4 - main() and run():
def main(argv=None): # pylint: disable=unused-argument
cifar10.maybe_download_and_extract()
if tf.gfile.Exists(FLAGS.train_dir):
tf.gfile.DeleteRecursively(FLAGS.train_dir)
tf.gfile.MakeDirs(FLAGS.train_dir)
train()
if __name__ == '__main__':
FLAGS = parser.parse_args()
tf.app.run()
此处参详上一篇对cifar10_train.py的介绍