Sunday, 5 September 2021

How can I implement this model?

Problem statement

I have 3 classes (A, B, and C).

I have 6 features:

train_x = [[ 6.442  6.338  7.027  8.789 10.009 12.566]
           [ 6.338  7.027  5.338 10.009  8.122 11.217]
           [ 7.027  5.338  5.335  8.122  5.537  6.408]
           [ 5.338  5.335  5.659  5.537  5.241  7.043]]

Each row of features represents a 5-character string pattern built from the 3 classes (e.g. AABBC).

Each 5-character string pattern is one-hot encoded, three bits per character, as follows:

train_z = [[0. 0. 1. 0. 0. 1. 0. 0. 1. 0. 0. 1. 1. 0. 0.]    
           [0. 0. 1. 0. 0. 1. 0. 0. 1. 1. 0. 0. 1. 0. 0.]
           [0. 0. 1. 0. 0. 1. 1. 0. 0. 1. 0. 0. 1. 0. 0.]    
           [0. 0. 1. 1. 0. 0. 1. 0. 0. 1. 0. 0. 0. 0. 1.]]

My implementation

I have implemented the above problem using a sequential model as follows:

import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import sys
import time
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
import numpy as np

# <editor-fold desc="handle GPU">
# Ask TensorFlow to enable memory growth on the first GPU it lists.
# NOTE(review): every exception raised inside the try -- not only the
# IndexError from an empty device list -- is reported as "GPU not found!"
# and otherwise ignored; the caught exception object `e` is never used.
try:
    physical_devices = tf.config.list_physical_devices("GPU")
    # Indexing [0] raises IndexError when no GPU is listed.
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
except Exception as e:
    print("GPU not found!")
# END of try
# </editor-fold>

# Column layout of one whitespace-separated line of the data file, as used by
# load_data_k(): the class letter is the word at CLASS_INDEX, the pattern
# string is the word at CLASS_INDEX + 1, and every word from
# FEATURE_START_INDEX onward is parsed as a float feature.
CLASS_INDEX = 4
FEATURE_START_INDEX = 6
# Directories and files.
# NOTE(review): OUTPUT_PATH, TRAINING_PROGRESS_FILE and MODEL_FILE are not
# referenced by the code shown in this script.
OUTPUT_PATH = r"./"
INPUT_PATH = r"./"
INPUT_DATA_FILE = "dist-5.dat"
TRAINING_PROGRESS_FILE = "training.txt"
MODEL_FILE = "model.h5"

# classification size
CLASSES_COUNT = 3  # classes A, B and C
FEATURES_COUNT = 6  # width of the model input layer
OUTPUTS_COUNT = 15  # width of the model output: 5 characters x 3 one-hot bits

# Network parameters (units in the two hidden Dense layers).
LAYER_1_NEURON_COUNT = 128
LAYER_2_NEURON_COUNT = 128

# Training parameters.
LEARNING_RATE = 0.01  # passed to the Adam optimizer
EPOCHS = 1000  # 500
BATCH_SIZE = 10
NO_OF_INPUT_LINES = 10000  # passed to load_data_k() as top_n_lines
# Passed to load_data_k() as validation_part. NOTE(review): load_data_k() keeps
# the FIRST int(rows * VALIDATION_PART) rows for training and the remainder for
# validation, so this is effectively the training fraction.
VALIDATION_PART = 0.5
MODEL_SAVE_FREQUENCY = 10  # NOTE(review): not referenced by the code shown here

# <editor-fold desc="encoding()">
# <editor-fold desc="def encode(letter)">
def encode(letter: str):
    """Return the 3-element one-hot vector for a class letter.

    'A' -> [1.0, 0.0, 0.0], 'B' -> [0.0, 1.0, 0.0], 'C' -> [0.0, 0.0, 1.0].
    A new list is returned on every call, so callers may mutate the result.

    Raises:
        ValueError: if ``letter`` is not exactly 'A', 'B' or 'C'.  Failing
            here keeps a bad label from silently becoming ``None`` and
            surfacing later as an unrelated error.
    """
    hot_position = {"A": 0, "B": 1, "C": 2}
    try:
        index = hot_position[letter]
    except (KeyError, TypeError):
        # TypeError covers unhashable inputs; report both the same way.
        raise ValueError(
            f"unknown class letter {letter!r}; expected one of 'A', 'B', 'C'"
        ) from None
    vector = [0.0] * len(hot_position)
    vector[index] = 1.0
    return vector
