I am defining a custom Op in TensorFlow. The single-threaded version works great, but when I try to multi-thread it using work_sharder.h, it first finds only one worker and then segfaults.
I am defining a shard function over the indices of a flattened array:
#include <stdio.h>
#include <cfloat>
#include <iostream>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/framework/op.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "./work_sharder.h"

using namespace tensorflow;

typedef Eigen::ThreadPoolDevice CPUDevice;

REGISTER_OP("Minimal")
    .Input("input: float")
    .Output("shared_arr: float");

class MinimalOp : public OpKernel {
 public:
  explicit MinimalOp(OpKernelConstruction* context) : OpKernel(context) {}

  void Compute(OpKernelContext* context) override {
    const Tensor& input = context->input(0);
    auto input_flat = input.flat<float>();
    const int N = input_flat.size();

    // Create an output tensor of the same shape as the input.
    Tensor* shared_arr = NULL;
    OP_REQUIRES_OK(context,
                   context->allocate_output(0, input.shape(), &shared_arr));
    // This tensor is going to be shared among threads.
    auto shared_arr_flat = shared_arr->flat<float>();

    // Shard function on ranges of flat indices.
    auto shard = [&input_flat, &shared_arr_flat](int64 start, int64 limit) {
      for (int i = 0; start < limit; i++) {
        if ((input_flat(i)) < 0.) {
          shared_arr_flat(i) = 0.;
        }
      }
    };
    std::cout << "Shard definition was okay\n";

    const DeviceBase::CpuWorkerThreads& worker_threads =
        *(context->device()->tensorflow_cpu_worker_threads());
    std::cout << "Number of workers = " << worker_threads.num_threads << "\n";

    // Estimated cost per unit of work, used by Shard to decide the split.
    const int64 shard_cost = N;
    Shard(worker_threads.num_threads, worker_threads.workers,
          N, shard_cost, shard);
  }
};

REGISTER_KERNEL_BUILDER(Name("Minimal").Device(DEVICE_CPU), MinimalOp);
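For reference, my reading of the Shard API (from the shard callbacks in TensorFlow's own kernels that include work_sharder.h) is that each invocation of the callback receives a half-open range [start, limit) of the N work units, so the loop variable is supposed to run from start up to limit. A minimal untested sketch of that pattern, reusing input_flat and shared_arr_flat from the Compute method above:

auto shard = [&input_flat, &shared_arr_flat](int64 start, int64 limit) {
  // Each worker only touches its own slice [start, limit) of the flat
  // index space, so concurrent workers never write to the same cell.
  for (int64 i = start; i < limit; i++) {
    if (input_flat(i) < 0.f) {
      shared_arr_flat(i) = 0.f;
    }
  }
};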
My op code above compiles without errors. But when I run it from Python:
import tensorflow as tf
import numpy as np

# Load the compiled op and run it once on a random input.
minimal_module = tf.load_op_library("./minimal.so")
tf_minimal = minimal_module.minimal

input_tensor = tf.constant(np.random.normal(size=(100, 100)).astype("float32"))
returned_tensor = tf_minimal(input_tensor)

sess = tf.Session()
sess.run(returned_tensor)
it prints Number of workers = 1 and then crashes with a segmentation fault. The output of g++ --version is:
Apple LLVM version 10.0.1 (clang-1001.0.46.3)
Target: x86_64-apple-darwin18.2.0
Thread model: posix
When using the multiprocessing library in Python, it finds 12 workers.
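In case it is relevant: my understanding is that the worker count reported by tensorflow_cpu_worker_threads is the size of the session's intra-op thread pool, which can be pinned explicitly through the session config instead of being left to the defaults. A sketch of what I mean (the thread counts below are arbitrary values I picked for illustration):

import tensorflow as tf
import numpy as np

minimal_module = tf.load_op_library("./minimal.so")

# Request an intra-op pool of 12 threads instead of the default.
config = tf.ConfigProto(intra_op_parallelism_threads=12,
                        inter_op_parallelism_threads=2)

input_tensor = tf.constant(np.random.normal(size=(100, 100)).astype("float32"))
returned_tensor = minimal_module.minimal(input_tensor)

sess = tf.Session(config=config)
sess.run(returned_tensor)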
Source: Tensorflow CustomOp: multiprocessing not working for CPU