Source code

Revision control

Copy as Markdown

Other Tools

/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "nspr.h"
/*
* Thread pools
* Thread pools create and manage threads to provide support for
* scheduling jobs onto one or more threads.
*
*/
#ifdef OPT_WINNT
# include <windows.h>
#endif
/*
* worker thread
*/
typedef struct wthread {
PRCList links;
PRThread* thread;
} wthread;
/*
* queue of timer jobs
*/
typedef struct timer_jobq {
PRCList list;
PRLock* lock;
PRCondVar* cv;
PRInt32 cnt;
PRCList wthreads;
} timer_jobq;
/*
* queue of jobs
*/
typedef struct tp_jobq {
PRCList list;
PRInt32 cnt;
PRLock* lock;
PRCondVar* cv;
PRCList wthreads;
#ifdef OPT_WINNT
HANDLE nt_completion_port;
#endif
} tp_jobq;
/*
* queue of IO jobs
*/
typedef struct io_jobq {
PRCList list;
PRPollDesc* pollfds;
PRInt32 npollfds;
PRJob** polljobs;
PRLock* lock;
PRInt32 cnt;
PRFileDesc* notify_fd;
PRCList wthreads;
} io_jobq;
/*
* Threadpool
*/
struct PRThreadPool {
PRInt32 init_threads;
PRInt32 max_threads;
PRInt32 current_threads;
PRInt32 idle_threads;
PRUint32 stacksize;
tp_jobq jobq;
io_jobq ioq;
timer_jobq timerq;
PRLock* join_lock; /* used with jobp->join_cv */
PRCondVar* shutdown_cv;
PRBool shutdown;
};
typedef enum io_op_type {
JOB_IO_READ,
JOB_IO_WRITE,
JOB_IO_CONNECT,
JOB_IO_ACCEPT
} io_op_type;
#ifdef OPT_WINNT
typedef struct NT_notifier {
OVERLAPPED overlapped; /* must be first */
PRJob* jobp;
} NT_notifier;
#endif
struct PRJob {
PRCList links; /* for linking jobs */
PRBool on_ioq; /* job on ioq */
PRBool on_timerq; /* job on timerq */
PRJobFn job_func;
void* job_arg;
PRCondVar* join_cv;
PRBool join_wait; /* == PR_TRUE, when waiting to join */
PRCondVar* cancel_cv; /* for cancelling IO jobs */
PRBool cancel_io; /* for cancelling IO jobs */
PRThreadPool* tpool; /* back pointer to thread pool */
PRJobIoDesc* iod;
io_op_type io_op;
PRInt16 io_poll_flags;
PRNetAddr* netaddr;
PRIntervalTime timeout; /* relative value */
PRIntervalTime absolute;
#ifdef OPT_WINNT
NT_notifier nt_notifier;
#endif
};
#define JOB_LINKS_PTR(_qp) ((PRJob*)((char*)(_qp) - offsetof(PRJob, links)))
#define WTHREAD_LINKS_PTR(_qp) \
((wthread*)((char*)(_qp) - offsetof(wthread, links)))
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
#define JOIN_NOTIFY(_jobp) \
PR_BEGIN_MACRO \
PR_Lock(_jobp->tpool->join_lock); \
_jobp->join_wait = PR_FALSE; \
PR_NotifyCondVar(_jobp->join_cv); \
PR_Unlock(_jobp->tpool->join_lock); \
PR_END_MACRO
#define CANCEL_IO_JOB(jobp) \
PR_BEGIN_MACRO \
jobp->cancel_io = PR_FALSE; \
jobp->on_ioq = PR_FALSE; \
PR_REMOVE_AND_INIT_LINK(&jobp->links); \
tp->ioq.cnt--; \
PR_NotifyCondVar(jobp->cancel_cv); \
PR_END_MACRO
static void delete_job(PRJob* jobp);
static PRThreadPool* alloc_threadpool(void);
static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp);
static void notify_ioq(PRThreadPool* tp);
static void notify_timerq(PRThreadPool* tp);
/*
* locks are acquired in the following order
*
* tp->ioq.lock,tp->timerq.lock
* |
* V
* tp->jobq->lock
*/
/*
* worker thread function
*/
static void wstart(void* arg) {
PRThreadPool* tp = (PRThreadPool*)arg;
PRCList* head;
/*
* execute jobs until shutdown
*/
while (!tp->shutdown) {
PRJob* jobp;
#ifdef OPT_WINNT
BOOL rv;
DWORD unused, shutdown;
LPOVERLAPPED olp;
PR_Lock(tp->jobq.lock);
tp->idle_threads++;
PR_Unlock(tp->jobq.lock);
rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, &unused,
&shutdown, &olp, INFINITE);
PR_ASSERT(rv);
if (shutdown) {
break;
}
jobp = ((NT_notifier*)olp)->jobp;
PR_Lock(tp->jobq.lock);
tp->idle_threads--;
tp->jobq.cnt--;
PR_Unlock(tp->jobq.lock);
#else
PR_Lock(tp->jobq.lock);
while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
tp->idle_threads++;
PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
tp->idle_threads--;
}
if (tp->shutdown) {
PR_Unlock(tp->jobq.lock);
break;
}
head = PR_LIST_HEAD(&tp->jobq.list);
/*
* remove job from queue
*/
PR_REMOVE_AND_INIT_LINK(head);
tp->jobq.cnt--;
jobp = JOB_LINKS_PTR(head);
PR_Unlock(tp->jobq.lock);
#endif
jobp->job_func(jobp->job_arg);
if (!JOINABLE_JOB(jobp)) {
delete_job(jobp);
} else {
JOIN_NOTIFY(jobp);
}
}
PR_Lock(tp->jobq.lock);
tp->current_threads--;
PR_Unlock(tp->jobq.lock);
}
/*
* add a job to the work queue
*/
static void add_to_jobq(PRThreadPool* tp, PRJob* jobp) {
/*
* add to jobq
*/
#ifdef OPT_WINNT
PR_Lock(tp->jobq.lock);
tp->jobq.cnt++;
PR_Unlock(tp->jobq.lock);
/*
* notify worker thread(s)
*/
PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, FALSE,
&jobp->nt_notifier.overlapped);
#else
PR_Lock(tp->jobq.lock);
PR_APPEND_LINK(&jobp->links, &tp->jobq.list);
tp->jobq.cnt++;
if ((tp->idle_threads < tp->jobq.cnt) &&
(tp->current_threads < tp->max_threads)) {
wthread* wthrp;
/*
* increment thread count and unlock the jobq lock
*/
tp->current_threads++;
PR_Unlock(tp->jobq.lock);
/* create new worker thread */
wthrp = PR_NEWZAP(wthread);
if (wthrp) {
wthrp->thread =
PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL,
PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, tp->stacksize);
if (NULL == wthrp->thread) {
PR_DELETE(wthrp); /* this sets wthrp to NULL */
}
}
PR_Lock(tp->jobq.lock);
if (NULL == wthrp) {
tp->current_threads--;
} else {
PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
}
}
/*
* wakeup a worker thread
*/
PR_NotifyCondVar(tp->jobq.cv);
PR_Unlock(tp->jobq.lock);
#endif
}
/*
* io worker thread function
*/
static void io_wstart(void* arg) {
PRThreadPool* tp = (PRThreadPool*)arg;
int pollfd_cnt, pollfds_used;
int rv;
PRCList *qp, *nextqp;
PRPollDesc* pollfds = NULL;
PRJob** polljobs = NULL;
int poll_timeout;
PRIntervalTime now;
/*
* scan io_jobq
* construct poll list
* call PR_Poll
* for all fds, for which poll returns true, move the job to
* jobq and wakeup worker thread.
*/
while (!tp->shutdown) {
PRJob* jobp;
pollfd_cnt = tp->ioq.cnt + 10;
if (pollfd_cnt > tp->ioq.npollfds) {
/*
* re-allocate pollfd array if the current one is not large
* enough
*/
if (NULL != tp->ioq.pollfds) {
PR_Free(tp->ioq.pollfds);
}
tp->ioq.pollfds = (PRPollDesc*)PR_Malloc(
pollfd_cnt * (sizeof(PRPollDesc) + sizeof(PRJob*)));
PR_ASSERT(NULL != tp->ioq.pollfds);
/*
* array of pollfds
*/
pollfds = tp->ioq.pollfds;
tp->ioq.polljobs = (PRJob**)(&tp->ioq.pollfds[pollfd_cnt]);
/*
* parallel array of jobs
*/
polljobs = tp->ioq.polljobs;
tp->ioq.npollfds = pollfd_cnt;
}
pollfds_used = 0;
/*
* add the notify fd; used for unblocking io thread(s)
*/
pollfds[pollfds_used].fd = tp->ioq.notify_fd;
pollfds[pollfds_used].in_flags = PR_POLL_READ;
pollfds[pollfds_used].out_flags = 0;
polljobs[pollfds_used] = NULL;
pollfds_used++;
/*
* fill in the pollfd array
*/
PR_Lock(tp->ioq.lock);
for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
nextqp = qp->next;
jobp = JOB_LINKS_PTR(qp);
if (jobp->cancel_io) {
CANCEL_IO_JOB(jobp);
continue;
}
if (pollfds_used == (pollfd_cnt)) {
break;
}
pollfds[pollfds_used].fd = jobp->iod->socket;
pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
pollfds[pollfds_used].out_flags = 0;
polljobs[pollfds_used] = jobp;
pollfds_used++;
}
if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
qp = tp->ioq.list.next;
jobp = JOB_LINKS_PTR(qp);
if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
poll_timeout = PR_INTERVAL_NO_TIMEOUT;
} else if (PR_INTERVAL_NO_WAIT == jobp->timeout) {
poll_timeout = PR_INTERVAL_NO_WAIT;
} else {
poll_timeout = jobp->absolute - PR_IntervalNow();
if (poll_timeout <= 0) { /* already timed out */
poll_timeout = PR_INTERVAL_NO_WAIT;
}
}
} else {
poll_timeout = PR_INTERVAL_NO_TIMEOUT;
}
PR_Unlock(tp->ioq.lock);
/*
* XXXX
* should retry if more jobs have been added to the queue?
*
*/
PR_ASSERT(pollfds_used <= pollfd_cnt);
rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
if (tp->shutdown) {
break;
}
if (rv > 0) {
/*
* at least one io event is set
*/
PRStatus rval_status;
PRInt32 index;
PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
/*
* reset the pollable event, if notified
*/
if (pollfds[0].out_flags & PR_POLL_READ) {
rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
PR_ASSERT(PR_SUCCESS == rval_status);
}
for (index = 1; index < (pollfds_used); index++) {
PRInt16 events = pollfds[index].in_flags;
PRInt16 revents = pollfds[index].out_flags;
jobp = polljobs[index];
if ((revents & PR_POLL_NVAL) || /* busted in all cases */
(revents & PR_POLL_ERR) ||
((events & PR_POLL_WRITE) &&
(revents & PR_POLL_HUP))) { /* write op & hup */
PR_Lock(tp->ioq.lock);
if (jobp->cancel_io) {
CANCEL_IO_JOB(jobp);
PR_Unlock(tp->ioq.lock);
continue;
}
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->ioq.cnt--;
jobp->on_ioq = PR_FALSE;
PR_Unlock(tp->ioq.lock);
/* set error */
if (PR_POLL_NVAL & revents) {
jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
} else if (PR_POLL_HUP & revents) {
jobp->iod->error = PR_CONNECT_RESET_ERROR;
} else {
jobp->iod->error = PR_IO_ERROR;
}
/*
* add to jobq
*/
add_to_jobq(tp, jobp);
} else if (revents) {
/*
* add to jobq
*/
PR_Lock(tp->ioq.lock);
if (jobp->cancel_io) {
CANCEL_IO_JOB(jobp);
PR_Unlock(tp->ioq.lock);
continue;
}
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->ioq.cnt--;
jobp->on_ioq = PR_FALSE;
PR_Unlock(tp->ioq.lock);
if (jobp->io_op == JOB_IO_CONNECT) {
if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) {
jobp->iod->error = 0;
} else {
jobp->iod->error = PR_GetError();
}
} else {
jobp->iod->error = 0;
}
add_to_jobq(tp, jobp);
}
}
}
/*
* timeout processing
*/
now = PR_IntervalNow();
PR_Lock(tp->ioq.lock);
for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
nextqp = qp->next;
jobp = JOB_LINKS_PTR(qp);
if (jobp->cancel_io) {
CANCEL_IO_JOB(jobp);
continue;
}
if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) {
break;
}
if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
((PRInt32)(jobp->absolute - now) > 0)) {
break;
}
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->ioq.cnt--;
jobp->on_ioq = PR_FALSE;
jobp->iod->error = PR_IO_TIMEOUT_ERROR;
add_to_jobq(tp, jobp);
}
PR_Unlock(tp->ioq.lock);
}
}
/*
* timer worker thread function
*/
static void timer_wstart(void* arg) {
PRThreadPool* tp = (PRThreadPool*)arg;
PRCList* qp;
PRIntervalTime timeout;
PRIntervalTime now;
/*
* call PR_WaitCondVar with minimum value of all timeouts
*/
while (!tp->shutdown) {
PRJob* jobp;
PR_Lock(tp->timerq.lock);
if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
timeout = PR_INTERVAL_NO_TIMEOUT;
} else {
PRCList* qp;
qp = tp->timerq.list.next;
jobp = JOB_LINKS_PTR(qp);
timeout = jobp->absolute - PR_IntervalNow();
if (timeout <= 0) {
timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
}
}
if (PR_INTERVAL_NO_WAIT != timeout) {
PR_WaitCondVar(tp->timerq.cv, timeout);
}
if (tp->shutdown) {
PR_Unlock(tp->timerq.lock);
break;
}
/*
* move expired-timer jobs to jobq
*/
now = PR_IntervalNow();
while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
qp = tp->timerq.list.next;
jobp = JOB_LINKS_PTR(qp);
if ((PRInt32)(jobp->absolute - now) > 0) {
break;
}
/*
* job timed out
*/
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->timerq.cnt--;
jobp->on_timerq = PR_FALSE;
add_to_jobq(tp, jobp);
}
PR_Unlock(tp->timerq.lock);
}
}
static void delete_threadpool(PRThreadPool* tp) {
if (NULL != tp) {
if (NULL != tp->shutdown_cv) {
PR_DestroyCondVar(tp->shutdown_cv);
}
if (NULL != tp->jobq.cv) {
PR_DestroyCondVar(tp->jobq.cv);
}
if (NULL != tp->jobq.lock) {
PR_DestroyLock(tp->jobq.lock);
}
if (NULL != tp->join_lock) {
PR_DestroyLock(tp->join_lock);
}
#ifdef OPT_WINNT
if (NULL != tp->jobq.nt_completion_port) {
CloseHandle(tp->jobq.nt_completion_port);
}
#endif
/* Timer queue */
if (NULL != tp->timerq.cv) {
PR_DestroyCondVar(tp->timerq.cv);
}
if (NULL != tp->timerq.lock) {
PR_DestroyLock(tp->timerq.lock);
}
if (NULL != tp->ioq.lock) {
PR_DestroyLock(tp->ioq.lock);
}
if (NULL != tp->ioq.pollfds) {
PR_Free(tp->ioq.pollfds);
}
if (NULL != tp->ioq.notify_fd) {
PR_DestroyPollableEvent(tp->ioq.notify_fd);
}
PR_Free(tp);
}
return;
}
static PRThreadPool* alloc_threadpool(void) {
PRThreadPool* tp;
tp = (PRThreadPool*)PR_CALLOC(sizeof(*tp));
if (NULL == tp) {
goto failed;
}
tp->jobq.lock = PR_NewLock();
if (NULL == tp->jobq.lock) {
goto failed;
}
tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
if (NULL == tp->jobq.cv) {
goto failed;
}
tp->join_lock = PR_NewLock();
if (NULL == tp->join_lock) {
goto failed;
}
#ifdef OPT_WINNT
tp->jobq.nt_completion_port =
CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (NULL == tp->jobq.nt_completion_port) {
goto failed;
}
#endif
tp->ioq.lock = PR_NewLock();
if (NULL == tp->ioq.lock) {
goto failed;
}
/* Timer queue */
tp->timerq.lock = PR_NewLock();
if (NULL == tp->timerq.lock) {
goto failed;
}
tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
if (NULL == tp->timerq.cv) {
goto failed;
}
tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
if (NULL == tp->shutdown_cv) {
goto failed;
}
tp->ioq.notify_fd = PR_NewPollableEvent();
if (NULL == tp->ioq.notify_fd) {
goto failed;
}
return tp;
failed:
delete_threadpool(tp);
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
return NULL;
}
/* Create thread pool */
PR_IMPLEMENT(PRThreadPool*)
PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
PRUint32 stacksize) {
PRThreadPool* tp;
PRThread* thr;
int i;
wthread* wthrp;
tp = alloc_threadpool();
if (NULL == tp) {
return NULL;
}
tp->init_threads = initial_threads;
tp->max_threads = max_threads;
tp->stacksize = stacksize;
PR_INIT_CLIST(&tp->jobq.list);
PR_INIT_CLIST(&tp->ioq.list);
PR_INIT_CLIST(&tp->timerq.list);
PR_INIT_CLIST(&tp->jobq.wthreads);
PR_INIT_CLIST(&tp->ioq.wthreads);
PR_INIT_CLIST(&tp->timerq.wthreads);
tp->shutdown = PR_FALSE;
PR_Lock(tp->jobq.lock);
for (i = 0; i < initial_threads; ++i) {
thr = PR_CreateThread(PR_USER_THREAD, wstart, tp, PR_PRIORITY_NORMAL,
PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
PR_ASSERT(thr);
wthrp = PR_NEWZAP(wthread);
PR_ASSERT(wthrp);
wthrp->thread = thr;
PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
}
tp->current_threads = initial_threads;
thr = PR_CreateThread(PR_USER_THREAD, io_wstart, tp, PR_PRIORITY_NORMAL,
PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
PR_ASSERT(thr);
wthrp = PR_NEWZAP(wthread);
PR_ASSERT(wthrp);
wthrp->thread = thr;
PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, tp, PR_PRIORITY_NORMAL,
PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, stacksize);
PR_ASSERT(thr);
wthrp = PR_NEWZAP(wthread);
PR_ASSERT(wthrp);
wthrp->thread = thr;
PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
PR_Unlock(tp->jobq.lock);
return tp;
}
static void delete_job(PRJob* jobp) {
if (NULL != jobp) {
if (NULL != jobp->join_cv) {
PR_DestroyCondVar(jobp->join_cv);
jobp->join_cv = NULL;
}
if (NULL != jobp->cancel_cv) {
PR_DestroyCondVar(jobp->cancel_cv);
jobp->cancel_cv = NULL;
}
PR_DELETE(jobp);
}
}
static PRJob* alloc_job(PRBool joinable, PRThreadPool* tp) {
PRJob* jobp;
jobp = PR_NEWZAP(PRJob);
if (NULL == jobp) {
goto failed;
}
if (joinable) {
jobp->join_cv = PR_NewCondVar(tp->join_lock);
jobp->join_wait = PR_TRUE;
if (NULL == jobp->join_cv) {
goto failed;
}
} else {
jobp->join_cv = NULL;
}
#ifdef OPT_WINNT
jobp->nt_notifier.jobp = jobp;
#endif
return jobp;
failed:
delete_job(jobp);
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
return NULL;
}
/* queue a job */
PR_IMPLEMENT(PRJob*)
PR_QueueJob(PRThreadPool* tpool, PRJobFn fn, void* arg, PRBool joinable) {
PRJob* jobp;
jobp = alloc_job(joinable, tpool);
if (NULL == jobp) {
return NULL;
}
jobp->job_func = fn;
jobp->job_arg = arg;
jobp->tpool = tpool;
add_to_jobq(tpool, jobp);
return jobp;
}
/* queue a job, when a socket is readable or writeable */
static PRJob* queue_io_job(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn,
void* arg, PRBool joinable, io_op_type op) {
PRJob* jobp;
PRIntervalTime now;
jobp = alloc_job(joinable, tpool);
if (NULL == jobp) {
return NULL;
}
/*
* Add a new job to io_jobq
* wakeup io worker thread
*/
jobp->job_func = fn;
jobp->job_arg = arg;
jobp->tpool = tpool;
jobp->iod = iod;
if (JOB_IO_READ == op) {
jobp->io_op = JOB_IO_READ;
jobp->io_poll_flags = PR_POLL_READ;
} else if (JOB_IO_WRITE == op) {
jobp->io_op = JOB_IO_WRITE;
jobp->io_poll_flags = PR_POLL_WRITE;
} else if (JOB_IO_ACCEPT == op) {
jobp->io_op = JOB_IO_ACCEPT;
jobp->io_poll_flags = PR_POLL_READ;
} else if (JOB_IO_CONNECT == op) {
jobp->io_op = JOB_IO_CONNECT;
jobp->io_poll_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
} else {
delete_job(jobp);
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return NULL;
}
jobp->timeout = iod->timeout;
if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
(PR_INTERVAL_NO_WAIT == iod->timeout)) {
jobp->absolute = iod->timeout;
} else {
now = PR_IntervalNow();
jobp->absolute = now + iod->timeout;
}
PR_Lock(tpool->ioq.lock);
if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
(PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
PR_APPEND_LINK(&jobp->links, &tpool->ioq.list);
} else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
PR_INSERT_LINK(&jobp->links, &tpool->ioq.list);
} else {
PRCList* qp;
PRJob* tmp_jobp;
/*
* insert into the timeout-sorted ioq
*/
for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; qp = qp->prev) {
tmp_jobp = JOB_LINKS_PTR(qp);
if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
break;
}
}
PR_INSERT_AFTER(&jobp->links, qp);
}
jobp->on_ioq = PR_TRUE;
tpool->ioq.cnt++;
/*
* notify io worker thread(s)
*/
PR_Unlock(tpool->ioq.lock);
notify_ioq(tpool);
return jobp;
}
/* queue a job, when a socket is readable */
PR_IMPLEMENT(PRJob*)
PR_QueueJob_Read(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
PRBool joinable) {
return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
}
/* queue a job, when a socket is writeable */
PR_IMPLEMENT(PRJob*)
PR_QueueJob_Write(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
PRBool joinable) {
return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
}
/* queue a job, when a socket has a pending connection */
PR_IMPLEMENT(PRJob*)
PR_QueueJob_Accept(PRThreadPool* tpool, PRJobIoDesc* iod, PRJobFn fn, void* arg,
PRBool joinable) {
return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
}
/* queue a job, when a socket can be connected */
PR_IMPLEMENT(PRJob*)
PR_QueueJob_Connect(PRThreadPool* tpool, PRJobIoDesc* iod,
const PRNetAddr* addr, PRJobFn fn, void* arg,
PRBool joinable) {
PRStatus rv;
PRErrorCode err;
rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)) {
/* connection pending */
return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
}
/*
* connection succeeded or failed; add to jobq right away
*/
if (rv == PR_FAILURE) {
iod->error = err;
} else {
iod->error = 0;
}
return (PR_QueueJob(tpool, fn, arg, joinable));
}
/* queue a job, when a timer expires */
PR_IMPLEMENT(PRJob*)
PR_QueueJob_Timer(PRThreadPool* tpool, PRIntervalTime timeout, PRJobFn fn,
void* arg, PRBool joinable) {
PRIntervalTime now;
PRJob* jobp;
if (PR_INTERVAL_NO_TIMEOUT == timeout) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return NULL;
}
if (PR_INTERVAL_NO_WAIT == timeout) {
/*
* no waiting; add to jobq right away
*/
return (PR_QueueJob(tpool, fn, arg, joinable));
}
jobp = alloc_job(joinable, tpool);
if (NULL == jobp) {
return NULL;
}
/*
* Add a new job to timer_jobq
* wakeup timer worker thread
*/
jobp->job_func = fn;
jobp->job_arg = arg;
jobp->tpool = tpool;
jobp->timeout = timeout;
now = PR_IntervalNow();
jobp->absolute = now + timeout;
PR_Lock(tpool->timerq.lock);
jobp->on_timerq = PR_TRUE;
if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
PR_APPEND_LINK(&jobp->links, &tpool->timerq.list);
} else {
PRCList* qp;
PRJob* tmp_jobp;
/*
* insert into the sorted timer jobq
*/
for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
qp = qp->prev) {
tmp_jobp = JOB_LINKS_PTR(qp);
if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
break;
}
}
PR_INSERT_AFTER(&jobp->links, qp);
}
tpool->timerq.cnt++;
/*
* notify timer worker thread(s)
*/
notify_timerq(tpool);
PR_Unlock(tpool->timerq.lock);
return jobp;
}
static void notify_timerq(PRThreadPool* tp) {
/*
* wakeup the timer thread(s)
*/
PR_NotifyCondVar(tp->timerq.cv);
}
static void notify_ioq(PRThreadPool* tp) {
PRStatus rval_status;
/*
* wakeup the io thread(s)
*/
rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
PR_ASSERT(PR_SUCCESS == rval_status);
}
/*
* cancel a job
*
* XXXX: is this needed? likely to be removed
*/
PR_IMPLEMENT(PRStatus)
PR_CancelJob(PRJob* jobp) {
PRStatus rval = PR_FAILURE;
PRThreadPool* tp;
if (jobp->on_timerq) {
/*
* now, check again while holding the timerq lock
*/
tp = jobp->tpool;
PR_Lock(tp->timerq.lock);
if (jobp->on_timerq) {
jobp->on_timerq = PR_FALSE;
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->timerq.cnt--;
PR_Unlock(tp->timerq.lock);
if (!JOINABLE_JOB(jobp)) {
delete_job(jobp);
} else {
JOIN_NOTIFY(jobp);
}
rval = PR_SUCCESS;
} else {
PR_Unlock(tp->timerq.lock);
}
} else if (jobp->on_ioq) {
/*
* now, check again while holding the ioq lock
*/
tp = jobp->tpool;
PR_Lock(tp->ioq.lock);
if (jobp->on_ioq) {
jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
if (NULL == jobp->cancel_cv) {
PR_Unlock(tp->ioq.lock);
PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
return PR_FAILURE;
}
/*
* mark job 'cancelled' and notify io thread(s)
* XXXX:
* this assumes there is only one io thread; when there
* are multiple threads, the io thread processing this job
* must be notified.
*/
jobp->cancel_io = PR_TRUE;
PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
notify_ioq(tp);
PR_Lock(tp->ioq.lock);
while (jobp->cancel_io) {
PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
}
PR_Unlock(tp->ioq.lock);
PR_ASSERT(!jobp->on_ioq);
if (!JOINABLE_JOB(jobp)) {
delete_job(jobp);
} else {
JOIN_NOTIFY(jobp);
}
rval = PR_SUCCESS;
} else {
PR_Unlock(tp->ioq.lock);
}
}
if (PR_FAILURE == rval) {
PR_SetError(PR_INVALID_STATE_ERROR, 0);
}
return rval;
}
/* join a job, wait until completion */
PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob* jobp) {
if (!JOINABLE_JOB(jobp)) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return PR_FAILURE;
}
PR_Lock(jobp->tpool->join_lock);
while (jobp->join_wait) {
PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
}
PR_Unlock(jobp->tpool->join_lock);
delete_job(jobp);
return PR_SUCCESS;
}
/* shutdown threadpool */
PR_IMPLEMENT(PRStatus)
PR_ShutdownThreadPool(PRThreadPool* tpool) {
PRStatus rval = PR_SUCCESS;
PR_Lock(tpool->jobq.lock);
tpool->shutdown = PR_TRUE;
PR_NotifyAllCondVar(tpool->shutdown_cv);
PR_Unlock(tpool->jobq.lock);
return rval;
}
/*
* join thread pool
* wait for termination of worker threads
* reclaim threadpool resources
*/
PR_IMPLEMENT(PRStatus)
PR_JoinThreadPool(PRThreadPool* tpool) {
PRStatus rval = PR_SUCCESS;
PRCList* head;
PRStatus rval_status;
PR_Lock(tpool->jobq.lock);
while (!tpool->shutdown) {
PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
}
/*
* wakeup worker threads
*/
#ifdef OPT_WINNT
/*
* post shutdown notification for all threads
*/
{
int i;
for (i = 0; i < tpool->current_threads; i++) {
PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, TRUE, NULL);
}
}
#else
PR_NotifyAllCondVar(tpool->jobq.cv);
#endif
/*
* wakeup io thread(s)
*/
notify_ioq(tpool);
/*
* wakeup timer thread(s)
*/
PR_Lock(tpool->timerq.lock);
notify_timerq(tpool);
PR_Unlock(tpool->timerq.lock);
while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
wthread* wthrp;
head = PR_LIST_HEAD(&tpool->jobq.wthreads);
PR_REMOVE_AND_INIT_LINK(head);
PR_Unlock(tpool->jobq.lock);
wthrp = WTHREAD_LINKS_PTR(head);
rval_status = PR_JoinThread(wthrp->thread);
PR_ASSERT(PR_SUCCESS == rval_status);
PR_DELETE(wthrp);
PR_Lock(tpool->jobq.lock);
}
PR_Unlock(tpool->jobq.lock);
while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
wthread* wthrp;
head = PR_LIST_HEAD(&tpool->ioq.wthreads);
PR_REMOVE_AND_INIT_LINK(head);
wthrp = WTHREAD_LINKS_PTR(head);
rval_status = PR_JoinThread(wthrp->thread);
PR_ASSERT(PR_SUCCESS == rval_status);
PR_DELETE(wthrp);
}
while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
wthread* wthrp;
head = PR_LIST_HEAD(&tpool->timerq.wthreads);
PR_REMOVE_AND_INIT_LINK(head);
wthrp = WTHREAD_LINKS_PTR(head);
rval_status = PR_JoinThread(wthrp->thread);
PR_ASSERT(PR_SUCCESS == rval_status);
PR_DELETE(wthrp);
}
/*
* Delete queued jobs
*/
while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
PRJob* jobp;
head = PR_LIST_HEAD(&tpool->jobq.list);
PR_REMOVE_AND_INIT_LINK(head);
jobp = JOB_LINKS_PTR(head);
tpool->jobq.cnt--;
delete_job(jobp);
}
/* delete io jobs */
while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
PRJob* jobp;
head = PR_LIST_HEAD(&tpool->ioq.list);
PR_REMOVE_AND_INIT_LINK(head);
tpool->ioq.cnt--;
jobp = JOB_LINKS_PTR(head);
delete_job(jobp);
}
/* delete timer jobs */
while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
PRJob* jobp;
head = PR_LIST_HEAD(&tpool->timerq.list);
PR_REMOVE_AND_INIT_LINK(head);
tpool->timerq.cnt--;
jobp = JOB_LINKS_PTR(head);
delete_job(jobp);
}
PR_ASSERT(0 == tpool->jobq.cnt);
PR_ASSERT(0 == tpool->ioq.cnt);
PR_ASSERT(0 == tpool->timerq.cnt);
delete_threadpool(tpool);
return rval;
}