From 877f2c0b88321839b939d1031ad7e5a386d89dc0 Mon Sep 17 00:00:00 2001 From: rodri Date: Mon, 2 Sep 2024 13:13:03 +0000 Subject: parallel experiments lain. --- main1.c | 132 +++++++++++++++++++++++++++++++++++++++++++++++ main2.c | 180 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ main3.c | 178 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ main4.c | 148 ++++++++++++++++++++++++++++++++++++++++++++++++++++ mkfile | 9 ++++ readme | 8 +++ 6 files changed, 655 insertions(+) create mode 100644 main1.c create mode 100644 main2.c create mode 100644 main3.c create mode 100644 main4.c create mode 100644 mkfile create mode 100644 readme diff --git a/main1.c b/main1.c new file mode 100644 index 0000000..222d0be --- /dev/null +++ b/main1.c @@ -0,0 +1,132 @@ +#include +#include +#include + +typedef struct Ttask Ttask; +typedef struct Tpool Tpool; + +struct Ttask +{ + void (*fn)(void*); + void *arg; +}; + +struct Tpool +{ + ulong nprocs; + Ref nworking; + + Channel *subq; /* task submission queue */ + Channel *done; /* task completion signal */ +}; + +void +threadloop(void *arg) +{ + Tpool *pool; + Ttask *task; + + pool = arg; + + while((task = recvp(pool->subq)) != nil){ + incref(&pool->nworking); + task->fn(task->arg); + decref(&pool->nworking); + nbsend(pool->done, nil); + } +} + +Tpool * +mkthreadpool(ulong nprocs) +{ + Tpool *tp; + + tp = malloc(sizeof *tp); + memset(tp, 0, sizeof *tp); + tp->nprocs = nprocs; + tp->subq = chancreate(sizeof(void*), nprocs); + tp->done = chancreate(sizeof(void*), 0); + while(nprocs--) + proccreate(threadloop, tp, mainstacksize); + return tp; +} + +void +threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg) +{ + Ttask *t; + + t = malloc(sizeof *t); + t->fn = fn; + t->arg = arg; + + sendp(tp->subq, t); +} + +typedef struct Tsum Tsum; +struct Tsum +{ + int a; + int b; +}; +void +sum(void *arg) +{ + Tsum *sum; + int cnt; + + sum = arg; + cnt = 100; + while(cnt--) sum->a = sum->a+sum->b; +} + +void +usage(void) +{ + fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0); + exits(nil); +} + +void +threadmain(int argc, char *argv[]) +{ + static int W = 1000, H = 1000; + Tpool *pool; + Tsum *t; + int i, j; + int threaded; + int nprocs; + + threaded = 0; + nprocs = 8; + ARGBEGIN{ + case 't': threaded++; break; + case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break; + default: usage(); + }ARGEND; + if(argc != 0) + usage(); + + t = malloc(W*H*sizeof(*t)); + if(threaded){ + pool = mkthreadpool(nprocs); + + for(i = 0; i < H; i++) + for(j = 0; j < W; j++){ + t[i*W+j] = (Tsum){i, j}; + threadpoolexec(pool, sum, &t[i*W+j]); + } + + while(pool->nworking.ref > 0) + recvp(pool->done); + + threadexitsall(nil); + } + + for(i = 0; i < H; i++) + for(j = 0; j < W; j++){ + t[i*W+j] = (Tsum){i, j}; + sum(&t[i*W+j]); + } + exits(nil); +} diff --git a/main2.c b/main2.c new file mode 100644 index 0000000..67e5b3a --- /dev/null +++ b/main2.c @@ -0,0 +1,180 @@ +#include +#include +#include + +typedef struct Ttask Ttask; +typedef struct Taskq Taskq; +typedef struct Tpool Tpool; + +struct Ttask +{ + void (*fn)(void*); + void *arg; + Ttask *next; +}; + +struct Taskq +{ + Ttask *hd; + Ttask *tl; +}; + +struct Tpool +{ + QLock; + Rendez empty; + ulong nprocs; + Ref nworking; + + Taskq subq; /* task submission queue */ + Channel *done; /* task completion signal */ +}; + +void +taskqput(Tpool *tp, Ttask *t) +{ + qlock(tp); + if(tp->subq.tl == nil){ + tp->subq.hd = tp->subq.tl = t; + rwakeup(&tp->empty); + qunlock(tp); + return; + } + + tp->subq.tl->next = t; + tp->subq.tl = t; + rwakeup(&tp->empty); + qunlock(tp); +} + +Ttask * +taskqget(Tpool *tp) +{ + Ttask *t; + + qlock(tp); + while(tp->subq.hd == nil) + rsleep(&tp->empty); + + t = tp->subq.hd; + tp->subq.hd = t->next; + t->next = nil; + if(tp->subq.hd == nil) + tp->subq.tl = nil; + qunlock(tp); + return t; +} + +void +threadloop(void *arg) +{ + Tpool *pool; + Ttask *task; + + pool = arg; + + for(;;){ + task = taskqget(pool); + if(task == nil) + continue; + incref(&pool->nworking); + task->fn(task->arg); + decref(&pool->nworking); + nbsend(pool->done, nil); + } +} + +Tpool * +mkthreadpool(ulong nprocs) +{ + Tpool *tp; + + tp = malloc(sizeof *tp); + memset(tp, 0, sizeof *tp); + tp->empty.l = &tp->QLock; + tp->nprocs = nprocs; + tp->done = chancreate(sizeof(void*), 0); + while(nprocs--) + proccreate(threadloop, tp, mainstacksize); + return tp; +} + +void +threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg) +{ + Ttask *t; + + t = malloc(sizeof *t); + memset(t, 0, sizeof *t); + t->fn = fn; + t->arg = arg; + taskqput(tp, t); +} + +typedef struct Tsum Tsum; +struct Tsum +{ + int a; + int b; +}; +void +sum(void *arg) +{ + Tsum *sum; + int cnt; + + sum = arg; + cnt = 100; + while(cnt--) sum->a = sum->a+sum->b; +} + +void +usage(void) +{ + fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0); + exits(nil); +} + +void +threadmain(int argc, char *argv[]) +{ + static int W = 1000, H = 1000; + Tpool *pool; + Tsum *t; + int i, j; + int threaded; + int nprocs; + + threaded = 0; + nprocs = 8; + ARGBEGIN{ + case 't': threaded++; break; + case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break; + default: usage(); + }ARGEND; + if(argc != 0) + usage(); + + t = malloc(W*H*sizeof(*t)); + if(threaded){ + pool = mkthreadpool(nprocs); + + for(i = 0; i < H; i++) + for(j = 0; j < W; j++){ + t[i*W+j] = (Tsum){i, j}; + threadpoolexec(pool, sum, &t[i*W+j]); + } + + while(pool->nworking.ref > 0) + recvp(pool->done); + + threadexitsall(nil); + } + + for(i = 0; i < H; i++) + for(j = 0; j < W; j++){ + t[i*W+j] = (Tsum){i, j}; + sum(&t[i*W+j]); + } + exits(nil); +} diff --git a/main3.c b/main3.c new file mode 100644 index 0000000..871f586 --- /dev/null +++ b/main3.c @@ -0,0 +1,178 @@ +#include +#include +#include + +typedef struct Ttask Ttask; +typedef struct Taskq Taskq; +typedef struct Tpool Tpool; + +struct Ttask +{ + void (*fn)(void*); + void *arg; + Ttask *next; +}; + +struct Taskq +{ + Ttask *hd; + Ttask *tl; +}; + +struct Tpool +{ + QLock; + ulong nprocs; + Ref nworking; + + Taskq subq; /* task submission queue */ + Channel *done; /* task completion signal */ +}; + +void +taskqput(Tpool *tp, Ttask *t) +{ + qlock(tp); + if(tp->subq.tl == nil){ + tp->subq.hd = tp->subq.tl = t; + qunlock(tp); + return; + } + + tp->subq.tl->next = t; + tp->subq.tl = t; + qunlock(tp); +} + +Ttask * +taskqget(Tpool *tp) +{ + Ttask *t; + + qlock(tp); + if(tp->subq.hd == nil){ + qunlock(tp); + return nil; + } + + t = tp->subq.hd; + tp->subq.hd = t->next; + t->next = nil; + if(tp->subq.hd == nil) + tp->subq.tl = nil; + qunlock(tp); + return t; +} + +void +threadloop(void *arg) +{ + Tpool *pool; + Ttask *task; + + pool = arg; + + for(;;){ + task = taskqget(pool); + if(task == nil) + continue; + incref(&pool->nworking); + task->fn(task->arg); + decref(&pool->nworking); + nbsend(pool->done, nil); + } +} + +Tpool * +mkthreadpool(ulong nprocs) +{ + Tpool *tp; + + tp = malloc(sizeof *tp); + memset(tp, 0, sizeof *tp); + tp->nprocs = nprocs; + tp->done = chancreate(sizeof(void*), 0); + while(nprocs--) + proccreate(threadloop, tp, mainstacksize); + return tp; +} + +void +threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg) +{ + Ttask *t; + + t = malloc(sizeof *t); + memset(t, 0, sizeof *t); + t->fn = fn; + t->arg = arg; + taskqput(tp, t); +} + +typedef struct Tsum Tsum; +struct Tsum +{ + int a; + int b; +}; +void +sum(void *arg) +{ + Tsum *sum; + int cnt; + + sum = arg; + cnt = 100; + while(cnt--) sum->a = sum->a+sum->b; +} + +void +usage(void) +{ + fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0); + exits(nil); +} + +void +threadmain(int argc, char *argv[]) +{ + static int W = 10, H = 10; + Tpool *pool; + Tsum *t; + int i, j; + int threaded; + int nprocs; + + threaded = 0; + nprocs = 8; + ARGBEGIN{ + case 't': threaded++; break; + case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break; + default: usage(); + }ARGEND; + if(argc != 0) + usage(); + + t = malloc(W*H*sizeof(*t)); + if(threaded){ + pool = mkthreadpool(nprocs); + + for(i = 0; i < H; i++) + for(j = 0; j < W; j++){ + t[i*W+j] = (Tsum){i, j}; + threadpoolexec(pool, sum, &t[i*W+j]); + } + + while(pool->nworking.ref > 0) + recvp(pool->done); + + threadexitsall(nil); + } + + for(i = 0; i < H; i++) + for(j = 0; j < W; j++){ + t[i*W+j] = (Tsum){i, j}; + sum(&t[i*W+j]); + } + exits(nil); +} diff --git a/main4.c b/main4.c new file mode 100644 index 0000000..cd0e1d8 --- /dev/null +++ b/main4.c @@ -0,0 +1,148 @@ +#include +#include +#include +#include +#include + +typedef struct Ttask Ttask; +typedef struct Tpool Tpool; + +struct Ttask +{ + void (*fn)(void*); + void *arg; +}; + +struct Tpool +{ + ulong nprocs; + Ref issued; + Ref complete; + + Channel *subq; /* task submission queue */ + Channel *done; /* task completion signal */ +}; + +void +threadloop(void *arg) +{ + Tpool *pool; + Ttask *task; + + pool = arg; + + while((task = recvp(pool->subq)) != nil){ + task->fn(task->arg); + incref(&pool->complete); + nbsend(pool->done, nil); + } +} + +Tpool * +mkthreadpool(ulong nprocs) +{ + Tpool *tp; + + tp = malloc(sizeof *tp); + memset(tp, 0, sizeof *tp); + tp->nprocs = nprocs; + tp->subq = chancreate(sizeof(void*), nprocs); + tp->done = chancreate(sizeof(void*), 0); + while(nprocs--) + proccreate(threadloop, tp, mainstacksize); + return tp; +} + +void +threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg) +{ + Ttask *t; + + t = malloc(sizeof *t); + t->fn = fn; + t->arg = arg; + + sendp(tp->subq, t); + incref(&tp->issued); +} + +typedef struct Targs Targs; +struct Targs +{ + Memimage *i; + int y; +}; +void +fillpix(void *arg) +{ + Targs *imgop; + Point p; + ulong *fb, pix; + double α; + + imgop = arg; + + for(p = Pt(0, imgop->y); p.x < Dx(imgop->i->r); p.x++){ + fb = (ulong*)byteaddr(imgop->i, p); + α = atan2(p.y, p.x); + pix = α*25523UL*25523UL/* + truerand()*/; + *fb = pix|0xFF<<24; + } +} + +void +usage(void) +{ + fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0); + exits(nil); +} + +void +threadmain(int argc, char *argv[]) +{ + static int W = 1000, H = 1000; + Tpool *pool; + Targs *t; + Memimage *img; + int i; + int threaded; + int nprocs; + + threaded = 0; + nprocs = 8; + ARGBEGIN{ + case 't': threaded++; break; + case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break; + default: usage(); + }ARGEND; + if(argc != 0) + usage(); + + if(memimageinit() != 0) + sysfatal("memimageinit: %r"); + + img = allocmemimage(Rect(0,0,W,H), XRGB32); + t = malloc(H*sizeof(*t)); + if(threaded){ + pool = mkthreadpool(nprocs); + + for(i = 0; i < H; i++){ + t[i] = (Targs){img, i}; + threadpoolexec(pool, fillpix, &t[i]); + } + + while(pool->issued.ref != pool->complete.ref) + recvp(pool->done); + + writememimage(1, img); + + threadexitsall(nil); + } + + for(i = 0; i < H; i++){ + t[i] = (Targs){img, i}; + fillpix(&t[i]); + } + writememimage(1, img); + exits(nil); +} diff --git a/mkfile b/mkfile new file mode 100644 index 0000000..2d6eaac --- /dev/null +++ b/mkfile @@ -0,0 +1,9 @@ +