Thursday, 22 August 2019

Tensorflow CustomOp: multiprocessing not working for CPU

I am defining a custom Op in TensorFlow. The single-threaded version works well, but when I try to make it multi-threaded with work_sharder.h, it finds only one worker thread and then segfaults.

I define a shard function over the indices of the flattened input array:

#include <stdio.h>
#include <cfloat>
#include <iostream>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor_shape.h"

#include "./work_sharder.h"

using namespace tensorflow;
typedef Eigen::ThreadPoolDevice CPUDevice;

REGISTER_OP("Minimal")
    .Input("input: float")
    .Output("shared_arr: float")
;

class MinimalOp : public OpKernel {
 public:
  explicit MinimalOp(OpKernelConstruction* context) : OpKernel(context) {}

  void Compute(OpKernelContext* context) override {

    const Tensor& input= context->input(0);
    auto input_flat = input.flat<float>();
    const int N = input_flat.size();

    // Create an output tensor of the right shape
    Tensor* shared_arr = NULL;
    OP_REQUIRES_OK(context, context->allocate_output(0, input.shape(),
                                                     &shared_arr));
    // This tensor is going to be shared among threads
    auto shared_arr_flat = shared_arr->flat<float>();

    // Shard function on ranges
    auto shard = [&input_flat, &shared_arr_flat](int64 start, int64 limit) {
      for (int i = 0; start < limit; i++) {
        if (input_flat(i) < 0.) {
          shared_arr_flat(i) = 0.;
        }
      }
    };

    std::cout<<"Shard definition was okay\n";
    const DeviceBase::CpuWorkerThreads& worker_threads = *(context->device()->tensorflow_cpu_worker_threads());
    std::cout<<"Number of workers = "<<worker_threads.num_threads<<"\n";
    const int64 shard_cost = N;
    Shard(worker_threads.num_threads, worker_threads.workers,
            N, shard_cost, shard);

  }
};

REGISTER_KERNEL_BUILDER(Name("Minimal").Device(DEVICE_CPU), MinimalOp);
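Independently of the worker count, the loop in the shard lambda above tests start < limit. That condition never changes inside the loop, so i keeps growing past N, and input_flat(i) and shared_arr_flat(i) are eventually accessed out of bounds. This alone is enough to explain a segmentation fault. As far as I can tell from work_sharder.cc, when only one worker is available, Shard simply calls the work function inline on the whole range [0, N), so the bad loop runs on the calling thread.

There are two smaller issues. The tensor returned by allocate_output is not initialized, so non-negative entries have to be copied explicitly. Also, work_sharder.h describes cost_per_unit as an estimate of the cost of one unit of work (one element here), not of the whole tensor.

The sketch below illustrates the half-open [start, limit) contract outside TensorFlow, under these assumptions: ShardSketch is a hypothetical stand-in for tensorflow::Shard built on std::thread, and std::vector<float> replaces the Eigen tensor maps. It is not TensorFlow's implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for tensorflow::Shard (NOT the TensorFlow code):
// splits [0, total) into contiguous ranges and calls work(start, limit)
// once per range, each range on its own std::thread.
void ShardSketch(int num_threads, int64_t total,
                 const std::function<void(int64_t, int64_t)>& work) {
  assert(total >= 0);  // same precondition as Shard's CHECK_GE(total, 0)
  if (total == 0) return;
  if (num_threads <= 1) {  // one worker: run inline on the calling thread
    work(0, total);
    return;
  }
  const int64_t block = (total + num_threads - 1) / num_threads;
  std::vector<std::thread> threads;
  for (int64_t start = 0; start < total; start += block) {
    const int64_t limit = std::min(start + block, total);
    threads.emplace_back(work, start, limit);
  }
  for (auto& t : threads) t.join();
}

// Elementwise op from the question: negative entries become 0, and every
// other entry is copied, because the output buffer is not assumed to be
// initialized (as with allocate_output).
std::vector<float> ClampNegatives(const std::vector<float>& input,
                                  int num_threads) {
  std::vector<float> output(input.size());
  auto shard = [&input, &output](int64_t start, int64_t limit) {
    // Visit only this shard's half-open range [start, limit).
    for (int64_t i = start; i < limit; ++i) {
      output[i] = input[i] < 0.f ? 0.f : input[i];
    }
  };
  ShardSketch(num_threads, static_cast<int64_t>(input.size()), shard);
  return output;
}
```

Inside MinimalOp::Compute, the equivalent change would be a loop of the form for (int64 i = start; i < limit; ++i) that assigns shared_arr_flat(i) = input_flat(i) < 0.f ? 0.f : input_flat(i);. Because each shard writes a disjoint index range, the threads do not need any locking.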

It compiles without errors. I run the multi-threaded version from Python like this:

import tensorflow as tf
import numpy as np


minimal_module = tf.load_op_library("./minimal.so")
tf_minimal = minimal_module.minimal

input_tensor = tf.constant(np.random.normal(size=(100, 100)).astype("float32"))
returned_tensor = tf_minimal(input_tensor)
sess = tf.Session()
sess.run(returned_tensor)

It prints "Number of workers = 1" and then crashes with a segmentation fault. The output of g++ --version is:

Apple LLVM version 10.0.1 (clang-1001.0.46.3)
Target: x86_64-apple-darwin18.2.0
Thread model: posix

When I use Python's multiprocessing library instead, it finds 12 workers.
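For comparison with the number Python reports, the C++ standard library offers its own hint of how many hardware threads are available, std::thread::hardware_concurrency(). This value is not the size of TensorFlow's worker pool. As far as I know, in TensorFlow 1.x the number of CPU worker threads a kernel sees is governed by the intra_op_parallelism_threads field of the tf.ConfigProto passed to tf.Session, where 0 lets TensorFlow choose a default. HardwareThreads below is a hypothetical helper name.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: number of concurrent hardware threads reported by
// the C++ runtime. std::thread::hardware_concurrency() returns 0 when the
// value is not computable, so fall back to 1 in that case.
unsigned HardwareThreads() {
  const unsigned n = std::thread::hardware_concurrency();
  return n == 0 ? 1u : n;
}
```

Calling HardwareThreads() from a small program on the same machine shows whether the C++ side sees the same logical CPU count as Python. If it does, the "Number of workers = 1" result more likely comes from how the session's thread pool is configured than from what the hardware exposes.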