# </editor-fold>

# <editor-fold desc="encode_string()">
def encode_string_1(pattern_str: str):
    """Encode a pattern string as one flat one-hot list.

    Every character is passed to ``encode`` and the resulting vectors are
    concatenated in order, giving ``3 * len(pattern_str)`` values.
    """
    # Flatten the per-character vectors into a single list.
    return [bit for symbol in pattern_str for bit in encode(symbol)]
# END of function

def encode_string_2(pattern_str: str):
    """Encode a pattern string as a list of per-character one-hot vectors.

    Unlike ``encode_string_1`` the result is nested: one ``encode`` result
    per character, in order.
    """
    # One encoded vector per character, kept as separate sub-lists.
    return [encode(symbol) for symbol in pattern_str]
# END of function
# </editor-fold>

# <editor-fold desc="def load_data()">
def load_data_k(fname: str, class_index: int, feature_start_index: int, **selection):
    """Read a whitespace-separated data file and return train/validation tensors.

    Each non-blank line is split on whitespace.  The word at ``class_index``
    is one-hot encoded with ``encode`` (y), the word at ``class_index + 1`` is
    encoded character by character with ``encode_string_1`` (z), and every
    word from ``feature_start_index`` onward is parsed as a float (x).

    Args:
        fname: path of the data file.
        class_index: index of the class-letter word within a line.
        feature_start_index: index of the first feature word within a line.
        **selection: optional keyword settings:
            top_n_lines: read at most this many lines from the start of the
                file (a shorter file simply yields all of its lines).
            random_n_lines: read the whole file and sample this many lines
                without replacement; only consulted when ``top_n_lines`` is
                absent.  ``random.sample`` raises ValueError if the file has
                fewer lines than requested.
            validation_part: fraction used to cut the rows (default 1.0).
                The first ``int(rows * validation_part)`` rows become the
                training set and the remaining rows the validation set.
                Above 0.9999 no cut is made and the validation set is the
                same data as the training set.

    Returns:
        Tuple ``(tx, ty, tz, vx, vy, vz)`` of float32 tensors: training
        features, class vectors and pattern vectors, followed by the
        validation counterparts.
    """
    from itertools import islice  # local import keeps this block self-contained

    # The context manager guarantees the handle is closed on every path.
    with open(fname) as file:
        if "top_n_lines" in selection:
            # islice stops quietly at end-of-file instead of raising
            # StopIteration when the file is shorter than requested.
            limit = max(int(selection["top_n_lines"]), 0)
            lines = list(islice(file, limit))
        elif "random_n_lines" in selection:
            lines = random.sample(file.readlines(), int(selection["random_n_lines"]))
        else:
            lines = file.readlines()

    data_x, data_y, data_z = [], [], []
    for line in lines:
        row = line.split()  # list of whitespace-separated words
        if not row:
            # A blank line (e.g. a trailing newline) carries no sample.
            continue
        data_x.append([float(word) for word in row[feature_start_index:]])
        data_y.append(encode(row[class_index]))
        data_z.append(encode_string_1(row[class_index + 1]))
    # END for line in lines

    given_fraction = selection.get("validation_part", 1.0)
    if given_fraction > 0.9999:
        # No split requested: validate on the very same rows.
        valid_x, valid_y, valid_z = data_x, data_y, data_z
    else:
        n = int(len(data_x) * given_fraction)
        valid_x, valid_y, valid_z = data_x[n:], data_y[n:], data_z[n:]
        data_x, data_y, data_z = data_x[:n], data_y[:n], data_z[:n]
    # END of if-else block

    tx = tf.convert_to_tensor(data_x, np.float32)
    ty = tf.convert_to_tensor(data_y, np.float32)
    tz = tf.convert_to_tensor(data_z, np.float32)
    vx = tf.convert_to_tensor(valid_x, np.float32)
    vy = tf.convert_to_tensor(valid_y, np.float32)
    vz = tf.convert_to_tensor(valid_z, np.float32)

    return tx, ty, tz, vx, vy, vz
