# Source code for src.Evolutionary_Algorithm

import numpy as np

from src.Create_Model import create_model
from src.Evaluate_Model import evaluate_tflite_model
from src.Create_Model import train_model
from src.Model_Checker import model_has_problem
from src.TFLITE_Converter import convert_to_tflite
from src.Compile_Edge_TPU import compile_edgetpu

from src.Fitness_Function import calculate_fitness
import pickle
import os


def create_first_population(population, num_classes=5):
    """
    Build the random starting population for the genetic algorithm.

    Each model is encoded as a 9 x 18 array of random bits. Every encoding is
    passed to ``create_model`` and checked with ``model_has_problem``; an
    encoding that is flagged is replaced by a freshly drawn one until the
    check passes.

    Parameters
    ----------
    population : int
        The number of model encodings to generate.
    num_classes : int, optional
        The number of output classes passed to ``create_model``. Defaults to 5.

    Returns
    -------
    first_population_array : np.ndarray
        Array of shape ``(population, 9, 18)`` holding 0/1 values, one
        encoding per model.
    """
    genome_shape = (9, 18)

    # Draw the whole population in one call; the upper bound 2 is exclusive,
    # so every entry is either 0 or 1.
    first_population_array = np.random.randint(0, 2, (population,) + genome_shape)

    for slot in range(population):
        while True:
            candidate = create_model(first_population_array[slot], num_classes=num_classes)
            rejected = model_has_problem(candidate)
            # Only the encoding is kept; drop the built model straight away.
            del candidate
            if not rejected:
                break
            # Re-roll this slot; the new encoding is checked on the next pass.
            first_population_array[slot] = np.random.randint(0, 2, genome_shape)

    return first_population_array
def select_models(train_ds, val_ds, test_ds, time, population_array, generation, epochs=30, num_classes=5):
    """
    Train, evaluate and rank a population, returning its five fittest encodings.

    For every encoding a model is created and trained, converted to
    TensorFlow Lite, compiled for the Edge TPU and evaluated. A model whose
    conversion, compilation or evaluation raises is scored with accuracy 0
    and inference time 9999 so that the run can continue.

    Parameters
    ----------
    train_ds : tf.data.Dataset
        The dataset passed to ``train_model`` for training.
    val_ds : tf.data.Dataset
        The dataset passed to ``train_model`` for validation.
    test_ds : tf.data.Dataset
        Accepted for interface compatibility; this function does not use it.
    time : datetime or str
        Timestamp interpolated into the ``results_{time}`` directory name.
    population_array : np.ndarray
        Array whose first axis indexes the model encodings to evaluate.
    generation : int
        Generation number, used in the ``generation_{generation}``
        sub-directory name and passed to ``convert_to_tflite``.
    epochs : int, optional
        Number of training epochs per model. Defaults to 30.
    num_classes : int, optional
        Number of output classes passed to ``create_model``. Defaults to 5.

    Returns
    -------
    best_models_arrays, max_fitness, average_fitness : tuple
        A list with the (at most five) encodings of highest fitness in
        descending fitness order, the maximum fitness, and the mean fitness
        of the population.

    Raises
    ------
    ValueError
        If ``population_array`` contains no models.
    """
    # Imported locally so the block does not depend on a module-level import.
    import logging

    num_models = population_array.shape[0]
    if num_models == 0:
        # np.max on an empty list would raise ValueError anyway; fail early
        # with a clear message and before touching the file system.
        raise ValueError('population_array must contain at least one model')

    logger = logging.getLogger(__name__)

    # Per-model scores, in population order
    fitness_list = []
    tflite_accuracy_list = []
    tpu_time_list = []

    # Output locations for this generation
    result_dir = f'results_{time}'
    generation_dir = result_dir + f'/generation_{generation}'
    best_models_arrays_dir = generation_dir + '/best_model_arrays.pkl'
    fitness_list_dir = generation_dir + '/fitness_list.pkl'
    tflite_accuracy_list_dir = generation_dir + '/tflite_accuracy_list.pkl'
    tpu_time_list_dir = generation_dir + '/tpu_time_list.pkl'

    # Creates result_dir as well; exist_ok avoids the check-then-create race
    os.makedirs(generation_dir, exist_ok=True)

    for i in range(num_models):
        # Create and train a model from the encoding
        model = create_model(population_array[i], num_classes=num_classes)
        model, _ = train_model(train_ds, val_ds, model=model, epochs=epochs)

        try:
            # Convert to TensorFlow Lite, compile for the Edge TPU, then evaluate
            _, tflite_name = convert_to_tflite(keras_model=model, generation=generation, i=i, time=time)
            edgetpu_name = compile_edgetpu(tflite_name)
            tflite_accuracy, tpu_time = evaluate_tflite_model(tflite_model=edgetpu_name, tfl_int8=True)
        except Exception:
            # Best effort: one failing model must not abort the generation,
            # but KeyboardInterrupt/SystemExit still propagate and the cause
            # is recorded instead of being silently discarded.
            logger.exception(
                'Model %d of generation %s could not be converted, compiled or evaluated; '
                'assigning accuracy 0 and inference time 9999',
                i, generation,
            )
            tflite_accuracy = 0
            tpu_time = 9999

        fitness = calculate_fitness(tflite_accuracy, tpu_time)

        tflite_accuracy_list.append(tflite_accuracy)
        fitness_list.append(fitness)
        tpu_time_list.append(tpu_time)

        # Rewrite the score files after every model so that partial results
        # survive an interrupted run. The fitness list is saved alongside
        # the accuracy and inference-time lists.
        for path, values in (
            (tflite_accuracy_list_dir, tflite_accuracy_list),
            (fitness_list_dir, fitness_list),
            (tpu_time_list_dir, tpu_time_list),
        ):
            with open(path, 'wb') as f:
                pickle.dump(values, f)

    max_fitness = np.max(fitness_list)
    average_fitness = np.average(fitness_list)

    # Indices of the 5 fittest models; the stable sort keeps ties in population order
    best_models_indices = sorted(range(len(fitness_list)), key=lambda j: fitness_list[j], reverse=True)[:5]
    best_models_arrays = [population_array[k] for k in best_models_indices]

    with open(best_models_arrays_dir, 'wb') as f:
        pickle.dump(best_models_arrays, f)

    return best_models_arrays, max_fitness, average_fitness
