分布式运算白花钱警告:使用tensorflow分布式必须注意ps server空耗资源

Abina ·
更新时间:2024-09-20
· 601 次阅读

为武汉祈祷。

问题一

ps server 不会主动停止,无论在什么情况下。这个问题从2016年提出,到现在,也没有一个简洁干净的解决方式,而这个问题会很严重,如果你使用的是租用资源,会白白花费很多钱钱。

我注意到,ps server不论是使用gpu还是cpu资源都不会主动停止,即使worker已经训练完停止了,甚至是遇到错误,ps server仍旧会运行。 这就会导致这个进程对节点资源的持续占有,即使没有使用GPU资源。 这种情况是按照全部使用计费的!!!我的客服工程师在初期错误程序出现这一情况后,没有告诉我ps不停止,并且他是知道会计费的,导致我的第一个成功的分布式程序空跑数小时,心疼我们租用的核时。但是所有的教程都没有警告过,所以我特别发了这篇博客。

根本原因是

if FLAGS.job_name == "ps": server.join() cluster=cluster)):

这回导致ps一直等待worker,一直等...

解决方法,参考:

https://stackoverflow.com/questions/39810356/shut-down-server-in-tensorflow

其实作者已经写的很详细了,我是参考I'll eat my hat这个作者的思路,下面贴上我完整的代码,作为一个应用实例,供参考:

def main(unused_argv): tf.logging.set_verbosity(tf.logging.INFO) tf.gfile.MakeDirs(FLAGS.train_logdir) tf.logging.info('Training on %s set', FLAGS.train_split) #distribute the training ps_hosts=FLAGS.ps_hosts.split(",") worker_hosts=FLAGS.worker_hosts.split(",") cluster=tf.train.ClusterSpec({"ps":ps_hosts,"worker":worker_hosts}) server=tf.train.Server(cluster,job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == "ps": with tf.device('/job:ps/task:%d' % FLAGS.task_index): queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % FLAGS.task_index) # wait for the queue to be filled with tf.Session(server.target) as sess: for i in range(cluster.num_tasks('worker')): sess.run(queue.dequeue()) print('ps:%d received "done" from worker:%d' % (FLAGS.task_index, i)) print('ps:%d quitting' % FLAGS.task_index) elif FLAGS.job_name =="worker": graph = tf.Graph() with graph.as_default(): with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % (FLAGS.task_index), cluster=cluster)):#, ps_tasks=FLAGS.num_ps_tasks done_ops = [] # create a shared queue on the worker which is visible on /job:ps/task:%d for i in range(cluster.num_tasks('ps')): with tf.device('/job:ps/task:%d' % i): done_queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue' + str(i)) done_ops.append(done_queue.enqueue(FLAGS.task_index)) assert FLAGS.train_batch_size % FLAGS.num_clones == 0, ( 'Training batch size not divisble by number of clones (GPUs).') clone_batch_size = FLAGS.train_batch_size // FLAGS.num_clones dataset = data_generator.Dataset( dataset_name=FLAGS.dataset, split_name=FLAGS.train_split, dataset_dir=FLAGS.dataset_dir, batch_size=clone_batch_size, crop_size=[int(sz) for sz in FLAGS.train_crop_size], min_resize_value=FLAGS.min_resize_value, max_resize_value=FLAGS.max_resize_value, resize_factor=FLAGS.resize_factor, min_scale_factor=FLAGS.min_scale_factor, max_scale_factor=FLAGS.max_scale_factor, scale_factor_step_size=FLAGS.scale_factor_step_size, model_variant=FLAGS.model_variant, num_readers=2, is_training=True, should_shuffle=True, should_repeat=True) train_tensor, summary_op = _train_deeplab_model( dataset.get_one_shot_iterator(), dataset.num_of_classes, dataset.ignore_label) # Soft placement allows placing on CPU ops without GPU implementation. session_config = tf.ConfigProto( allow_soft_placement=True, log_device_placement=False) #liutian add on cloud session_config.gpu_options.allow_growth = True last_layers = model.get_extra_layer_scopes( FLAGS.last_layers_contain_logits_only) init_fn = None #FLAGS.tf_initial_checkpoint = '/home/DATA/liutian/tmp/tfdeeplab/deeplab/datasets/pascal_voc_seg/init_models/deeplabv3_pascal_train_aug/model.ckpt' if FLAGS.tf_initial_checkpoint: init_fn = train_utils.get_model_init_fn( FLAGS.train_logdir, FLAGS.tf_initial_checkpoint, FLAGS.initialize_last_layer, last_layers, ignore_missing_vars=True) scaffold = tf.train.Scaffold( init_fn=init_fn, summary_op=summary_op, ) stop_hook = tf.train.StopAtStepHook( last_step=FLAGS.training_number_of_steps ) hooks = [stop_hook,tf.train.FinalOpsHook([done_ops])] profile_dir = FLAGS.profile_logdir if profile_dir is not None: tf.gfile.MakeDirs(profile_dir) with tf.contrib.tfprof.ProfileContext( enabled=profile_dir is not None, profile_dir=profile_dir): with tf.train.MonitoredTrainingSession( master=server.target, is_chief=(FLAGS.task_index == 0), config=session_config, scaffold=scaffold, checkpoint_dir=FLAGS.train_logdir, summary_dir=FLAGS.train_logdir, log_step_count_steps=FLAGS.log_steps, save_summaries_steps=FLAGS.save_summaries_secs, save_checkpoint_secs=FLAGS.save_interval_secs, hooks=hooks) as sess: while not sess.should_stop(): sess.run([train_tensor])

这样的话还有一个问题就是,如果代码有一定问题,那么不会主动退出。这个只能再想想办法了。

同样的问题在知乎大家也可以试试,但我没有采用。

https://www.zhihu.com/question/51181456?from=profile_question_card

问题二

这里要说一个比较偶然的错误,会导致worker都不停止。ps会输出unknownError:Could not start gRPC server.

这是由于端口被占用,也就是类似于:

节点名:2223 (比如192.18.49.1:2223,或者1:2223)

其中2223就是端口。如果2223被什么占用了,那么worker跑完就不会停止。

节点不释放,就会空耗资源,就会费钱。

解决方法是开始跑程序就要注意ps的输出,如果提示了unknownError:Could not start gRPC server.就要换个节点,比如

节点名:2333333


作者:TinaO-O



分布式 分布 tensorflow ps

需要 登录 后方可回复, 如果你还没有账号请 注册新账号