# END of the function
# </editor-fold>
# </editor-fold>

# <editor-fold desc="def create_model()">
def create_model(n_hidden_1, n_hidden_2, num_outputs, num_features):
    """Build the feed-forward network used for training.

    The model takes ``num_features`` inputs, passes them through two ReLU
    Dense layers of ``n_hidden_1`` and ``n_hidden_2`` units, and ends in a
    Dense layer of ``num_outputs`` units with no activation (raw logits).
    The returned model is not compiled.
    """
    layer_stack = (
        tf.keras.Input(shape=(num_features,)),
        tf.keras.layers.Dense(n_hidden_1, activation="relu"),
        tf.keras.layers.Dense(n_hidden_2, activation="relu"),
        tf.keras.layers.Dense(num_outputs),
    )
    network = tf.keras.Sequential()
    for layer in layer_stack:
        network.add(layer)
    return network
# </editor-fold>

# custom loss to take into the dependency between the 3 bits
def loss(y_true, y_pred):
    """Sum of five softmax cross-entropies, one per 3-column group.

    Columns 0-2, 3-5, 6-8, 9-11 and 12-onward of ``y_true`` (labels) and
    ``y_pred`` (logits) are each scored with
    ``tf.nn.softmax_cross_entropy_with_logits``; the per-sample results are
    added together in that order.
    """
    column_groups = (
        slice(None, 3),
        slice(3, 6),
        slice(6, 9),
        slice(9, 12),
        slice(12, None),
    )
    total = None
    for columns in column_groups:
        group_loss = tf.nn.softmax_cross_entropy_with_logits(
            labels=y_true[:, columns], logits=y_pred[:, columns]
        )
        total = group_loss if total is None else total + group_loss
    return total


if __name__ == "__main__":
    # Data file: the first command-line argument when given, otherwise the
    # default INPUT_PATH/INPUT_DATA_FILE.
    if len(sys.argv) > 1:
        arg_str = sys.argv[1]
    else:
        arg_str = os.path.join(INPUT_PATH, INPUT_DATA_FILE)
    # END of if len(sys.argv) > 1:

    # Load training data from the disk (the chosen path is actually used now;
    # previously the command-line argument was computed and then ignored).
    train_x, train_y, train_z, validate_x, validate_y, validate_z = load_data_k(
        arg_str,
        class_index=CLASS_INDEX,
        feature_start_index=FEATURE_START_INDEX,
        top_n_lines=NO_OF_INPUT_LINES,
        validation_part=VALIDATION_PART
    )

    #print(train_y)
    print("z = " + str(train_z))

    def per_character_accuracy(y_true, y_pred):
        """Fraction of pattern characters predicted correctly, per sample.

        The output vector is a concatenation of one-hot groups of
        CLASSES_COUNT values.  The built-in 'accuracy' string would take a
        single argmax over the whole vector, which does not measure this
        task, so each group is compared separately here.
        """
        groups = OUTPUTS_COUNT // CLASSES_COUNT
        true_classes = tf.argmax(
            tf.reshape(y_true, (-1, groups, CLASSES_COUNT)), axis=-1
        )
        predicted_classes = tf.argmax(
            tf.reshape(y_pred, (-1, groups, CLASSES_COUNT)), axis=-1
        )
        matches = tf.cast(tf.equal(true_classes, predicted_classes), tf.float32)
        return tf.reduce_mean(matches, axis=-1)

    # create the Adam optimizer for the NN model
    opt_function = keras.optimizers.Adam(
        learning_rate=LEARNING_RATE
    )
    # create a sequential NN model
    model = create_model(
        LAYER_1_NEURON_COUNT,
        LAYER_2_NEURON_COUNT,
        OUTPUTS_COUNT,
        FEATURES_COUNT
    )
    # Train on the pattern targets (train_z) with the grouped softmax loss.
    model.compile(optimizer=opt_function, loss=loss, metrics=[per_character_accuracy])
    model.fit(train_x, train_z, epochs=EPOCHS, batch_size=BATCH_SIZE)

The problem

The problem with this source code is that the model does not converge, i.e. the accuracy does not increase as the number of epochs increases.

The question

How can I implement this model?



from How can I implement this model?

No comments:

Post a Comment