def crossover(parent_arrays):
    """
    Combine parent arrays element-wise into a single child array.

    For every position of the child, one parent is picked uniformly at random
    and its value at that position is copied. Any number of parents (at least
    one) is supported.

    Parameters:
    -----------
    parent_arrays : list of np.ndarray
        The parent arrays; all must have the same shape.

    Returns:
    -----------
    child_array : np.ndarray
        An array of the parents' shape whose entries are each taken from the
        same position of a randomly chosen parent.

    Raises:
    -----------
    IndexError
        If ``parent_arrays`` is empty.
    """
    num_parents = len(parent_arrays)
    genome_shape = parent_arrays[0].shape

    # One parent index per position, drawn from all available parents rather
    # than from a fixed range of five.
    parent_indices = np.random.randint(0, num_parents, size=genome_shape)

    # Stack the parents on a new leading axis and pick along it; unlike
    # np.choose this has no upper limit on the number of choices.
    stacked_parents = np.stack(parent_arrays)
    child_array = np.take_along_axis(stacked_parents, parent_indices[np.newaxis, ...], axis=0)[0]

    return child_array
def mutate(model_array, mutate_prob=0.05):
    """
    Flip randomly selected entries of a model array.

    Each entry is independently replaced by its logical negation (zero
    becomes 1, non-zero becomes 0) with probability ``mutate_prob``. The
    input array is not modified.

    Parameters:
    ------------
    model_array : np.ndarray
        The model array to be mutated; any shape is accepted.
    mutate_prob : float, optional
        The probability of mutation for each element in the array.
        Defaults to 0.05.

    Returns:
    ------------
    mutated_array : np.ndarray
        A new array of the same shape as ``model_array``.
    """
    model_array = np.asarray(model_array)

    # One uniform sample in [0, 1) per element, sized from the input instead
    # of a fixed (9, 18) so arrays of any shape can be mutated.
    prob = np.random.uniform(size=model_array.shape)

    # Where the sample falls below the mutation probability take the flipped
    # bit, otherwise keep the original value.
    flipped = np.logical_not(model_array)
    mutated_array = np.where(prob < mutate_prob, flipped, model_array)

    return mutated_array
