Tuesday, 1 June 2021

Tensorflow-1 distributed with low-level code and Ray

I'm trying to distribute the training of a Deep Reinforcement Learning system that I have built using Ray and TensorFlow 1. I use Ray because I have a lot of code that parallelizes logic not directly related to training. However, I would like to parallelize the training itself (namely the gradient reduction over different workers on different GPUs) with the tf.distribute utilities. The main reason is that they can use the NCCL communication library, which I suppose will speed up training compared with other approaches.
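The gradient reduction described above is an all-reduce: each worker contributes its local gradient, and every worker ends up with the same reduced value. The following is a minimal sketch of only those semantics. It is plain NumPy in one process, with no NCCL and no real communication, and the `all_reduce` helper is hypothetical.

```python
import numpy as np

def all_reduce(local_grads, op="sum"):
    """Hypothetical helper: simulate an all-reduce over per-worker gradients.

    Every worker receives its own copy of the same reduced gradient.
    """
    stacked = np.stack(local_grads)
    if op == "sum":
        reduced = stacked.sum(axis=0)
    elif op == "mean":
        reduced = stacked.mean(axis=0)
    else:
        raise ValueError(f"unsupported op: {op}")
    return [reduced.copy() for _ in local_grads]

# One scalar gradient per worker, e.g. from four GPUs.
worker_grads = [np.array([1.0]), np.array([2.0]), np.array([3.0]), np.array([4.0])]
results = all_reduce(worker_grads, op="sum")
```

A real backend such as NCCL produces the same result without gathering everything in one place. The sketch only shows what each worker should hold afterwards.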

The problem is that I don't want to refactor my TensorFlow code. It is written at a low level in old TensorFlow 1, with custom training loops, and does not use any higher-level API such as Keras. However, I can't figure out how to use tf.distribute, namely MirroredStrategy, to distribute training with TensorFlow 1 code.

I found a guide on tf.distribute in TensorFlow 1, but even its custom-loop example builds the model and the optimizer with the Keras API. I have tried to follow that guide as closely as possible to build a simple example that uses only the libraries and APIs of my main project, but I cannot make it work.

The example code is this:

import numpy as np
import tensorflow.compat.v1 as tf
import ray
tf.disable_v2_behavior()

@ray.remote(num_cpus=1, num_gpus=0)
class Trainer:
    def __init__(self, local_data):
        tf.disable_v2_behavior()
        self.current_w = 1.0
        self.local_data = local_data
        self.strategy = tf.distribute.MirroredStrategy()

        with self.strategy.scope():
            self.w = tf.Variable(((1.0)), dtype=tf.float32)
            self.x = tf.placeholder(shape=(None, 1), dtype=tf.float32)
            self.y = self.w * self.x
            self.grad = tf.gradients(self.y, [self.w])

            def train_step_opt():
                def grad_fn():
                    grad = tf.gradients(self.y, [self.w])
                    return grad
                per_replica_grad = self.strategy.experimental_run_v2(grad_fn)
                red_grad = self.strategy.reduce(
                            tf.distribute.ReduceOp.SUM, per_replica_grad, axis=None)
                minimize = self.w.assign_sub(red_grad[0])
                return minimize

            self.minimize = self.strategy.experimental_run_v2(train_step_opt)

    def train_step(self):
        with self.strategy.scope():
            with tf.Session() as sess:
                sess.run(self.minimize, feed_dict={self.x: self.local_data})
                self.current_w = sess.run(self.w)
        return self.current_w


ray.init()

data = np.arange(4) + 1
data = data.reshape((-1, 1))
data_w = [data[None, i] for i in range(4)]

trainers = [Trainer.remote(d) for d in data_w]

W = ray.get([t.train_step.remote() for t in trainers])[0]

print(W)

It is supposed to compute the derivative of a linear function in different processes, reduce all the derivatives to a single value, and apply that value to the single parameter "w".
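Below is a plain NumPy reference for the value the program should print. It assumes the reduction is a SUM, matching `ReduceOp.SUM`, and that the update is `w -= reduced_grad` with no learning rate, matching the `assign_sub` in the code above.

```python
import numpy as np

# Same sharding as the example: one (1, 1) slice per worker.
data = (np.arange(4) + 1).reshape((-1, 1))   # [[1], [2], [3], [4]]
data_w = [data[None, i] for i in range(4)]

w = 1.0
# y = w * x, so dy/dw summed over a shard is the sum of that shard.
local_grads = [float(shard.sum()) for shard in data_w]
reduced = sum(local_grads)   # SUM over the four workers
w_new = w - reduced          # plain gradient step, learning rate 1
```

This gives local gradients 1, 2, 3 and 4, a summed gradient of 10, and w = 1 - 10 = -9. With `ReduceOp.MEAN` instead, the reduced gradient would be 2.5 and w would become -1.5.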

When I run it I get the following error:

Traceback (most recent call last):
  File "dtfray.py", line 49, in <module>
    r = ray.get([t.train_step.remote() for t in trainers])
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/ray/worker.py", line 1456, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::Trainer.train_step() (pid=25401, ip=10.128.0.46)
  File "python/ray/_raylet.pyx", line 439, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 473, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
  File "dtfray.py", line 32, in __init__
    self.minimize = self.strategy.experimental_run_v2(train_step_opt)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/util/deprecation.py", line 324, in new_func
    return func(*args, **kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 957, in experimental_run_v2
    return self.run(fn, args=args, kwargs=kwargs, options=options)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 951, in run
    return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 2290, in call_for_each_replica
    return self._call_for_each_replica(fn, args, kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 770, in _call_for_each_replica
    fn, args, kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 201, in _call_for_each_replica
    coord.join(threads)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py", line 389, in join
    six.reraise(*self._exc_info_to_raise)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/six.py", line 703, in reraise
    raise value
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py", line 297, in stop_on_exception
    yield
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 998, in run
    self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
  File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 282, in wrapper
    return func(*args, **kwargs)
  File "dtfray.py", line 22, in train_step_opt
    tf.distribute.get_replica_context().merge_call()
TypeError: merge_call() missing 1 required positional argument: 'merge_fn'

