Shut down the server in TensorFlow

When we want to use distributed TensorFlow, we will create a parameter server using

tf.train.Server.join()

However, I cannot find a way to shut down the server, except by killing the process. The TensorFlow documentation for join() says:

Blocks until the server has shut down.
This method currently blocks forever.

This bothers me quite a bit because I would like to create a lot of servers for computing and shut them down when it's all over.

Are there any possible solutions for this?

Thanks.

+4
source share
3 answers

You can make the server block on session.run(dequeue_op) instead of server.join(), and have another process enqueue something onto that queue when you want the server to terminate.

So for k servers you create k queues with distinct shared_name attributes and block on a dequeue for each one. When you want to bring a server down, you run the corresponding enqueue op. The blocked session.run returns, the Python process continues past it, and the server process exits.

Here is a self-contained example: https://gist.github.com/yaroslavvb/82a5b5302449530ca5ff59df520c369e

(For a multi worker/multi shard example, see https://gist.github.com/yaroslavvb/ea1b1bae0a75c4aae593df7eca72d9ca)

import subprocess
import tensorflow as tf
import time
import sys

# Command-line flags: the ports for the two worker servers, plus an internal
# "task" flag that is set only when this script re-launches itself as a
# server subprocess (see the __main__ block below).
flags = tf.flags
flags.DEFINE_string("port1", "12222", "port of worker1")
flags.DEFINE_string("port2", "12223", "port of worker2")
flags.DEFINE_string("task", "", "internal use")
FLAGS = flags.FLAGS

# setup local cluster from flags: a single "worker" job with two local tasks
host = "127.0.0.1:"
cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()

if __name__=='__main__':
  if not FLAGS.task:  # start servers and run client

      # launch distributed service: re-run this same script twice, once per
      # worker task, each entering the "else" branch below.
      # NOTE(review): shell=True with a format-built command is acceptable here
      # only because sys.argv[0] is our own script path, not untrusted input.
      def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)
      runcmd("python %s --task=0"%(sys.argv[0]))
      runcmd("python %s --task=1"%(sys.argv[0]))
      time.sleep(1)  # crude wait for both servers to come up — race-prone

      # bring down distributed service
      # Each server blocks on a dequeue of a queue with a matching shared_name
      # ("queue0"/"queue1"); enqueueing one element from this client unblocks
      # it and lets the server process exit.
      sess = tf.Session("grpc://"+host+FLAGS.port1)
      queue0 = tf.FIFOQueue(1, tf.int32, shared_name="queue0")
      queue1 = tf.FIFOQueue(1, tf.int32, shared_name="queue1")
      with tf.device("/job:worker/task:0"):
          add_op0 = tf.add(tf.ones(()), tf.ones(()))
      with tf.device("/job:worker/task:1"):
          add_op1 = tf.add(tf.ones(()), tf.ones(()))

      # Demonstrate that both servers are alive by running an op on each.
      print("Running computation on server 0")
      print(sess.run(add_op0))
      print("Running computation on server 1")
      print(sess.run(add_op1))

      # Unblock each server's pending dequeue, causing it to terminate.
      print("Bringing down server 0")
      sess.run(queue0.enqueue(1))
      print("Bringing down server 1")
      sess.run(queue1.enqueue(1))

  else: # Launch TensorFlow server
    server = tf.train.Server(clusterspec, config=None,
                             job_name="worker",
                             task_index=int(FLAGS.task))
    print("Starting server "+FLAGS.task)
    sess = tf.Session(server.target)
    # Block on a dequeue instead of server.join(): this returns as soon as the
    # client enqueues, so the process can exit instead of blocking forever.
    queue = tf.FIFOQueue(1, tf.int32, shared_name="queue"+FLAGS.task)
    sess.run(queue.dequeue())
    print("Terminating server"+FLAGS.task)
+9

There is currently no clean way to shut down the gRPC server that TensorFlow uses. It is possible to make it crash (for example by sending it a malformed request), but that is not a safe way to do it!

Clean shutdown of tf.train.Server is not implemented yet. If you need this feature, you can file or follow a feature request on GitHub.

