I'm trying to distribute the training of a Deep Reinforcement Learning system that I have built using Ray and Tensorflow 1. Meanwhile, I am using ray because I have a lot of code that parallelizes logic not directly related to the training, I would like to parallelize the training (namely the gradient reduction over different workers on different GPUs) using tf. distribute utilities, mainly, because It can use the NCCL communication library, which I supposed will boost the training speed compared with other approaches.
The problem is that I don't want to refactor my tensorflow code (written in old Tensorflow 1 at a low level, with custom training loops, I am not using any API like Keras), but I can't figure out how to use the tf.distribute, namely the MirrorStrategy, to distribute the training using Tensorflow 1 code.
I have found this guide about tf.distribute in Tensorflow 1, but even in the custom loop they are using the Keras API for the model and the optimizer building. I am trying to follow this guide as far as possible in order to build a simple example that uses the libraries/API that I am using in my main project, but I cannot make it works.
The example code is this:
import numpy as np
import tensorflow.compat.v1 as tf
import ray
tf.disable_v2_behavior()
@ray.remote(num_cpus=1, num_gpus=0)
class Trainer:
def __init__(self, local_data):
tf.disable_v2_behavior()
self.current_w = 1.0
self.local_data = local_data
self.strategy = tf.distribute.MirroredStrategy()
with self.strategy.scope():
self.w = tf.Variable(((1.0)), dtype=tf.float32)
self.x = tf.placeholder(shape=(None, 1), dtype=tf.float32)
self.y = self.w * self.x
self.grad = tf.gradients(self.y, [self.w])
def train_step_opt():
def grad_fn():
grad = tf.gradients(self.y, [self.w])
return grad
per_replica_grad = self.strategy.experimental_run_v2(grad_fn)
red_grad = self.strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_grad, axis=None)
minimize = self.w.assign_sub(red_grad[0])
return minimize
self.minimize = self.strategy.experimental_run_v2(train_step_opt)
def train_step(self):
with self.strategy.scope():
with tf.Session() as sess:
sess.run(self.minimize, feed_dict={self.x: self.local_data})
self.current_w = sess.run(self.w)
return self.current_w
ray.init()
data = np.arange(4) + 1
data = data.reshape((-1, 1))
data_w = [data[None, i] for i in range(4)]
trainers = [Trainer.remote(d) for d in data_w]
W = ray.get([t.train_step.remote() for t in trainers])[0]
print(W)
It is supposed to simply computes the derivative of a linear function in different processes, reduce all the derivatives in a single value and apply it to the unique parameter "w".
When I run it I get the following error:
Traceback (most recent call last):
File "dtfray.py", line 49, in <module>
r = ray.get([t.train_step.remote() for t in trainers])
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
return func(*args, **kwargs)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/ray/worker.py", line 1456, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): ray::Trainer.train_step() (pid=25401, ip=10.128.0.46)
File "python/ray/_raylet.pyx", line 439, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 473, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
File "dtfray.py", line 32, in __init__
self.minimize = self.strategy.experimental_run_v2(train_step_opt)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/util/deprecation.py", line 324, in new_func
return func(*args, **kwargs)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 957, in experimental_run_v2
return self.run(fn, args=args, kwargs=kwargs, options=options)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 951, in run
return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py", line 2290, in call_for_each_replica
return self._call_for_each_replica(fn, args, kwargs)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 770, in _call_for_each_replica
fn, args, kwargs)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 201, in _call_for_each_replica
coord.join(threads)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py", line 389, in join
six.reraise(*self._exc_info_to_raise)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/six.py", line 703, in reraise
raise value
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/training/coordinator.py", line 297, in stop_on_exception
yield
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/distribute/mirrored_strategy.py", line 998, in run
self.main_result = self.main_fn(*self.main_args, **self.main_kwargs)
File "/home/Adrian/miniconda3/envs/sukan_env/lib/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 282, in wrapper
return func(*args, **kwargs)
File "dtfray.py", line 22, in train_step_opt
tf.distribute.get_replica_context().merge_call()
TypeError: merge_call() missing 1 required positional argument: 'merge_fn'
from Tensorflow-1 distributed with low-level code and Ray
No comments:
Post a Comment