def create_next_population(parent_arrays, population=20, num_classes=5, mutate_prob=0.03):
    """
    Create the next generation of model arrays by crossover and mutation.

    Every individual is produced by ``crossover`` of the parents followed by
    ``mutate``. Each resulting encoding is then passed to ``create_model``
    and checked with ``model_has_problem``; a flagged encoding is bred again
    until the check passes.

    Parameters:
    -----------
    parent_arrays : list of np.ndarray
        A list of parent arrays.
    population : int, optional
        The size of the population to be generated. Defaults to 20.
    num_classes : int, optional
        The number of classes passed to ``create_model``. Defaults to 5.
    mutate_prob : float, optional
        Per-element mutation probability passed to ``mutate``.
        Defaults to 0.03.

    Returns:
    -----------
    next_population_array : np.ndarray
        Array of shape ``(population, 9, 18)`` with the next generation of
        model arrays.
    """
    # Integer array that receives the children. Every slot is overwritten
    # below; the random fill is kept so that seeded runs consume the random
    # number generator exactly as before.
    next_population_array = np.random.randint(0, 2, (population, 9, 18))

    def _breed_into(slot):
        # Crossover followed by mutation, written into the given slot.
        next_population_array[slot] = crossover(parent_arrays)
        next_population_array[slot] = mutate(next_population_array[slot], mutate_prob=mutate_prob)

    # First pass: breed every individual
    for individual in range(population):
        _breed_into(individual)

    # Second pass: replace every individual whose model is flagged
    for individual in range(population):
        model = create_model(next_population_array[individual], num_classes=num_classes)
        while model_has_problem(model):
            del model
            _breed_into(individual)
            model = create_model(next_population_array[individual], num_classes=num_classes)
        # Only the encoding is kept; drop the model after checking
        del model

    return next_population_array
def start_evolution(train_ds, val_ds, test_ds, generations, population, num_classes, epochs, population_array=None, time=None):
    """
    Run the evolutionary search for the requested number of generations.

    In every generation the current population is scored with
    ``select_models``, the next population is bred from the best encodings
    with ``create_next_population``, and the running results are written to
    the ``results_{time}`` directory.

    Parameters:
    -----------
    train_ds : tensorflow Dataset
        The training dataset.
    val_ds : tensorflow Dataset
        The validation dataset.
    test_ds : tensorflow Dataset
        The testing dataset.
    generations : int
        The number of generations to evolve through.
    population : int
        The size of the population in each generation, including the
        initial one when ``population_array`` is not supplied.
    num_classes : int
        The number of classes in the target variable.
    epochs : int
        The number of epochs to train each model.
    population_array : np.ndarray, optional
        The initial population array. When omitted, one is created with
        ``create_first_population``.
    time : datetime or str, optional
        The timestamp to append to the result directory name.

    Returns:
    -----------
    population_array, max_fitness_history, average_fitness_history, best_models_arrays : tuple
        The final population array, the history of maximum fitness score,
        the history of average fitness score, and the best model arrays of
        the last generation (an empty list when ``generations`` is 0).
    """
    max_fitness_history = []
    average_fitness_history = []
    # Defined up front so that generations == 0 returns cleanly instead of
    # failing on an unbound name.
    best_models_arrays = []

    # If no initial population is provided, create one of the requested size
    if population_array is None:
        population_array = create_first_population(population=population, num_classes=num_classes)

    # exist_ok avoids the check-then-create race
    result_dir = f'results_{time}'
    os.makedirs(result_dir, exist_ok=True)

    # Output files; the paths do not change between generations
    next_population_array_dir = result_dir + '/next_population_array.pkl'
    max_fitness_history_dir = result_dir + '/max_fitness_history.pkl'
    average_fitness_history_dir = result_dir + '/average_fitness_history.pkl'
    best_model_arrays_dir = result_dir + '/best_model_arrays.pkl'

    for generation in range(generations):
        # Score the current population and keep its best encodings
        best_models_arrays, max_fitness, average_fitness = select_models(
            train_ds=train_ds,
            val_ds=val_ds,
            test_ds=test_ds,
            time=time,
            population_array=population_array,
            generation=generation,
            epochs=epochs,
            num_classes=num_classes,
        )

        # Breed the next generation from the best encodings
        population_array = create_next_population(
            parent_arrays=best_models_arrays,
            population=population,
            num_classes=num_classes,
        )

        max_fitness_history.append(max_fitness)
        average_fitness_history.append(average_fitness)

        # Overwrite the result files after every generation so the latest
        # state is on disk if the run stops early.
        for path, payload in (
            (next_population_array_dir, population_array),
            (max_fitness_history_dir, max_fitness_history),
            (average_fitness_history_dir, average_fitness_history),
            (best_model_arrays_dir, best_models_arrays),
        ):
            with open(path, 'wb') as f:
                pickle.dump(payload, f)

    return population_array, max_fitness_history, average_fitness_history, best_models_arrays