+3

This page was one of the top Google results for the problem, so here is a complete, runnable example showing how workers can signal the parameter servers to shut down once they are done.

import tensorflow as tf
import threading

def main(job_name, task):
    """Run one member of a local TF cluster and let it shut down cleanly.

    Parameter servers ('ps') block on a shared "done" queue instead of
    calling server.join(), so each ps exits once every worker has checked
    in.  Workers create the same shared queues (placed on the ps devices)
    and enqueue one "done" token per ps when they finish.

    Args:
        job_name: 'ps' or 'worker'.
        task: integer task index within the given job.
    """
    cluster = tf.train.ClusterSpec({
        'ps': ['localhost:22222', 'localhost:22223'],
        # Fixed: the original had stray spaces ('localhost: 22224'), which
        # produces malformed host:port addresses for the worker tasks and is
        # inconsistent with the ps entries above.
        'worker': ['localhost:22224', 'localhost:22225', 'localhost:22226']
    })

    server = tf.train.Server(cluster, job_name=job_name, task_index=task)

    if job_name == 'ps':
        # create a shared queue on the parameter server which is visible on /job:ps/task:%d
        with tf.device('/job:ps/task:%d' % task):
            queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % task)

        # wait for the queue to be filled: one blocking dequeue per worker,
        # then fall through and let the process exit (instead of join()).
        with tf.Session(server.target) as sess:
            for i in range(cluster.num_tasks('worker')):
                sess.run(queue.dequeue())
                print('ps:%d received "done" from worker:%d' % (task, i))
            print('ps:%d quitting' % task)

    elif job_name == 'worker':
        queues = []
        # create one shared "done" queue per ps task, placed on that ps so the
        # shared_name resolves to the same queue the ps is dequeuing from
        for i in range(cluster.num_tasks('ps')):
            with tf.device('/job:ps/task:%d' % i):
                queues.append(tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue%d' % i))

        # fill the queue: tell every ps that this worker is finished
        with tf.Session(server.target) as sess:
            for i in range(cluster.num_tasks('ps')):
                _, size = sess.run([queues[i].enqueue(task), queues[i].size()])
                print('Worker:%d sending "done" to ps:%d [elements=%d]' % (task, i, size))

if __name__ == '__main__':
    # Launch every cluster member (2 ps tasks + 3 worker tasks) as a thread
    # inside this one process, then wait for all of them to finish.
    roles = [('ps', 0), ('ps', 1), ('worker', 0), ('worker', 1), ('worker', 2)]
    threads = [threading.Thread(target=main, args=role) for role in roles]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

"" " " , :

    # create a worker that does nothing
    # (snippet: this replaces the 'worker' branch of main() above; the
    # matching 'if' lives outside this fragment)
    elif job_name == 'worker':
        with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d' % task, cluster=cluster)):
            global_step = tf.train.get_or_create_global_step()
            no_op = tf.no_op()

        done_ops = []
        # create a shared queue on the worker which is visible on /job:ps/task:%d
        for i in range(cluster.num_tasks('ps')):
            with tf.device('/job:ps/task:%d' % i):
                done_queue = tf.FIFOQueue(cluster.num_tasks('worker'), tf.int32, shared_name='done_queue' + str(i))
                done_ops.append(done_queue.enqueue(task))

        # StopAtStepHook ends the session after one step; FinalOpsHook runs
        # the "done" enqueues when the session closes, releasing the ps tasks.
        # NOTE(review): done_ops is already a list, so [done_ops] nests it one
        # level deeper than necessary; passing done_ops directly would be
        # clearer — confirm FinalOpsHook accepts the nested form.
        hooks=[tf.train.StopAtStepHook(last_step=1),
               tf.train.FinalOpsHook([done_ops])]

        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=(task == 0),
                                               hooks=hooks) as sess:
            sess.run([no_op])

Note that the MonitoredTrainingSession version was noticeably slower at connecting all of the workers together.

0
source

Source: https://habr.com/ru/post/1656467/


All Articles