From: Rusty Russell
Date: Tue, 31 May 2011 04:14:48 +0000 (+0930)
Subject: lbalance: new module for load balancing
X-Git-Url: https://git.ozlabs.org/?p=ccan;a=commitdiff_plain;h=f69389e6806e2f0b63084576ffa4c84bba699147;ds=sidebyside

lbalance: new module for load balancing
---

diff --git a/ccan/lbalance/LICENSE b/ccan/lbalance/LICENSE
new file mode 120000
index 00000000..190cfd5e
--- /dev/null
+++ b/ccan/lbalance/LICENSE
@@ -0,0 +1 @@
+../../licenses/GPL-3
\ No newline at end of file
diff --git a/ccan/lbalance/_info b/ccan/lbalance/_info
new file mode 100644
index 00000000..c0d0ab0e
--- /dev/null
+++ b/ccan/lbalance/_info
@@ -0,0 +1,25 @@
+#include "config.h"
+#include
+
+/**
+ * lbalance - helpers for loadbalancing parallel tasks
+ *
+ * This code helps when you have a large number of one-shot tasks; it tries
+ * to determine the maximum amount of useful parallelism.
+ *
+ * License: GPL
+ * Author: Rusty Russell
+ */
+int main(int argc, char *argv[])
+{
+	/* Expect exactly one argument */
+	if (argc != 2)
+		return 1;
+
+	if (strcmp(argv[1], "depends") == 0) {
+		printf("ccan/tlist\n");
+		return 0;
+	}
+
+	return 1;
+}
diff --git a/ccan/lbalance/lbalance.c b/ccan/lbalance/lbalance.c
new file mode 100644
index 00000000..c3cc4a22
--- /dev/null
+++ b/ccan/lbalance/lbalance.c
@@ -0,0 +1,326 @@
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+/* Define tlist_lbalance_task */
+TLIST_TYPE(lbalance_task, struct lbalance_task);
+
+/* Samples gathered for one level of parallelism (one slot of lbalance.stats). */
+struct stats {
+	/* How many stats for this value do we have? */
+	unsigned int num_stats;
+	/* Sum of the work rates recorded for this value; best_target()
+	 * divides it by num_stats to get the mean. */
+	float work_rate;
+};
+
+/* State of one load balancer, created by lbalance_new(). */
+struct lbalance {
+	/* Tasks started with lbalance_task_new() and not yet freed, and
+	 * how many of them there are. */
+	struct tlist_lbalance_task tasks;
+	unsigned int num_tasks;
+
+	/* We figured out how many we want to run. */
+	unsigned int target;
+	/* We need to recalc once a report comes in via lbalance_task_free. */
+	bool target_uptodate;
+
+	/* Integral of how many tasks were running so far: tasks_sum grows by
+	 * (elapsed usec * num_tasks) on each update_tasks_sum() call, and
+	 * prev_tasks_time is the time of the last such call. */
+	struct timeval prev_tasks_time;
+	float tasks_sum;
+
+	/* For differential rusage. */
+	struct rusage prev_usage;
+
+	/* How many stats we have collected (we invalidate old ones). */
+	unsigned int total_stats;
+
+	/* Array of stats, indexed by number of tasks we were running;
+	 * max_stats is the number of slots allocated.  Slot 0 is never
+	 * recorded into (add_to_stats() asserts num_tasks >= 1). */
+	unsigned int max_stats;
+	struct stats *stats;
+};
+
+/* One outstanding task, linked into its balancer's task list. */
+struct lbalance_task {
+	struct lbalance *lb;
+	struct list_node list;
+
+	/* The time this task started */
+	struct timeval start;
+	/* Value of lb->tasks_sum at the moment this task started. */
+	float tasks_sum_start;
+};
+
+/*
+ * Allocate and initialise a load balancer with no tasks and a single,
+ * empty stats slot.  The initial target is the processor count reported
+ * by sysconf() when one of the _SC_NPROCESSORS_* names is available,
+ * and never less than 2.
+ *
+ * Returns NULL if either allocation fails.
+ */
+struct lbalance *lbalance_new(void)
+{
+	struct lbalance *lb = malloc(sizeof *lb);
+	if (!lb)
+		return NULL;
+
+	tlist_init(&lb->tasks);
+	lb->num_tasks = 0;
+	gettimeofday(&lb->prev_tasks_time, NULL);
+	lb->tasks_sum = 0.0;
+
+	/* Baseline for the "difference in rusage" path of
+	 * lbalance_task_free(). */
+	getrusage(RUSAGE_CHILDREN, &lb->prev_usage);
+
+	lb->max_stats = 1;
+	lb->stats = malloc(sizeof(lb->stats[0]) * lb->max_stats);
+	if (!lb->stats) {
+		free(lb);
+		return NULL;
+	}
+	lb->stats[0].num_stats = 0;
+	lb->stats[0].work_rate = 0.0;
+	lb->total_stats = 0;
+
+	/* Start with # CPUS as a guess.  -1 (UINT_MAX once stored) is the
+	 * "unknown" sentinel tested below. */
+	lb->target = -1L;
+#ifdef _SC_NPROCESSORS_ONLN
+	lb->target = sysconf(_SC_NPROCESSORS_ONLN);
+#elif defined(_SC_NPROCESSORS_CONF)
+	/* NOTE(review): target is still the sentinel in this branch, so
+	 * this test is always true. */
+	if (lb->target == (unsigned int)-1L)
+		lb->target = sysconf(_SC_NPROCESSORS_CONF);
+#endif
+	/* Otherwise, two is a good number. */
+	if (lb->target == (unsigned int)-1L || lb->target < 2)
+		lb->target = 2;
+	lb->target_uptodate = true;
+
+	return lb;
+}
+
+/* Return time differences in usec: recent - old, as a float.  When old
+ * has the larger tv_usec a second is borrowed from recent.tv_sec. */
+static float timeval_sub(struct timeval recent, struct timeval old)
+{
+	float diff;
+
+	if (old.tv_usec > recent.tv_usec) {
+		diff = 1000000 + recent.tv_usec - old.tv_usec;
+		recent.tv_sec--;
+	} else
+		diff = recent.tv_usec - old.tv_usec;
+
+	diff += (float)(recent.tv_sec - old.tv_sec) * 1000000;
+	return diff;
+}
+
+/* There were num_tasks running between prev_tasks_time and now.
+ */
+static void update_tasks_sum(struct lbalance *lb,
+			     const struct timeval *now)
+{
+	/* Add (elapsed usec * tasks running) to the integral, then move
+	 * the reference time forward. */
+	lb->tasks_sum += timeval_sub(*now, lb->prev_tasks_time)
+		* lb->num_tasks;
+	lb->prev_tasks_time = *now;
+}
+
+/*
+ * Mark the start of a task: allocate its record, make sure lb->stats has
+ * a slot for the new task count, timestamp it and append it to lb->tasks.
+ *
+ * Returns NULL (leaving lb unchanged) if an allocation fails.
+ */
+struct lbalance_task *lbalance_task_new(struct lbalance *lb)
+{
+	struct lbalance_task *task = malloc(sizeof *task);
+	if (!task)
+		return NULL;
+
+	/* stats[] is indexed by task count, so it needs num_tasks + 2
+	 * slots once this task is running; grow it by one zeroed slot. */
+	if (lb->num_tasks + 1 == lb->max_stats) {
+		struct stats *s = realloc(lb->stats,
+					  sizeof(*s) * (lb->max_stats + 1));
+		if (!s) {
+			free(task);
+			return NULL;
+		}
+		lb->stats = s;
+		lb->stats[lb->max_stats].num_stats = 0;
+		lb->stats[lb->max_stats].work_rate = 0.0;
+		lb->max_stats++;
+	}
+
+	task->lb = lb;
+	gettimeofday(&task->start, NULL);
+
+	/* Record that we ran num_tasks up until now. */
+	update_tasks_sum(lb, &task->start);
+
+	/* Remember the integral so lbalance_task_free() can work out the
+	 * average parallelism over this task's lifetime. */
+	task->tasks_sum_start = lb->tasks_sum;
+	tlist_add_tail(&lb->tasks, task, list);
+	lb->num_tasks++;
+
+	return task;
+}
+
+/* We slowly erase old stats, once we have enough: when total_stats
+ * reaches 16 per slot, every slot loses half its samples (rounded up)
+ * and its summed work rate is scaled down to match. */
+static void degrade_stats(struct lbalance *lb)
+{
+	unsigned int i;
+
+	if (lb->total_stats < lb->max_stats * 16)
+		return;
+
+#if 0
+	fprintf(stderr, ".");
+#endif
+	for (i = 0; i < lb->max_stats; i++) {
+		struct stats *s = &lb->stats[i];
+		unsigned int stats_lost;
+
+		/* Nothing to halve here; skipping also avoids the 0/0
+		 * the scaling below would otherwise compute. */
+		if (s->num_stats == 0) {
+			s->work_rate = 0.0;
+			continue;
+		}
+
+		stats_lost = (s->num_stats + 1) / 2;
+		s->work_rate *= (float)(s->num_stats - stats_lost)
+			/ s->num_stats;
+		s->num_stats -= stats_lost;
+		lb->total_stats -= stats_lost;
+		if (s->num_stats == 0)
+			s->work_rate = 0.0;
+	}
+}
+
+/* Add one work-rate sample to the slot for num_tasks, and flag the
+ * target as needing recalculation.  num_tasks must be a valid, non-zero
+ * index into lb->stats. */
+static void add_to_stats(struct lbalance *lb,
+			 unsigned int num_tasks,
+			 float work_rate)
+{
+#if 0
+	fprintf(stderr, "With %.2f running, work rate was %.5f\n",
+		num_tasks, work_rate);
+#endif
+	assert(num_tasks >= 1);
+	assert(num_tasks < lb->max_stats);
+
+	lb->stats[num_tasks].num_stats++;
+	lb->stats[num_tasks].work_rate += work_rate;
+	lb->total_stats++;
+	lb->target_uptodate = false;
+}
+
+/*
+ * Mark the completion of a task and free it.
+ *
+ * The task's CPU time is taken from @usage if given, otherwise from the
+ * change in getrusage(RUSAGE_CHILDREN) since the previous call.  CPU time
+ * divided by the task's wall-clock duration is recorded as the work rate
+ * for the average number of tasks that ran alongside it.
+ */
+void lbalance_task_free(struct lbalance_task *task,
+			const struct rusage *usage)
+{
+	struct lbalance *lb = task->lb;
+	float work_done, duration;
+	struct timeval now;
+	struct rusage ru;
+
+	gettimeofday(&now, NULL);
+	duration = timeval_sub(now, task->start);
+
+	getrusage(RUSAGE_CHILDREN, &ru);
+	if (usage) {
+		/* Scale the seconds in floating point: the integer product
+		 * could overflow where tv_sec is only 32 bits wide. */
+		work_done = usage->ru_utime.tv_usec + usage->ru_stime.tv_usec
+			+ (float)(usage->ru_utime.tv_sec
+				  + usage->ru_stime.tv_sec) * 1000000;
+	} else {
+		/* Take difference in rusage as rusage of that task: user
+		 * time against previous user time, system time against
+		 * previous system time. */
+		work_done = timeval_sub(ru.ru_utime,
+					lb->prev_usage.ru_utime)
+			+ timeval_sub(ru.ru_stime,
+				      lb->prev_usage.ru_stime);
+	}
+	/* Update previous usage. */
+	lb->prev_usage = ru;
+
+	/* Record that we ran num_tasks up until now. */
+	update_tasks_sum(lb, &now);
+
+	/* A task that took no measurable time (or saw the clock step
+	 * backwards) has no meaningful rate; dividing by its duration
+	 * would produce inf/NaN, so it contributes no sample. */
+	if (duration > 0) {
+		/* So, on average, how many tasks were running during this
+		 * time?  (Rounded to nearest.) */
+		unsigned int num_tasks
+			= (lb->tasks_sum - task->tasks_sum_start)
+			/ duration + 0.5;
+
+		/* Record the work rate for that many tasks. */
+		add_to_stats(lb, num_tasks, work_done / duration);
+
+		/* We throw away old stats. */
+		degrade_stats(lb);
+	}
+
+	/* We need to recalculate the target. */
+	lb->target_uptodate = false;
+
+	/* Remove this task. */
+	tlist_del_from(&lb->tasks, task, list);
+	lb->num_tasks--;
+	free(task);
+}
+
+/* We look for the point where the work rate starts to drop.  Say you have
+ * 4 cpus, we'd expect the work rate for 5 processes to drop 20%.
+ *
+ * If we're within 1/4 of that ideal ratio, we assume it's still
+ * optimal.  Any drop of more than 1/2 is interpreted as the point we
+ * are overloaded.
+ */
+static unsigned int best_target(const struct lbalance *lb)
+{
+	/* NOTE(review): `best` is assigned below but never read. */
+	unsigned int i, best = 0, found_drop = 0;
+	float best_f_max = -1.0, cliff = -1.0;
+
+#if 0
+	for (i = 1; i < lb->max_stats; i++) {
+		printf("%u: %f (%u)\n", i,
+		       lb->stats[i].work_rate / lb->stats[i].num_stats,
+		       lb->stats[i].num_stats);
+	}
+#endif
+
+	for (i = 1; i < lb->max_stats; i++) {
+		/* Mean work rate seen with i tasks; a level with no
+		 * samples counts as zero. */
+		float f;
+
+		if (!lb->stats[i].num_stats)
+			f = 0;
+		else
+			f = lb->stats[i].work_rate / lb->stats[i].num_stats;
+
+		if (f > best_f_max) {
+#if 0
+			printf("Best is %i\n", i);
+#endif
+			/* New best level: reset both thresholds relative
+			 * to it (f less 1/4, resp. 1/2, of f / (i + 1))
+			 * and forget any drop seen earlier. */
+			best_f_max = f - (f / (i + 1)) / 4;
+			cliff = f - (f / (i + 1)) / 2;
+			best = i;
+			found_drop = 0;
+		} else if (!found_drop && f < cliff) {
+#if 0
+			printf("Found drop at %i\n", i);
+#endif
+			/* First level below the cliff since the last best. */
+			found_drop = i;
+		}
+	}
+
+	/* The level just before the drop, if there was one... */
+	if (found_drop) {
+		return found_drop - 1;
+	}
+	/* ...else the last level the loop visited (0 if it never ran). */
+	return i - 1;
+}
+
+/* Choose the target to hand out: best_target()'s answer, moved to a
+ * neighbouring level when that neighbour has no samples yet. */
+static unsigned int calculate_target(struct lbalance *lb)
+{
+	unsigned int target;
+
+	target = best_target(lb);
+
+	/* Jitter if the adjacent ones are unknown. */
+	/* The chosen level itself is unmeasured (or out of range): use it. */
+	if (target >= lb->max_stats || lb->stats[target].num_stats == 0)
+		return target;
+
+	/* Next level up has no slot yet or no samples: try it. */
+	if (target + 1 == lb->max_stats || lb->stats[target+1].num_stats == 0)
+		return target + 1;
+
+	/* Next level down has no samples: try it (never going below 1). */
+	if (target > 1 && lb->stats[target-1].num_stats == 0)
+		return target - 1;
+
+	return target;
+}
+
+/* Return the recommended number of parallel tasks, recalculating it only
+ * if stats have arrived since the last call. */
+unsigned lbalance_target(struct lbalance *lb)
+{
+	if (!lb->target_uptodate) {
+		lb->target = calculate_target(lb);
+		lb->target_uptodate = true;
+	}
+	return lb->target;
+}
+
+/* Free the balancer, its stats array, and any tasks still on its list
+ * (those are dropped without recording stats). */
+void lbalance_free(struct lbalance *lb)
+{
+	struct lbalance_task *task;
+
+	while ((task = tlist_top(&lb->tasks, struct lbalance_task, list))) {
+		assert(task->lb == lb);
+		tlist_del_from(&lb->tasks, task, list);
+		lb->num_tasks--;
+		free(task);
+	}
+	assert(lb->num_tasks == 0);
+	free(lb->stats);
+	free(lb);
+}
diff --git a/ccan/lbalance/lbalance.h b/ccan/lbalance/lbalance.h
new file mode 100644
index 00000000..d318a6fe
--- /dev/null
+++ b/ccan/lbalance/lbalance.h
@@ -0,0 +1,52 @@
+#ifndef CCAN_LBALANCE_H
+#define CCAN_LBALANCE_H
+#include "config.h"
+
+struct lbalance;
+struct lbalance_task;
+struct timeval;
+struct rusage;
+
+/**
+ * lbalance_new - initialize a load balancing structure.
+ */
+struct lbalance *lbalance_new(void);
+
+/**
+ * lbalance_task_new - mark the starting of a new task.
+ * @lbalance: the load balancer from lbalance_new.
+ */
+struct lbalance_task *lbalance_task_new(struct lbalance *lbalance);
+
+/**
+ * lbalance_task_free - mark the completion of a task.
+ * @task: the lbalance_task from lbalance_task_new, which will be freed.
+ * @usage: the resource usage for that task (or NULL).
+ *
+ * If @usage is NULL, you must have already wait()ed for the child so
+ * that lbalance_task_free() can derive it from the difference in
+ * getrusage() for the child processes.
+ *
+ * Otherwise the user and system times in @usage are taken as the CPU
+ * time consumed by the task.
+ */
+void lbalance_task_free(struct lbalance_task *task,
+			const struct rusage *usage);
+
+/**
+ * lbalance_target - how many tasks in parallel are recommended?
+ * @lbalance: the load balancer from lbalance_new.
+ *
+ * Normally you keep creating tasks until this limit is reached.  It's
+ * updated by stats from lbalance_task_free.
+ */
+unsigned lbalance_target(struct lbalance *lbalance);
+
+/**
+ * lbalance_free - free a load balancing structure.
+ * @lbalance: the load balancer from lbalance_new.
+ *
+ * Also frees any tasks still attached.
+ */
+void lbalance_free(struct lbalance *lbalance);
+#endif /* CCAN_LBALANCE_H */
diff --git a/ccan/lbalance/test/run.c b/ccan/lbalance/test/run.c
new file mode 100644
index 00000000..542501c0
--- /dev/null
+++ b/ccan/lbalance/test/run.c
@@ -0,0 +1,97 @@
+#include "config.h"
+#include
+#include
+#include
+#include
+
+/* Redirect the clock and rusage calls made by the code under test (pulled
+ * in by the includes below) to the deterministic fakes in this file. */
+static int fake_gettimeofday(struct timeval *tv, struct timezone *tz);
+static int fake_getrusage(int who, struct rusage *usage);
+#define gettimeofday fake_gettimeofday
+#define getrusage fake_getrusage
+
+#include
+#include
+
+/* Simulated wall clock, in milliseconds, and simulated cumulative child
+ * resource usage. */
+static unsigned faketime_ms = 0;
+static struct rusage total_usage;
+
+/* Report faketime_ms as a struct timeval. */
+static int fake_gettimeofday(struct timeval *tv, struct timezone *tz)
+{
+	assert(tz == NULL);
+	tv->tv_usec = (faketime_ms % 1000) * 1000;
+	tv->tv_sec = faketime_ms / 1000;
+	return 0;
+}
+
+/* Report total_usage as the children's resource usage. */
+static int fake_getrusage(int who, struct rusage *usage)
+{
+	assert(who == RUSAGE_CHILDREN);
+	*usage = total_usage;
+	return 0;
+}
+
+/*
+ * Simulate a machine that can do @optimum tasks' worth of work at once.
+ *
+ * Each of 1000 rounds starts lbalance_target() tasks, advances the fake
+ * clock by 100ms, credits every task with its share of CPU time and frees
+ * it with a NULL usage (so lbalance reads the fake rusage).  From the
+ * first round whose target equals @optimum onward, the targets are
+ * averaged; the test passes if that average rounds to @optimum.
+ */
+static void test_optimum(struct lbalance *lb, unsigned int optimum)
+{
+	unsigned int j, i, num_tasks = 0, usec, num_counted = 0;
+	/* Only read once num_counted is non-zero, but initialise it so
+	 * that is also obvious to the compiler. */
+	float average = 0;
+	struct lbalance_task *tasks[1000];
+
+	for (j = 0; j < 1000; j++) {
+		diag("lbalance_target is %u\n", lbalance_target(lb));
+		/* We measure average once we try optimum once. */
+		if (lbalance_target(lb) == optimum && num_counted == 0) {
+			average = lbalance_target(lb);
+			num_counted = 1;
+		} else if (num_counted) {
+			average += lbalance_target(lb);
+			num_counted++;
+		}
+
+		/* Create tasks until we reach target. */
+		for (i = 0; i < lbalance_target(lb); i++) {
+			tasks[i] = lbalance_task_new(lb);
+		}
+		num_tasks = i;
+
+		faketime_ms += 100;
+		/* If we're under optimum, set utilization to 100% */
+		if (num_tasks <= optimum) {
+			usec = 100000;
+		} else {
+			/* Overloaded: the tasks share optimum CPUs. */
+			usec = 100000 * optimum / num_tasks;
+		}
+
+		for (i = 0; i < num_tasks; i++) {
+			/* Split each task's CPU time evenly between user
+			 * and system time, carrying into tv_sec so that
+			 * tv_usec always stays below one second. */
+			total_usage.ru_utime.tv_usec += usec / 2;
+			if (total_usage.ru_utime.tv_usec >= 1000000) {
+				total_usage.ru_utime.tv_usec -= 1000000;
+				total_usage.ru_utime.tv_sec++;
+			}
+			total_usage.ru_stime.tv_usec += usec / 2;
+			if (total_usage.ru_stime.tv_usec >= 1000000) {
+				total_usage.ru_stime.tv_usec -= 1000000;
+				total_usage.ru_stime.tv_sec++;
+			}
+			lbalance_task_free(tasks[i], NULL);
+		}
+	}
+
+	/* We should have stayed close to optimum. */
+	ok1(num_counted && (int)(average / num_counted + 0.5) == optimum);
+}
+
+/* Run the simulation for optimum = 1, 2, 4 and 64 on one balancer. */
+int main(void)
+{
+	struct lbalance *lb;
+
+	plan_tests(4);
+	lb = lbalance_new();
+
+	test_optimum(lb, 1);
+	test_optimum(lb, 2);
+	test_optimum(lb, 4);
+	test_optimum(lb, 64);
+	lbalance_free(lb);
+
+	return exit_status();
+}
diff --git a/ccan/lbalance/tools/Makefile b/ccan/lbalance/tools/Makefile
new file mode 100644
index 00000000..fce690c2
--- /dev/null
+++ b/ccan/lbalance/tools/Makefile
@@ -0,0 +1,13 @@
+#! /usr/bin/make
+MODULES=../../jmap.o ../../time.o
+
+CFLAGS=-I../../.. -g #-O2
+LDFLAGS=-lJudy
+
+lbalance: lbalance.c $(MODULES)
+
+# Build the needed ccan modules from the top-level Makefile.  $(MAKE)
+# rather than a literal "make" passes flags and the jobserver through.
+$(MODULES):
+	$(MAKE) -C ../../.. $(patsubst ../../%.o, ccan/%.o, $@) EXCLUDE=
+
+clean:
+	rm -f lbalance $(MODULES)
diff --git a/ccan/lbalance/tools/lbalance.c b/ccan/lbalance/tools/lbalance.c
new file mode 100644
index 00000000..228c947f
--- /dev/null
+++ b/ccan/lbalance/tools/lbalance.c
@@ -0,0 +1,117 @@
+#include
+#include
+#include
+#include
+#include
+#include
+
+/* Defines struct jmap_task. */
+JMAP_DEFINE_UINTIDX_TYPE(struct lbalance_task, task);
+
+/* Figure out how many loops we need to run for about 1 second.
+ */
+static unsigned long burn_count;
+
+/* Count how many iterations of the timing loop fit into one second and
+ * store the result in burn_count. */
+static void calibrate_burn_cpu(void)
+{
+	struct timeval start = time_now();
+
+	while (time_less(time_now(), time_add(start, time_from_msec(1000))))
+		burn_count++;
+	printf("Burn count = %lu\n", burn_count);
+}
+
+/* Default child workload: repeat the calibrated loop burn_count times,
+ * then exit.  Never returns. */
+static void burn_cpu(void)
+{
+	/* Same type as burn_count, so the loop ends however large the
+	 * calibrated count is. */
+	unsigned long i;
+	unsigned int after = 0;
+	struct timeval start = time_now();
+
+	/* We do a loop similar to the calibrate_burn_cpu loop. */
+	for (i = 0; i < burn_count; i++) {
+		after += time_less(time_now(),
+				   time_add(start, time_from_msec(1000)));
+	}
+	/* We use the result so the compiler can't discard it. */
+	exit(after);
+}
+
+/* Fork a child that runs the command in args, or burn_cpu() when args is
+ * empty.  Returns the child's pid; exits the program if fork fails. */
+static pid_t spawn(char *args[])
+{
+	pid_t pid = fork();
+
+	if (pid == -1)
+		err(1, "forking");
+	if (pid == 0) {
+		if (!args[0])
+			burn_cpu();
+		execvp(args[0], args);
+		err(1, "exec failed");
+	}
+	return pid;
+}
+
+/* Print the command-line synopsis and exit with status 1. */
+static void usage(void)
+{
+	fprintf(stderr,
+		"Usage: lbalance --fixed=<num-parallel> <num-tasks> [<command>...]\n"
+		"OR: lbalance <num-tasks> [<command>...]\n");
+	exit(1);
+}
+
+/*
+ * Run <num-tasks> children, keeping as many in flight as lbalance
+ * recommends (or a fixed number with --fixed=), and report each child's
+ * rusage back to the balancer as it exits.  Progress is printed as
+ * "(target)", "+" per spawn and "-" per exit.
+ */
+int main(int argc, char *argv[])
+{
+	unsigned int num, fixed_target = 0, num_done = 0, num_running = 0;
+	struct lbalance *lb;
+	struct jmap_task *tasks = jmap_task_new();
+
+	if (!tasks)
+		err(1, "allocating task map");
+
+	if (argc < 2)
+		usage();
+
+	if (strncmp(argv[1], "--fixed=", strlen("--fixed=")) == 0) {
+		fixed_target = atoi(argv[1] + strlen("--fixed="));
+		if (!fixed_target)
+			errx(1, "Need positive number after --fixed");
+		argv++;
+		argc--;
+		lb = NULL;
+	} else {
+		lb = lbalance_new();
+		if (!lb)
+			err(1, "allocating load balancer");
+	}
+
+	/* "--fixed=N" on its own leaves no task count; don't hand a NULL
+	 * argv entry to atoi(). */
+	if (argc < 2)
+		usage();
+	num = atoi(argv[1]);
+	argv++;
+	argc--;
+
+	/* No command given: children will run burn_cpu(), so calibrate. */
+	if (!argv[1])
+		calibrate_burn_cpu();
+
+	while (num_done < num) {
+		unsigned int target = fixed_target;
+		struct lbalance_task *task;
+		struct rusage ru;
+		pid_t pid;
+
+		if (lb) {
+			target = lbalance_target(lb);
+			printf("(%u)", target);
+		}
+
+		while (num_running < target && num_done + num_running < num) {
+			pid = spawn(argv+1);
+			if (lb) {
+				task = lbalance_task_new(lb);
+				if (!task)
+					err(1, "allocating task");
+			} else {
+				/* Placeholder: the map needs some non-NULL
+				 * value when no balancer is in use. */
+				task = (void *)1;
+			}
+			jmap_task_add(tasks, pid, task);
+			num_running++;
+			printf("+"); fflush(stdout);
+		}
+
+		/* Now wait for something to die! */
+		pid = wait3(NULL, 0, &ru);
+		if (pid == -1)
+			err(1, "waiting for child");
+		task = jmap_task_get(tasks, pid);
+		if (lb)
+			lbalance_task_free(task, &ru);
+		num_done++;
+		num_running--;
+		printf("-"); fflush(stdout);
+	}
+	printf("\n");
+	if (lb)
+		lbalance_free(lb);
+	return 0;
+}