summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorrodri <rgl@antares-labs.eu>2024-09-02 13:13:03 +0000
committerrodri <rgl@antares-labs.eu>2024-09-02 13:13:03 +0000
commit877f2c0b88321839b939d1031ad7e5a386d89dc0 (patch)
tree063c456969092f423d1ad4289772a80233718840
downloadthreadpool-877f2c0b88321839b939d1031ad7e5a386d89dc0.tar.gz
threadpool-877f2c0b88321839b939d1031ad7e5a386d89dc0.tar.bz2
threadpool-877f2c0b88321839b939d1031ad7e5a386d89dc0.zip
parallel experiments lain.
-rw-r--r--main1.c132
-rw-r--r--main2.c180
-rw-r--r--main3.c178
-rw-r--r--main4.c148
-rw-r--r--mkfile9
-rw-r--r--readme8
6 files changed, 655 insertions, 0 deletions
diff --git a/main1.c b/main1.c
new file mode 100644
index 0000000..222d0be
--- /dev/null
+++ b/main1.c
@@ -0,0 +1,132 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+
+typedef struct Ttask Ttask;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+ void (*fn)(void*);
+ void *arg;
+};
+
+struct Tpool
+{
+ ulong nprocs;
+ Ref nworking;
+
+ Channel *subq; /* task submission queue */
+ Channel *done; /* task completion signal */
+};
+
+void
+threadloop(void *arg)
+{
+ Tpool *pool;
+ Ttask *task;
+
+ pool = arg;
+
+ while((task = recvp(pool->subq)) != nil){
+ incref(&pool->nworking);
+ task->fn(task->arg);
+ decref(&pool->nworking);
+ nbsend(pool->done, nil);
+ }
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)
+{
+ Tpool *tp;
+
+ tp = malloc(sizeof *tp);
+ memset(tp, 0, sizeof *tp);
+ tp->nprocs = nprocs;
+ tp->subq = chancreate(sizeof(void*), nprocs);
+ tp->done = chancreate(sizeof(void*), 0);
+ while(nprocs--)
+ proccreate(threadloop, tp, mainstacksize);
+ return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)
+{
+ Ttask *t;
+
+ t = malloc(sizeof *t);
+ t->fn = fn;
+ t->arg = arg;
+
+ sendp(tp->subq, t);
+}
+
+typedef struct Tsum Tsum;
+struct Tsum
+{
+ int a;
+ int b;
+};
+void
+sum(void *arg)
+{
+ Tsum *sum;
+ int cnt;
+
+ sum = arg;
+ cnt = 100;
+ while(cnt--) sum->a = sum->a+sum->b;
+}
+
+void
+usage(void)
+{
+ fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+ exits(nil);
+}
+
+void
+threadmain(int argc, char *argv[])
+{
+ static int W = 1000, H = 1000;
+ Tpool *pool;
+ Tsum *t;
+ int i, j;
+ int threaded;
+ int nprocs;
+
+ threaded = 0;
+ nprocs = 8;
+ ARGBEGIN{
+ case 't': threaded++; break;
+ case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+ default: usage();
+ }ARGEND;
+ if(argc != 0)
+ usage();
+
+ t = malloc(W*H*sizeof(*t));
+ if(threaded){
+ pool = mkthreadpool(nprocs);
+
+ for(i = 0; i < H; i++)
+ for(j = 0; j < W; j++){
+ t[i*W+j] = (Tsum){i, j};
+ threadpoolexec(pool, sum, &t[i*W+j]);
+ }
+
+ while(pool->nworking.ref > 0)
+ recvp(pool->done);
+
+ threadexitsall(nil);
+ }
+
+ for(i = 0; i < H; i++)
+ for(j = 0; j < W; j++){
+ t[i*W+j] = (Tsum){i, j};
+ sum(&t[i*W+j]);
+ }
+ exits(nil);
+}
diff --git a/main2.c b/main2.c
new file mode 100644
index 0000000..67e5b3a
--- /dev/null
+++ b/main2.c
@@ -0,0 +1,180 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+
+typedef struct Ttask Ttask;
+typedef struct Taskq Taskq;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+ void (*fn)(void*);
+ void *arg;
+ Ttask *next;
+};
+
+struct Taskq
+{
+ Ttask *hd;
+ Ttask *tl;
+};
+
+struct Tpool
+{
+ QLock;
+ Rendez empty;
+ ulong nprocs;
+ Ref nworking;
+
+ Taskq subq; /* task submission queue */
+ Channel *done; /* task completion signal */
+};
+
+void
+taskqput(Tpool *tp, Ttask *t)
+{
+ qlock(tp);
+ if(tp->subq.tl == nil){
+ tp->subq.hd = tp->subq.tl = t;
+ rwakeup(&tp->empty);
+ qunlock(tp);
+ return;
+ }
+
+ tp->subq.tl->next = t;
+ tp->subq.tl = t;
+ rwakeup(&tp->empty);
+ qunlock(tp);
+}
+
+Ttask *
+taskqget(Tpool *tp)
+{
+ Ttask *t;
+
+ qlock(tp);
+ while(tp->subq.hd == nil)
+ rsleep(&tp->empty);
+
+ t = tp->subq.hd;
+ tp->subq.hd = t->next;
+ t->next = nil;
+ if(tp->subq.hd == nil)
+ tp->subq.tl = nil;
+ qunlock(tp);
+ return t;
+}
+
+void
+threadloop(void *arg)
+{
+ Tpool *pool;
+ Ttask *task;
+
+ pool = arg;
+
+ for(;;){
+ task = taskqget(pool);
+ if(task == nil)
+ continue;
+ incref(&pool->nworking);
+ task->fn(task->arg);
+ decref(&pool->nworking);
+ nbsend(pool->done, nil);
+ }
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)
+{
+ Tpool *tp;
+
+ tp = malloc(sizeof *tp);
+ memset(tp, 0, sizeof *tp);
+ tp->empty.l = &tp->QLock;
+ tp->nprocs = nprocs;
+ tp->done = chancreate(sizeof(void*), 0);
+ while(nprocs--)
+ proccreate(threadloop, tp, mainstacksize);
+ return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)
+{
+ Ttask *t;
+
+ t = malloc(sizeof *t);
+ memset(t, 0, sizeof *t);
+ t->fn = fn;
+ t->arg = arg;
+ taskqput(tp, t);
+}
+
+typedef struct Tsum Tsum;
+struct Tsum
+{
+ int a;
+ int b;
+};
+void
+sum(void *arg)
+{
+ Tsum *sum;
+ int cnt;
+
+ sum = arg;
+ cnt = 100;
+ while(cnt--) sum->a = sum->a+sum->b;
+}
+
+void
+usage(void)
+{
+ fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+ exits(nil);
+}
+
+void
+threadmain(int argc, char *argv[])
+{
+ static int W = 1000, H = 1000;
+ Tpool *pool;
+ Tsum *t;
+ int i, j;
+ int threaded;
+ int nprocs;
+
+ threaded = 0;
+ nprocs = 8;
+ ARGBEGIN{
+ case 't': threaded++; break;
+ case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+ default: usage();
+ }ARGEND;
+ if(argc != 0)
+ usage();
+
+ t = malloc(W*H*sizeof(*t));
+ if(threaded){
+ pool = mkthreadpool(nprocs);
+
+ for(i = 0; i < H; i++)
+ for(j = 0; j < W; j++){
+ t[i*W+j] = (Tsum){i, j};
+ threadpoolexec(pool, sum, &t[i*W+j]);
+ }
+
+ while(pool->nworking.ref > 0)
+ recvp(pool->done);
+
+ threadexitsall(nil);
+ }
+
+ for(i = 0; i < H; i++)
+ for(j = 0; j < W; j++){
+ t[i*W+j] = (Tsum){i, j};
+ sum(&t[i*W+j]);
+ }
+ exits(nil);
+}
diff --git a/main3.c b/main3.c
new file mode 100644
index 0000000..871f586
--- /dev/null
+++ b/main3.c
@@ -0,0 +1,178 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+
+typedef struct Ttask Ttask;
+typedef struct Taskq Taskq;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+ void (*fn)(void*);
+ void *arg;
+ Ttask *next;
+};
+
+struct Taskq
+{
+ Ttask *hd;
+ Ttask *tl;
+};
+
+struct Tpool
+{
+ QLock;
+ ulong nprocs;
+ Ref nworking;
+
+ Taskq subq; /* task submission queue */
+ Channel *done; /* task completion signal */
+};
+
+void
+taskqput(Tpool *tp, Ttask *t)
+{
+ qlock(tp);
+ if(tp->subq.tl == nil){
+ tp->subq.hd = tp->subq.tl = t;
+ qunlock(tp);
+ return;
+ }
+
+ tp->subq.tl->next = t;
+ tp->subq.tl = t;
+ qunlock(tp);
+}
+
+Ttask *
+taskqget(Tpool *tp)
+{
+ Ttask *t;
+
+ qlock(tp);
+ if(tp->subq.hd == nil){
+ qunlock(tp);
+ return nil;
+ }
+
+ t = tp->subq.hd;
+ tp->subq.hd = t->next;
+ t->next = nil;
+ if(tp->subq.hd == nil)
+ tp->subq.tl = nil;
+ qunlock(tp);
+ return t;
+}
+
+void
+threadloop(void *arg)
+{
+ Tpool *pool;
+ Ttask *task;
+
+ pool = arg;
+
+ for(;;){
+ task = taskqget(pool);
+ if(task == nil)
+ continue;
+ incref(&pool->nworking);
+ task->fn(task->arg);
+ decref(&pool->nworking);
+ nbsend(pool->done, nil);
+ }
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)
+{
+ Tpool *tp;
+
+ tp = malloc(sizeof *tp);
+ memset(tp, 0, sizeof *tp);
+ tp->nprocs = nprocs;
+ tp->done = chancreate(sizeof(void*), 0);
+ while(nprocs--)
+ proccreate(threadloop, tp, mainstacksize);
+ return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)
+{
+ Ttask *t;
+
+ t = malloc(sizeof *t);
+ memset(t, 0, sizeof *t);
+ t->fn = fn;
+ t->arg = arg;
+ taskqput(tp, t);
+}
+
+typedef struct Tsum Tsum;
+struct Tsum
+{
+ int a;
+ int b;
+};
+void
+sum(void *arg)
+{
+ Tsum *sum;
+ int cnt;
+
+ sum = arg;
+ cnt = 100;
+ while(cnt--) sum->a = sum->a+sum->b;
+}
+
+void
+usage(void)
+{
+ fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+ exits(nil);
+}
+
+void
+threadmain(int argc, char *argv[])
+{
+ static int W = 10, H = 10;
+ Tpool *pool;
+ Tsum *t;
+ int i, j;
+ int threaded;
+ int nprocs;
+
+ threaded = 0;
+ nprocs = 8;
+ ARGBEGIN{
+ case 't': threaded++; break;
+ case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+ default: usage();
+ }ARGEND;
+ if(argc != 0)
+ usage();
+
+ t = malloc(W*H*sizeof(*t));
+ if(threaded){
+ pool = mkthreadpool(nprocs);
+
+ for(i = 0; i < H; i++)
+ for(j = 0; j < W; j++){
+ t[i*W+j] = (Tsum){i, j};
+ threadpoolexec(pool, sum, &t[i*W+j]);
+ }
+
+ while(pool->nworking.ref > 0)
+ recvp(pool->done);
+
+ threadexitsall(nil);
+ }
+
+ for(i = 0; i < H; i++)
+ for(j = 0; j < W; j++){
+ t[i*W+j] = (Tsum){i, j};
+ sum(&t[i*W+j]);
+ }
+ exits(nil);
+}
diff --git a/main4.c b/main4.c
new file mode 100644
index 0000000..cd0e1d8
--- /dev/null
+++ b/main4.c
@@ -0,0 +1,148 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <draw.h>
+#include <memdraw.h>
+
+typedef struct Ttask Ttask;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+ void (*fn)(void*);
+ void *arg;
+};
+
+struct Tpool
+{
+ ulong nprocs;
+ Ref issued;
+ Ref complete;
+
+ Channel *subq; /* task submission queue */
+ Channel *done; /* task completion signal */
+};
+
+void
+threadloop(void *arg)
+{
+ Tpool *pool;
+ Ttask *task;
+
+ pool = arg;
+
+ while((task = recvp(pool->subq)) != nil){
+ task->fn(task->arg);
+ incref(&pool->complete);
+ nbsend(pool->done, nil);
+ }
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)
+{
+ Tpool *tp;
+
+ tp = malloc(sizeof *tp);
+ memset(tp, 0, sizeof *tp);
+ tp->nprocs = nprocs;
+ tp->subq = chancreate(sizeof(void*), nprocs);
+ tp->done = chancreate(sizeof(void*), 0);
+ while(nprocs--)
+ proccreate(threadloop, tp, mainstacksize);
+ return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)
+{
+ Ttask *t;
+
+ t = malloc(sizeof *t);
+ t->fn = fn;
+ t->arg = arg;
+
+ sendp(tp->subq, t);
+ incref(&tp->issued);
+}
+
+typedef struct Targs Targs;
+struct Targs
+{
+ Memimage *i;
+ int y;
+};
+void
+fillpix(void *arg)
+{
+ Targs *imgop;
+ Point p;
+ ulong *fb, pix;
+ double α;
+
+ imgop = arg;
+
+ for(p = Pt(0, imgop->y); p.x < Dx(imgop->i->r); p.x++){
+ fb = (ulong*)byteaddr(imgop->i, p);
+ α = atan2(p.y, p.x);
+ pix = α*25523UL*25523UL/* + truerand()*/;
+ *fb = pix|0xFF<<24;
+ }
+}
+
+void
+usage(void)
+{
+ fprint(2, "usage: %s [-t] [-n nprocs]\n", argv0);
+ exits(nil);
+}
+
+void
+threadmain(int argc, char *argv[])
+{
+ static int W = 1000, H = 1000;
+ Tpool *pool;
+ Targs *t;
+ Memimage *img;
+ int i;
+ int threaded;
+ int nprocs;
+
+ threaded = 0;
+ nprocs = 8;
+ ARGBEGIN{
+ case 't': threaded++; break;
+ case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+ default: usage();
+ }ARGEND;
+ if(argc != 0)
+ usage();
+
+ if(memimageinit() != 0)
+ sysfatal("memimageinit: %r");
+
+ img = allocmemimage(Rect(0,0,W,H), XRGB32);
+ t = malloc(H*sizeof(*t));
+ if(threaded){
+ pool = mkthreadpool(nprocs);
+
+ for(i = 0; i < H; i++){
+ t[i] = (Targs){img, i};
+ threadpoolexec(pool, fillpix, &t[i]);
+ }
+
+ while(pool->issued.ref != pool->complete.ref)
+ recvp(pool->done);
+
+ writememimage(1, img);
+
+ threadexitsall(nil);
+ }
+
+ for(i = 0; i < H; i++){
+ t[i] = (Targs){img, i};
+ fillpix(&t[i]);
+ }
+ writememimage(1, img);
+ exits(nil);
+}
diff --git a/mkfile b/mkfile
new file mode 100644
index 0000000..2d6eaac
--- /dev/null
+++ b/mkfile
@@ -0,0 +1,9 @@
+</$objtype/mkfile
+
+BIN=/$objtype/bin
+TARG=main1\
+ main2\
+ main3\
+ main4\
+
+</sys/src/cmd/mkmany
diff --git a/readme b/readme
new file mode 100644
index 0000000..a119df7
--- /dev/null
+++ b/readme
@@ -0,0 +1,8 @@
+threadpool
+
+Thread pool experiments.
+
+ - main1: channel-based task queue
+ - main2: rendezvous point task queue
+ - main3: qlocked task queue
+ - main4: channel-based memimage line raster task