Monday 14 August 2023

Optimizing tensorflow tf.data.Dataset when creating it from remote source via from_generator()

Using tensorflow 2.9, python 3.7.

My goal is to feed a tf.data.Dataset into a TensorFlow model while keeping memory consumption low.

There are parquet files in S3 that I want to use to create a tf.data.Dataset for training a deep learning model with Keras. First I create a generator that iterates through the parquet files:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

def gen(parquet_paths, batch_size, x_features, label):
    for pq_path in parquet_paths:
        parquet_file = pq.ParquetDataset(pq_path, filesystem=fs)
        df = parquet_file.read().to_pandas()
        df = df[x_features+[label]].astype('float32')
        len_df = len(df)

        print(len_df)

        curr_row = 0  # reset per file so each file is read from the start
        while curr_row < len_df:
            batch_df = df.iloc[curr_row:curr_row+batch_size, :]
            X = batch_df[x_features].values
            y = batch_df[label].values
            curr_row += batch_size
            yield (X, y)

Then the generator is wrapped with tf.data.Dataset.from_generator:

def wrapper(parquet_paths, batch_size, x_features, label):
    return tf.data.Dataset.from_generator(
        lambda: gen(parquet_paths, batch_size, x_features, label),
        (tf.float32, tf.float32))


train_ds = wrapper(train_pq_path, 512, x_features, label)
valid_ds = wrapper(val_pq_path, 99999, x_features, label)

model.fit(train_ds, validation_data=valid_ds, epochs=10)
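
Since only the dtypes are passed to from_generator, the element shapes end up unknown. For reference, my understanding is that the wrapper could also declare an explicit output_signature, roughly like this (a sketch, assuming label is a single column and the feature dimension is len(x_features); the batch dimension is None because the last slice of a file can be shorter):

def wrapper(parquet_paths, batch_size, x_features, label):
    # Sketch only: same generator, but with explicit shapes and dtypes.
    return tf.data.Dataset.from_generator(
        lambda: gen(parquet_paths, batch_size, x_features, label),
        output_signature=(
            tf.TensorSpec(shape=(None, len(x_features)), dtype=tf.float32),
            tf.TensorSpec(shape=(None,), dtype=tf.float32),
        ))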

I want to optimize my data pipeline and add shuffle functionality. What are some improvements I could make from here?
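
For shuffling, the direction I have in mind looks roughly like this (a sketch; since the generator already yields whole batches, shuffle here reorders batches rather than individual rows, and the buffer sizes are guesses rather than tuned values):

train_ds = (
    wrapper(train_pq_path, 512, x_features, label)
    .shuffle(buffer_size=16)       # reorders yielded batches, not rows
    .prefetch(tf.data.AUTOTUNE)    # overlap data loading with training
)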

Try # 1:

  • Downloading each parquet file to the local machine and then iterating through it improves speed, but I'm also constrained by the amount of local disk storage, so it won't be a scalable method (a rough sketch of this attempt is just below).
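
Roughly, that attempt looked like the following (a sketch; the scratch directory is a placeholder and it assumes each path points to a single parquet file):

import os
import tempfile

def gen_local(parquet_paths, batch_size, x_features, label):
    # Sketch of Try # 1: copy each file to local disk, then read it locally.
    scratch = tempfile.mkdtemp()
    for pq_path in parquet_paths:
        local_path = os.path.join(scratch, os.path.basename(pq_path))
        fs.get(pq_path, local_path)               # download from S3
        df = pq.ParquetDataset(local_path).read().to_pandas()
        df = df[x_features + [label]].astype('float32')
        for start in range(0, len(df), batch_size):
            batch_df = df.iloc[start:start + batch_size]
            yield batch_df[x_features].values, batch_df[label].values
        os.remove(local_path)                     # free disk before next file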

Try # 2:

  • The TensorFlow documentation notes that creating a tf.data.Dataset with .from_generator() has limited portability and scalability, since the generator must run in the same Python process that created it and is subject to the Python GIL. I've tried to get rid of it, but given the memory constraints I haven't found a way to create the tf.data.Dataset other than from a generator.

Try # 3: Leveraging tf.data.Dataset.interleave() to parallelize data extraction, since I assume network I/O is also one of the bottlenecks. Here is the changed code:

def read_pq_file(parquet_path):
    pq_file = pq.ParquetDataset(parquet_path, filesystem=fs)
    df = pq_file.read().to_pandas()
    X = df[x_features].astype('float32').values
    y = df[label].astype('float32').values
    yield (X, y)

def read_and_preprocess(parquet_path):
    return tf.data.Dataset.from_generator(
        lambda: read_pq_file(parquet_path), (tf.float32, tf.float32))

# train_pq_path = [.... list of parquet file paths ..]
train_dataset_paths = tf.data.Dataset.from_tensor_slices(train_pq_path)
train_dataset = train_dataset_paths.interleave(read_and_preprocess, num_parallel_calls=2, cycle_length=2, deterministic=False).batch(batch_size)

model.fit(train_dataset)

However this outputs the following error: InvalidArgumentError: TypeError: not a path-like object



