为武汉祈祷。
问题一ps server 不会主动停止,无论在什么情况下。这个问题从2016年提出,到现在,也没有一个简洁干净的解决方式,而这个问题会很严重,如果你使用的是租用资源,会白白花费很多钱钱。
我注意到,ps server不论是使用gpu还是cpu资源都不会主动停止,即使worker已经训练完停止了,甚至是遇到错误,ps server仍旧会运行。 这就会导致这个进程对节点资源的持续占有,即使没有使用GPU资源。 这种情况是按照全部使用计费的!!!我的客服工程师在初期错误程序出现这一情况后,没有告诉我ps不停止,并且他是知道会计费的,导致我的第一个成功的分布式程序空跑数小时,心疼我们租用的核时。但是所有的教程都没有警告过,所以我特别发了这篇博客。
根本原因是
if FLAGS.job_name == "ps":
server.join()
cluster=cluster)):
这回导致ps一直等待worker,一直等...
解决方法,参考:
https://stackoverflow.com/questions/39810356/shut-down-server-in-tensorflow
其实作者已经写的很详细了,我是参考I'll eat my hat这个作者的思路,下面贴上我完整的代码,作为一个应用实例,供参考:
def main(unused_argv):
tf.logging.set_verbosity(tf.logging.INFO)
tf.gfile.MakeDirs(FLAGS.train_logdir)
tf.logging.info('Training on %s set', FLAGS.train_split)
#distribute the training
ps_hosts=FLAGS.ps_hosts.split(",")
worker_hosts=FLAGS.worker_hosts.split(",")
cluster=tf.train.ClusterSpec({"ps":ps_hosts,"worker":worker_hosts})
server=tf.train.Server(cluster,job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
with tf.device('/job:ps/task:%d' % FLAGS.task_index):
queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % FLAGS.task_index)
# wait for the queue to be filled
with tf.Session(server.target) as sess:
for i in range(cluster.num_tasks('worker')):
sess.run(queue.dequeue())
print('ps:%d received "done" from worker:%d' % (FLAGS.task_index, i))
print('ps:%d quitting' % FLAGS.task_index)
elif FLAGS.job_name =="worker":
graph = tf.Graph()
with graph.as_default():
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % (FLAGS.task_index),
cluster=cluster)):#, ps_tasks=FLAGS.num_ps_tasks
done_ops = []
# create a shared queue on the worker which is visible on /job:ps/task:%d
for i in range(cluster.num_tasks('ps')):
with tf.device('/job:ps/task:%d' % i):
done_queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue' + str(i))
done_ops.append(done_queue.enqueue(FLAGS.task_index))
assert FLAGS.train_batch_size % FLAGS.num_clones == 0, (
'Training batch size not divisble by number of clones (GPUs).')
clone_batch_size = FLAGS.train_batch_size // FLAGS.num_clones
dataset = data_generator.Dataset(
dataset_name=FLAGS.dataset,
split_name=FLAGS.train_split,
dataset_dir=FLAGS.dataset_dir,
batch_size=clone_batch_size,
crop_size=[int(sz) for sz in FLAGS.train_crop_size],
min_resize_value=FLAGS.min_resize_value,
max_resize_value=FLAGS.max_resize_value,
resize_factor=FLAGS.resize_factor,
min_scale_factor=FLAGS.min_scale_factor,
max_scale_factor=FLAGS.max_scale_factor,
scale_factor_step_size=FLAGS.scale_factor_step_size,
model_variant=FLAGS.model_variant,
num_readers=2,
is_training=True,
should_shuffle=True,
should_repeat=True)
train_tensor, summary_op = _train_deeplab_model(
dataset.get_one_shot_iterator(), dataset.num_of_classes,
dataset.ignore_label)
# Soft placement allows placing on CPU ops without GPU implementation.
session_config = tf.ConfigProto(
allow_soft_placement=True, log_device_placement=False)
#liutian add on cloud
session_config.gpu_options.allow_growth = True
last_layers = model.get_extra_layer_scopes(
FLAGS.last_layers_contain_logits_only)
init_fn = None
#FLAGS.tf_initial_checkpoint = '/home/DATA/liutian/tmp/tfdeeplab/deeplab/datasets/pascal_voc_seg/init_models/deeplabv3_pascal_train_aug/model.ckpt'
if FLAGS.tf_initial_checkpoint:
init_fn = train_utils.get_model_init_fn(
FLAGS.train_logdir,
FLAGS.tf_initial_checkpoint,
FLAGS.initialize_last_layer,
last_layers,
ignore_missing_vars=True)
scaffold = tf.train.Scaffold(
init_fn=init_fn,
summary_op=summary_op,
)
stop_hook = tf.train.StopAtStepHook(
last_step=FLAGS.training_number_of_steps
)
hooks = [stop_hook,tf.train.FinalOpsHook([done_ops])]
profile_dir = FLAGS.profile_logdir
if profile_dir is not None:
tf.gfile.MakeDirs(profile_dir)
with tf.contrib.tfprof.ProfileContext(
enabled=profile_dir is not None, profile_dir=profile_dir):
with tf.train.MonitoredTrainingSession(
master=server.target,
is_chief=(FLAGS.task_index == 0),
config=session_config,
scaffold=scaffold,
checkpoint_dir=FLAGS.train_logdir,
summary_dir=FLAGS.train_logdir,
log_step_count_steps=FLAGS.log_steps,
save_summaries_steps=FLAGS.save_summaries_secs,
save_checkpoint_secs=FLAGS.save_interval_secs,
hooks=hooks) as sess:
while not sess.should_stop():
sess.run([train_tensor])
这样的话还有一个问题就是,如果代码有一定问题,那么不会主动退出。这个只能再想想办法了。
同样的问题在知乎大家也可以试试,但我没有采用。
https://www.zhihu.com/question/51181456?from=profile_question_card
问题二
这里要说一个比较偶然的错误,会导致worker都不停止。ps会输出unknownError:Could not start gRPC server.
这是由于端口被占用,也就是类似于:
节点名:2223 (比如192.18.49.1:2223,或者1:2223)
其中2223就是端口。如果2223被什么占用了,那么worker跑完就不会停止。
节点不释放,就会空耗资源,就会费钱。
解决方法是开始跑程序就要注意ps的输出,如果提示了unknownError:Could not start gRPC server.就要换个节点,比如
节点名:2333333
作者:TinaO-O