diff --git a/inc/genetic.h b/inc/genetic.h index 829fd84..62def79 100644 --- a/inc/genetic.h +++ b/inc/genetic.h @@ -1,15 +1,19 @@ #include -#include namespace genetic { +template struct ReadonlySpan; +template struct Span; +template struct Stats; +template struct Strategy; + +template Stats run(Strategy); + template struct Strategy { int num_threads; // Number of worker threads that will be evaluating cell // fitness. - int num_retries; // Number of times worker threads will try to grab work pool - // lock before sleeping - int batch_size; // Number of cells a worker thread tries to evaluate in a row - // before locking the pool again. + int batch_size; // Number of cells a worker thread tries to work on in a row + // before accessing/locking the work queue again. int num_cells; // Size of the population pool int num_generations; // Number of times (epochs) to run the algorithm bool test_all; // Sets whether or not every cell is tested every generation @@ -21,14 +25,17 @@ template struct Strategy { float crossover_mutation_chance; // Chance to mutate a child cell int crossover_parent_num; // Number of unique high-scoring parents in a // crossover call. - int crossover_children_num; // Number of children produced in a crossover - bool enable_mutation; // Cells may be mutated before fitness evaluation + int crossover_children_num; // Number of children to expect the user to + // produce in the crossover function. 
+ bool enable_mutation; // Cells may be mutated + // before fitness evaluation float mutation_chance; // Chance to mutate cells before fitness evaluation // User defined functions T (*make_default_cell)(); - void (*mutate)(T &cell); - void (*crossover)(const std::span &parents, std::span &out_children); + void (*mutate)(T &cell_to_modify); + void (*crossover)(const ReadonlySpan &parents, + const Span &out_children); float (*fitness)(const T &cell); }; @@ -37,6 +44,18 @@ template struct Stats { std::vector average_fitness; }; -template Stats run(Strategy); +template struct ReadonlySpan { + T *_data; + int len; + + const T &operator[](int i); +}; + +template struct Span { + T *_data; + int len; + + T &operator[](int i); +}; } // namespace genetic diff --git a/src/genetic.cpp b/src/genetic.cpp index 21b8b90..d5778b9 100644 --- a/src/genetic.cpp +++ b/src/genetic.cpp @@ -1,125 +1,139 @@ #include "genetic.h" #include "pthread.h" -#include +#include +#include #include +#define NUM_QUEUE_RETRIES 10 + +using namespace std; + +// std::visit/std::variant overload pattern +// See: +// https://www.modernescpp.com/index.php/visiting-a-std-variant-with-the-overload-pattern/ +// You don't have to understand this, just use it :) +template struct overload : Ts... { + using Ts::operator()...; +}; +template overload(Ts...) 
-> overload; + namespace genetic { template struct CellEntry { float score; T cell; - bool stale; }; -template struct WorkEntry { - const CellEntry &cur; - float &score; +template struct CrossoverJob { + const ReadonlySpan &parents; + const Span &children_out; +}; + +template struct FitnessJob { + const T &cell; + float &result_out; }; template struct WorkQueue { - std::vector> jobs; - int i; + variant, FitnessJob> *jobs; + int len; + int read_i; + int write_i; + bool done_writing; + + pthread_mutex_t data_mutex; + pthread_mutex_t gen_complete_mutex; + pthread_mutex_t jobs_available_mutex; + + pthread_cond_t gen_complete_cond; + pthread_cond_t jobs_available_cond; }; -static pthread_mutex_t data_mutex = PTHREAD_MUTEX_INITIALIZER; - -static pthread_mutex_t ready_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER; - -static pthread_mutex_t gen_complete_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t gen_complete_cond = PTHREAD_COND_INITIALIZER; - -static pthread_mutex_t run_complete_mutex = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t run_complete_cond = PTHREAD_COND_INITIALIZER; - -/* Thoughts on this approach - * The ideal implementation of a worker thread has them operating at maximum - * load with as little synchronization overhead as possible. i.e. The ideal - * worker thread - * 1. Never waits for new work - * 2. Never spends time synchronizing with other worker threads - * - * Never is impossible, but we want to get as close as we can. - * - * There are two extreme situations to consider - * 1. Fitness functions with highly variable computation times - * 2. Fitness functions with identical computation times. - * - * Most applications that use this library will fall into the second - * category. - * - * In the highly-variable computation time case, it's useful for worker threads - * to operate on 1 work entry at a time. Imagine a scenario with 2 threads, each - * of which claims half the work to do. 
If thread A completes all of its work - * quickly, it goes to sleep while thread B slogs away on its harder-to-compute - * fitness jobs. However, if both threads only claim 1 work entry at a time, - * thread A can immediately claim new jobs after it completes its current one. - * Thread B can toil away, but little time is lost since thread A remains - * productive. - * - * In the highly consistent computation time case, it's ideal for each - * thread to claim an equal share of the jobs (as this minimizes time spent - * synchronizing access to the job pool). Give each thread its set of work once - * and let them have at it instead of each thread constantly locking/waiting - * on the job queue. - * - * I take a hybrid approach. Users can specify a "batch size". Worker threads - * will bite off jobs in chunks and complete them before locking - * the job pool to grab another chunk. The user should choose a batch size close - * to 1 if their fitness function compute time is highly variable and closer to - * num_cells / num_threads if computation time is consistent. Users should - * experiment with a batch size that works well for their problem. - * - * Worth mentioning this avoiding synchronization is irrelevant once computation - * time >>> synchronization time. - * - * There might be room for dynamic batch size modification, but I don't expect - * to pursue this feature until the library is more mature (and I've run out of - * cooler things to do). 
- * - */ -template -void worker(std::queue> &fitness_queue, int batch_size, - int num_retries) { - int retries = 0; - std::vector> batch; - bool gen_is_finished; - while (true) { - gen_is_finished = false; - if (pthread_mutex_trylock(&data_mutex)) { - retries = 0; - for (int i = 0; i < batch_size; i++) { - if (fitness_queue.empty()) { - gen_is_finished = true; - break; - } - batch.push_back(fitness_queue.front()); - fitness_queue.pop(); - } - pthread_mutex_unlock(&data_mutex); - } else { - retries++; - } - - if (gen_is_finished) { - pthread_cond_signal(&gen_complete_cond, &gen_complete_mutex); - } - - if (retries > num_retries) { - pthread_mutex_lock(&ready_mutex); - pthread_cond_wait(&ready_cond, &ready_mutex); - retries = 0; - } - } - pthread_mutex_lock(&data_mutex); +template WorkQueue make_work_queue(int len) { + return {.jobs = (variant, CrossoverJob> *)malloc( + sizeof(variant, CrossoverJob>) * len), + .len = len, + .read_i = 0, + .write_i = 0, + .done_writing = false, + .data_mutex = PTHREAD_MUTEX_INITIALIZER, + .gen_complete_mutex = PTHREAD_MUTEX_INITIALIZER, + .jobs_available_mutex = PTHREAD_MUTEX_INITIALIZER, + .gen_complete_cond = PTHREAD_COND_INITIALIZER, + .jobs_available_cond = PTHREAD_COND_INITIALIZER}; +} + +template struct JobBatch { + ReadonlySpan, FitnessJob>> jobs; + bool gen_complete; +}; + +template +optional> get_job_batch(WorkQueue &queue, int batch_size, + bool *stop_flag) { + while (true) { + for (int i = 0; i < NUM_QUEUE_RETRIES; i++) { + if (queue.read_i < queue.write_i && + pthread_mutex_trylock(&queue.data_mutex) == 0) { + JobBatch res; + res.jobs._data = &queue.jobs[queue.read_i]; + int span_size = min(batch_size, queue.write_i - queue.read_i); + res.jobs.len = span_size; + + queue.read_i += span_size; + res.gen_complete = queue.done_writing && queue.read_i == queue.write_i; + + pthread_mutex_unlock(&queue.data_mutex); + return res; + } + } + pthread_mutex_lock(&queue.jobs_available_mutex); + pthread_cond_wait(&queue.jobs_available_cond,
&queue.jobs_available_mutex); + pthread_mutex_unlock(&queue.jobs_available_mutex); + if (*stop_flag) + return {}; + } +} + +template struct WorkerThreadArgs { + Strategy &strat; + WorkQueue &queue; + bool *stop_flag; +}; + +template void *worker(void *args) { + WorkerThreadArgs *work_args = (WorkerThreadArgs *)args; + Strategy &strat = work_args->strat; + WorkQueue &queue = work_args->queue; + bool *stop_flag = work_args->stop_flag; + + auto JobDispatcher = overload{ + [strat](FitnessJob fj) { fj.result_out = strat.fitness(fj.cell); }, + [strat](CrossoverJob cj) { + strat.crossover(cj.parents, cj.children_out); + }, + }; + + while (true) { + auto batch = get_job_batch(queue, strat.batch_size, stop_flag); + if (!batch || *stop_flag) + return NULL; + + // Do the actual work + for (int i = 0; i < batch->jobs.len; i++) { + visit(JobDispatcher, batch->jobs[i]); + } + + if (batch->gen_complete) { + pthread_cond_signal(&queue.gen_complete_cond); + } + } } -// Definitions template Stats run(Strategy strat) { Stats stats; + WorkQueue queue = make_work_queue(strat.num_cells); - std::queue> fitness_queue; - std::vector> cells_a, cells_b; + vector> cells_a, cells_b; for (int i = 0; i < strat.num_cells; i++) { T cell = strat.make_default_cell(); cells_a.push_back({0, cell, true}); @@ -129,11 +143,32 @@ template Stats run(Strategy strat) { std::vector> &cur_cells = cells_a; std::vector> &next_cells = cells_b; - for (int i = 0; i < strat.num_generations; i++) { + bool stop_flag = false; + WorkerThreadArgs args = { + .strat = strat, .queue = queue, .stop_flag = &stop_flag}; + // spawn worker threads + pthread_t threads[strat.num_threads]; + for (int i = 0; i < strat.num_threads; i++) { + pthread_create(&threads[i], NULL, worker, (void *)&args); + } + + for (int i = 0; i < strat.num_generations; i++) { + // generate fitness jobs + // wait for fitness jobs to complete + // sort cells on performance + // generate crossover jobs cur_cells = cur_cells == cells_a ?
cells_b : cells_a; next_cells = cur_cells == cells_a ? cells_b : cells_a; } + + // stop worker threads + stop_flag = true; + pthread_cond_broadcast(&queue.jobs_available_cond); + for (int i = 0; i < strat.num_threads; i++) { + pthread_join(threads[i], NULL); + } + } } // namespace genetic