Source code
Revision control
Copy as Markdown
Other Tools
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
#include "primpl.h"
#include "pprmwait.h"
#define _MW_REHASH_MAX 11
static PRLock* mw_lock = NULL;
static _PRGlobalState* mw_state = NULL;
static PRIntervalTime max_polling_interval;
#ifdef WINNT
typedef struct TimerEvent {
PRIntervalTime absolute;
void (*func)(void*);
void* arg;
LONG ref_count;
PRCList links;
} TimerEvent;
# define TIMER_EVENT_PTR(_qp) \
((TimerEvent*)((char*)(_qp) - offsetof(TimerEvent, links)))
struct {
PRLock* ml;
PRCondVar* new_timer;
PRCondVar* cancel_timer;
PRThread* manager_thread;
PRCList timer_queue;
} tm_vars;
static PRStatus TimerInit(void);
static void TimerManager(void* arg);
static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
void* arg);
static PRBool CancelTimer(TimerEvent* timer);
static void TimerManager(void* arg) {
PRIntervalTime now;
PRIntervalTime timeout;
PRCList* head;
TimerEvent* timer;
PR_Lock(tm_vars.ml);
while (1) {
if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) {
PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
} else {
now = PR_IntervalNow();
head = PR_LIST_HEAD(&tm_vars.timer_queue);
timer = TIMER_EVENT_PTR(head);
if ((PRInt32)(now - timer->absolute) >= 0) {
PR_REMOVE_LINK(head);
/*
* make its prev and next point to itself so that
* it's obvious that it's not on the timer_queue.
*/
PR_INIT_CLIST(head);
PR_ASSERT(2 == timer->ref_count);
PR_Unlock(tm_vars.ml);
timer->func(timer->arg);
PR_Lock(tm_vars.ml);
timer->ref_count -= 1;
if (0 == timer->ref_count) {
PR_NotifyAllCondVar(tm_vars.cancel_timer);
}
} else {
timeout = (PRIntervalTime)(timer->absolute - now);
PR_WaitCondVar(tm_vars.new_timer, timeout);
}
}
}
PR_Unlock(tm_vars.ml);
}
static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
void* arg) {
TimerEvent* timer;
PRCList *links, *tail;
TimerEvent* elem;
timer = PR_NEW(TimerEvent);
if (NULL == timer) {
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
return timer;
}
timer->absolute = PR_IntervalNow() + timeout;
timer->func = func;
timer->arg = arg;
timer->ref_count = 2;
PR_Lock(tm_vars.ml);
tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
while (links->prev != tail) {
elem = TIMER_EVENT_PTR(links);
if ((PRInt32)(timer->absolute - elem->absolute) >= 0) {
break;
}
links = links->prev;
}
PR_INSERT_AFTER(&timer->links, links);
PR_NotifyCondVar(tm_vars.new_timer);
PR_Unlock(tm_vars.ml);
return timer;
}
static PRBool CancelTimer(TimerEvent* timer) {
PRBool canceled = PR_FALSE;
PR_Lock(tm_vars.ml);
timer->ref_count -= 1;
if (timer->links.prev == &timer->links) {
while (timer->ref_count == 1) {
PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
}
} else {
PR_REMOVE_LINK(&timer->links);
canceled = PR_TRUE;
}
PR_Unlock(tm_vars.ml);
PR_DELETE(timer);
return canceled;
}
static PRStatus TimerInit(void) {
tm_vars.ml = PR_NewLock();
if (NULL == tm_vars.ml) {
goto failed;
}
tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
if (NULL == tm_vars.new_timer) {
goto failed;
}
tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
if (NULL == tm_vars.cancel_timer) {
goto failed;
}
PR_INIT_CLIST(&tm_vars.timer_queue);
tm_vars.manager_thread =
PR_CreateThread(PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
if (NULL == tm_vars.manager_thread) {
goto failed;
}
return PR_SUCCESS;
failed:
if (NULL != tm_vars.cancel_timer) {
PR_DestroyCondVar(tm_vars.cancel_timer);
}
if (NULL != tm_vars.new_timer) {
PR_DestroyCondVar(tm_vars.new_timer);
}
if (NULL != tm_vars.ml) {
PR_DestroyLock(tm_vars.ml);
}
return PR_FAILURE;
}
#endif /* WINNT */
/******************************************************************/
/******************************************************************/
/************************ The private portion *********************/
/******************************************************************/
/******************************************************************/
void _PR_InitMW(void) {
#ifdef WINNT
/*
* We use NT 4's InterlockedCompareExchange() to operate
* on PRMWStatus variables.
*/
PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
TimerInit();
#endif
mw_lock = PR_NewLock();
PR_ASSERT(NULL != mw_lock);
mw_state = PR_NEWZAP(_PRGlobalState);
PR_ASSERT(NULL != mw_state);
PR_INIT_CLIST(&mw_state->group_list);
max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
} /* _PR_InitMW */
void _PR_CleanupMW(void) {
PR_DestroyLock(mw_lock);
mw_lock = NULL;
if (mw_state->group) {
PR_DestroyWaitGroup(mw_state->group);
/* mw_state->group is set to NULL as a side effect. */
}
PR_DELETE(mw_state);
} /* _PR_CleanupMW */
static PRWaitGroup* MW_Init2(void) {
PRWaitGroup* group = mw_state->group; /* it's the null group */
if (NULL == group) /* there is this special case */
{
group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
if (NULL == group) {
goto failed_alloc;
}
PR_Lock(mw_lock);
if (NULL == mw_state->group) {
mw_state->group = group;
group = NULL;
}
PR_Unlock(mw_lock);
if (group != NULL) {
(void)PR_DestroyWaitGroup(group);
}
group = mw_state->group; /* somebody beat us to it */
}
failed_alloc:
return group; /* whatever */
} /* MW_Init2 */
static _PR_HashStory MW_AddHashInternal(PRRecvWait* desc, _PRWaiterHash* hash) {
/*
** The entries are put in the table using the fd (PRFileDesc*) of
** the receive descriptor as the key. This allows us to locate
** the appropriate entry aqain when the poll operation finishes.
**
** The pointer to the file descriptor object is first divided by
** the natural alignment of a pointer in the belief that object
** will have at least that many zeros in the low order bits.
** This may not be a good assuption.
**
** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
** that we declare defeat and force the table to be reconstructed.
** Since some fds might be added more than once, won't that cause
** collisions even in an empty table?
*/
PRIntn rehash = _MW_REHASH_MAX;
PRRecvWait** waiter;
PRUintn hidx = _MW_HASH(desc->fd, hash->length);
PRUintn hoffset = 0;
while (rehash-- > 0) {
waiter = &hash->recv_wait;
if (NULL == waiter[hidx]) {
waiter[hidx] = desc;
hash->count += 1;
#if 0
printf("Adding 0x%x->0x%x ", desc, desc->fd);
printf(
"table[%u:%u:*%u]: 0x%x->0x%x\n",
hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
#endif
return _prmw_success;
}
if (desc == waiter[hidx]) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
return _prmw_error;
}
#if 0
printf("Failing 0x%x->0x%x ", desc, desc->fd);
printf(
"table[*%u:%u:%u]: 0x%x->0x%x\n",
hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
#endif
if (0 == hoffset) {
hoffset = _MW_HASH2(desc->fd, hash->length);
PR_ASSERT(0 != hoffset);
}
hidx = (hidx + hoffset) % (hash->length);
}
return _prmw_rehash;
} /* MW_AddHashInternal */
static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup* group) {
PRRecvWait** desc;
PRUint32 pidx, length;
_PRWaiterHash *newHash, *oldHash = group->waiter;
PRBool retry;
_PR_HashStory hrv;
static const PRInt32 prime_number[] = {_PR_DEFAULT_HASH_LENGTH,
179,
521,
907,
1427,
2711,
3917,
5021,
8219,
11549,
18911,
26711,
33749,
44771};
PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
/* look up the next size we'd like to use for the hash table */
for (pidx = 0; pidx < primes; ++pidx) {
if (prime_number[pidx] == oldHash->length) {
break;
}
}
/* table size must be one of the prime numbers */
PR_ASSERT(pidx < primes);
/* if pidx == primes - 1, we can't expand the table any more */
while (pidx < primes - 1) {
/* next size */
++pidx;
length = prime_number[pidx];
/* allocate the new hash table and fill it in with the old */
newHash = (_PRWaiterHash*)PR_CALLOC(sizeof(_PRWaiterHash) +
(length * sizeof(PRRecvWait*)));
if (NULL == newHash) {
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
return _prmw_error;
}
newHash->length = length;
retry = PR_FALSE;
for (desc = &oldHash->recv_wait; newHash->count < oldHash->count; ++desc) {
PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
if (NULL != *desc) {
hrv = MW_AddHashInternal(*desc, newHash);
PR_ASSERT(_prmw_error != hrv);
if (_prmw_success != hrv) {
PR_DELETE(newHash);
retry = PR_TRUE;
break;
}
}
}
if (retry) {
continue;
}
PR_DELETE(group->waiter);
group->waiter = newHash;
group->p_timestamp += 1;
return _prmw_success;
}
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
return _prmw_error; /* we're hosed */
} /* MW_ExpandHashInternal */
#ifndef WINNT
static void _MW_DoneInternal(PRWaitGroup* group, PRRecvWait** waiter,
PRMWStatus outcome) {
/*
** Add this receive wait object to the list of finished I/O
** operations for this particular group. If there are other
** threads waiting on the group, notify one. If not, arrange
** for this thread to return.
*/
# if 0
printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
# endif
(*waiter)->outcome = outcome;
PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
PR_NotifyCondVar(group->io_complete);
PR_ASSERT(0 != group->waiter->count);
group->waiter->count -= 1;
*waiter = NULL;
} /* _MW_DoneInternal */
#endif /* WINNT */
static PRRecvWait** _MW_LookupInternal(PRWaitGroup* group, PRFileDesc* fd) {
/*
** Find the receive wait object corresponding to the file descriptor.
** Only search the wait group specified.
*/
PRRecvWait** desc;
PRIntn rehash = _MW_REHASH_MAX;
_PRWaiterHash* hash = group->waiter;
PRUintn hidx = _MW_HASH(fd, hash->length);
PRUintn hoffset = 0;
while (rehash-- > 0) {
desc = (&hash->recv_wait) + hidx;
if ((*desc != NULL) && ((*desc)->fd == fd)) {
return desc;
}
if (0 == hoffset) {
hoffset = _MW_HASH2(fd, hash->length);
PR_ASSERT(0 != hoffset);
}
hidx = (hidx + hoffset) % (hash->length);
}
return NULL;
} /* _MW_LookupInternal */
#ifndef WINNT
static PRStatus _MW_PollInternal(PRWaitGroup* group) {
PRRecvWait** waiter;
PRStatus rv = PR_FAILURE;
PRInt32 count, count_ready;
PRIntervalTime polling_interval;
group->poller = PR_GetCurrentThread();
while (PR_TRUE) {
PRIntervalTime now, since_last_poll;
PRPollDesc* poll_list;
while (0 == group->waiter->count) {
PRStatus st;
st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
if (_prmw_running != group->state) {
PR_SetError(PR_INVALID_STATE_ERROR, 0);
goto aborted;
}
if (_MW_ABORTED(st)) {
goto aborted;
}
}
/*
** There's something to do. See if our existing polling list
** is large enough for what we have to do?
*/
while (group->polling_count < group->waiter->count) {
PRUint32 old_count = group->waiter->count;
PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
PRSize new_size = sizeof(PRPollDesc) * new_count;
PRPollDesc* old_polling_list = group->polling_list;
PR_Unlock(group->ml);
poll_list = (PRPollDesc*)PR_CALLOC(new_size);
if (NULL == poll_list) {
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
PR_Lock(group->ml);
goto failed_alloc;
}
if (NULL != old_polling_list) {
PR_DELETE(old_polling_list);
}
PR_Lock(group->ml);
if (_prmw_running != group->state) {
PR_DELETE(poll_list);
PR_SetError(PR_INVALID_STATE_ERROR, 0);
goto aborted;
}
group->polling_list = poll_list;
group->polling_count = new_count;
}
now = PR_IntervalNow();
polling_interval = max_polling_interval;
since_last_poll = now - group->last_poll;
waiter = &group->waiter->recv_wait;
poll_list = group->polling_list;
for (count = 0; count < group->waiter->count; ++waiter) {
PR_ASSERT(waiter < &group->waiter->recv_wait + group->waiter->length);
if (NULL != *waiter) /* a live one! */
{
if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) &&
(since_last_poll >= (*waiter)->timeout)) {
_MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
} else {
if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) {
(*waiter)->timeout -= since_last_poll;
if ((*waiter)->timeout < polling_interval) {
polling_interval = (*waiter)->timeout;
}
}
PR_ASSERT(poll_list < group->polling_list + group->polling_count);
poll_list->fd = (*waiter)->fd;
poll_list->in_flags = PR_POLL_READ;
poll_list->out_flags = 0;
# if 0
printf(
"Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
poll_list, count, poll_list->fd, (*waiter)->timeout);
# endif
poll_list += 1;
count += 1;
}
}
}
PR_ASSERT(count == group->waiter->count);
/*
** If there are no more threads waiting for completion,
** we need to return.
*/
if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
(1 == group->waiting_threads)) {
break;
}
if (0 == count) {
continue; /* wait for new business */
}
group->last_poll = now;
PR_Unlock(group->ml);
count_ready = PR_Poll(group->polling_list, count, polling_interval);
PR_Lock(group->ml);
if (_prmw_running != group->state) {
PR_SetError(PR_INVALID_STATE_ERROR, 0);
goto aborted;
}
if (-1 == count_ready) {
goto failed_poll; /* that's a shame */
} else if (0 < count_ready) {
for (poll_list = group->polling_list; count > 0; poll_list++, count--) {
PR_ASSERT(poll_list < group->polling_list + group->polling_count);
if (poll_list->out_flags != 0) {
waiter = _MW_LookupInternal(group, poll_list->fd);
/*
** If 'waiter' is NULL, that means the wait receive
** descriptor has been canceled.
*/
if (NULL != waiter) {
_MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
}
}
}
}
/*
** If there are no more threads waiting for completion,
** we need to return.
** This thread was "borrowed" to do the polling, but it really
** belongs to the client.
*/
if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
(1 == group->waiting_threads)) {
break;
}
}
rv = PR_SUCCESS;
aborted:
failed_poll:
failed_alloc:
group->poller = NULL; /* we were that, not we ain't */
if ((_prmw_running == group->state) && (group->waiting_threads > 1)) {
/* Wake up one thread to become the new poller. */
PR_NotifyCondVar(group->io_complete);
}
return rv; /* we return with the lock held */
} /* _MW_PollInternal */
#endif /* !WINNT */
static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup* group) {
PRMWGroupState rv = group->state;
/*
** Looking at the group's fields is safe because
** once the group's state is no longer running, it
** cannot revert and there is a safe check on entry
** to make sure no more threads are made to wait.
*/
if ((_prmw_stopping == rv) && (0 == group->waiting_threads)) {
rv = group->state = _prmw_stopped;
PR_NotifyCondVar(group->mw_manage);
}
return rv;
} /* MW_TestForShutdownInternal */
#ifndef WINNT
static void _MW_InitialRecv(PRCList* io_ready) {
PRRecvWait* desc = (PRRecvWait*)io_ready;
if ((NULL == desc->buffer.start) || (0 == desc->buffer.length)) {
desc->bytesRecv = 0;
} else {
desc->bytesRecv = (desc->fd->methods->recv)(
desc->fd, desc->buffer.start, desc->buffer.length, 0, desc->timeout);
if (desc->bytesRecv < 0) { /* SetError should already be there */
desc->outcome = PR_MW_FAILURE;
}
}
} /* _MW_InitialRecv */
#endif
#ifdef WINNT
static void NT_TimeProc(void* arg) {
_MDOverlapped* overlapped = (_MDOverlapped*)arg;
PRRecvWait* desc = overlapped->data.mw.desc;
PRFileDesc* bottom;
if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_TIMEOUT,
(LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING) {
/* This wait recv descriptor has already completed. */
return;
}
/* close the osfd to abort the outstanding async io request */
/* $$$$
** Little late to be checking if NSPR's on the bottom of stack,
** but if we don't check, we can't assert that the private data
** is what we think it is.
** $$$$
*/
bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
PR_ASSERT(NULL != bottom);
if (NULL != bottom) /* now what!?!?! */
{
bottom->secret->state = _PR_FILEDESC_CLOSED;
if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
PR_NOT_REACHED("What shall I do?");
}
}
return;
} /* NT_TimeProc */
static PRStatus NT_HashRemove(PRWaitGroup* group, PRFileDesc* fd) {
PRRecvWait** waiter;
_PR_MD_LOCK(&group->mdlock);
waiter = _MW_LookupInternal(group, fd);
if (NULL != waiter) {
group->waiter->count -= 1;
*waiter = NULL;
}
_PR_MD_UNLOCK(&group->mdlock);
return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
}
PRStatus NT_HashRemoveInternal(PRWaitGroup* group, PRFileDesc* fd) {
PRRecvWait** waiter;
waiter = _MW_LookupInternal(group, fd);
if (NULL != waiter) {
group->waiter->count -= 1;
*waiter = NULL;
}
return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
}
#endif /* WINNT */
/******************************************************************/
/******************************************************************/
/********************** The public API portion ********************/
/******************************************************************/
/******************************************************************/
PR_IMPLEMENT(PRStatus)
PR_AddWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
_PR_HashStory hrv;
PRStatus rv = PR_FAILURE;
#ifdef WINNT
_MDOverlapped* overlapped;
HANDLE hFile;
BOOL bResult;
DWORD dwError;
PRFileDesc* bottom;
#endif
if (!_pr_initialized) {
_PR_ImplicitInitialization();
}
if ((NULL == group) && (NULL == (group = MW_Init2()))) {
return rv;
}
PR_ASSERT(NULL != desc->fd);
desc->outcome = PR_MW_PENDING; /* nice, well known value */
desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
PR_Lock(group->ml);
if (_prmw_running != group->state) {
/* Not allowed to add after cancelling the group */
desc->outcome = PR_MW_INTERRUPT;
PR_SetError(PR_INVALID_STATE_ERROR, 0);
PR_Unlock(group->ml);
return rv;
}
#ifdef WINNT
_PR_MD_LOCK(&group->mdlock);
#endif
/*
** If the waiter count is zero at this point, there's no telling
** how long we've been idle. Therefore, initialize the beginning
** of the timing interval. As long as the list doesn't go empty,
** it will maintain itself.
*/
if (0 == group->waiter->count) {
group->last_poll = PR_IntervalNow();
}
do {
hrv = MW_AddHashInternal(desc, group->waiter);
if (_prmw_rehash != hrv) {
break;
}
hrv = MW_ExpandHashInternal(group); /* gruesome */
if (_prmw_success != hrv) {
break;
}
} while (PR_TRUE);
#ifdef WINNT
_PR_MD_UNLOCK(&group->mdlock);
#endif
PR_NotifyCondVar(group->new_business); /* tell the world */
rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
PR_Unlock(group->ml);
#ifdef WINNT
overlapped = PR_NEWZAP(_MDOverlapped);
if (NULL == overlapped) {
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
NT_HashRemove(group, desc->fd);
return rv;
}
overlapped->ioModel = _MD_MultiWaitIO;
overlapped->data.mw.desc = desc;
overlapped->data.mw.group = group;
if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
overlapped->data.mw.timer =
CreateTimer(desc->timeout, NT_TimeProc, overlapped);
if (0 == overlapped->data.mw.timer) {
NT_HashRemove(group, desc->fd);
PR_DELETE(overlapped);
/*
* XXX It appears that a maximum of 16 timer events can
* be outstanding. GetLastError() returns 0 when I try it.
*/
PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
return PR_FAILURE;
}
}
/* Reach to the bottom layer to get the OS fd */
bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
PR_ASSERT(NULL != bottom);
if (NULL == bottom) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return PR_FAILURE;
}
hFile = (HANDLE)bottom->secret->md.osfd;
if (!bottom->secret->md.io_model_committed) {
PRInt32 st;
st = _md_Associate(hFile);
PR_ASSERT(0 != st);
bottom->secret->md.io_model_committed = PR_TRUE;
}
bResult = ReadFile(hFile, desc->buffer.start, (DWORD)desc->buffer.length,
NULL, &overlapped->overlapped);
if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) {
if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_FAILURE,
(LONG)PR_MW_PENDING) ==
(LONG)PR_MW_PENDING) {
CancelTimer(overlapped->data.mw.timer);
}
NT_HashRemove(group, desc->fd);
PR_DELETE(overlapped);
}
_PR_MD_MAP_READ_ERROR(dwError);
rv = PR_FAILURE;
}
#endif
return rv;
} /* PR_AddWaitFileDesc */
PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup* group) {
PRCList* io_ready = NULL;
#ifdef WINNT
PRThread* me = _PR_MD_CURRENT_THREAD();
_MDOverlapped* overlapped;
#endif
if (!_pr_initialized) {
_PR_ImplicitInitialization();
}
if ((NULL == group) && (NULL == (group = MW_Init2()))) {
goto failed_init;
}
PR_Lock(group->ml);
if (_prmw_running != group->state) {
PR_SetError(PR_INVALID_STATE_ERROR, 0);
goto invalid_state;
}
group->waiting_threads += 1; /* the polling thread is counted */
#ifdef WINNT
_PR_MD_LOCK(&group->mdlock);
while (PR_CLIST_IS_EMPTY(&group->io_ready)) {
_PR_THREAD_LOCK(me);
me->state = _PR_IO_WAIT;
PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
if (!_PR_IS_NATIVE_THREAD(me)) {
_PR_SLEEPQ_LOCK(me->cpu);
_PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
_PR_SLEEPQ_UNLOCK(me->cpu);
}
_PR_THREAD_UNLOCK(me);
_PR_MD_UNLOCK(&group->mdlock);
PR_Unlock(group->ml);
_PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
me->state = _PR_RUNNING;
PR_Lock(group->ml);
_PR_MD_LOCK(&group->mdlock);
if (_PR_PENDING_INTERRUPT(me)) {
PR_REMOVE_LINK(&me->waitQLinks);
_PR_MD_UNLOCK(&group->mdlock);
me->flags &= ~_PR_INTERRUPT;
me->io_suspended = PR_FALSE;
PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
goto aborted;
}
}
io_ready = PR_LIST_HEAD(&group->io_ready);
PR_ASSERT(io_ready != NULL);
PR_REMOVE_LINK(io_ready);
_PR_MD_UNLOCK(&group->mdlock);
overlapped =
(_MDOverlapped*)((char*)io_ready - offsetof(_MDOverlapped, data));
io_ready = &overlapped->data.mw.desc->internal;
#else
do {
/*
** If the I/O ready list isn't empty, have this thread
** return with the first receive wait object that's available.
*/
if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
/*
** Is there a polling thread yet? If not, grab this thread
** and use it.
*/
if (NULL == group->poller) {
/*
** This thread will stay do polling until it becomes the only one
** left to service a completion. Then it will return and there will
** be none left to actually poll or to run completions.
**
** The polling function should only return w/ failure or
** with some I/O ready.
*/
if (PR_FAILURE == _MW_PollInternal(group)) {
goto failed_poll;
}
} else {
/*
** There are four reasons a thread can be awakened from
** a wait on the io_complete condition variable.
** 1. Some I/O has completed, i.e., the io_ready list
** is nonempty.
** 2. The wait group is canceled.
** 3. The thread is interrupted.
** 4. The current polling thread has to leave and needs
** a replacement.
** The logic to find a new polling thread is made more
** complicated by all the other possible events.
** I tried my best to write the logic clearly, but
** it is still full of if's with continue and goto.
*/
PRStatus st;
do {
st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
if (_prmw_running != group->state) {
PR_SetError(PR_INVALID_STATE_ERROR, 0);
goto aborted;
}
if (_MW_ABORTED(st) || (NULL == group->poller)) {
break;
}
} while (PR_CLIST_IS_EMPTY(&group->io_ready));
/*
** The thread is interrupted and has to leave. It might
** have also been awakened to process ready i/o or be the
** new poller. To be safe, if either condition is true,
** we awaken another thread to take its place.
*/
if (_MW_ABORTED(st)) {
if ((NULL == group->poller || !PR_CLIST_IS_EMPTY(&group->io_ready)) &&
group->waiting_threads > 1) {
PR_NotifyCondVar(group->io_complete);
}
goto aborted;
}
/*
** A new poller is needed, but can I be the new poller?
** If there is no i/o ready, sure. But if there is any
** i/o ready, it has a higher priority. I want to
** process the ready i/o first and wake up another
** thread to be the new poller.
*/
if (NULL == group->poller) {
if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
continue;
}
if (group->waiting_threads > 1) {
PR_NotifyCondVar(group->io_complete);
}
}
}
PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
}
io_ready = PR_LIST_HEAD(&group->io_ready);
PR_NotifyCondVar(group->io_taken);
PR_ASSERT(io_ready != NULL);
PR_REMOVE_LINK(io_ready);
} while (NULL == io_ready);
failed_poll:
#endif
aborted:
group->waiting_threads -= 1;
invalid_state:
(void)MW_TestForShutdownInternal(group);
PR_Unlock(group->ml);
failed_init:
if (NULL != io_ready) {
/* If the operation failed, record the reason why */
switch (((PRRecvWait*)io_ready)->outcome) {
case PR_MW_PENDING:
PR_ASSERT(0);
break;
case PR_MW_SUCCESS:
#ifndef WINNT
_MW_InitialRecv(io_ready);
#endif
break;
#ifdef WINNT
case PR_MW_FAILURE:
_PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
break;
#endif
case PR_MW_TIMEOUT:
PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
break;
case PR_MW_INTERRUPT:
PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
break;
default:
break;
}
#ifdef WINNT
if (NULL != overlapped->data.mw.timer) {
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
CancelTimer(overlapped->data.mw.timer);
} else {
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
}
PR_DELETE(overlapped);
#endif
}
return (PRRecvWait*)io_ready;
} /* PR_WaitRecvReady */
PR_IMPLEMENT(PRStatus)
PR_CancelWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
#if !defined(WINNT)
PRRecvWait** recv_wait;
#endif
PRStatus rv = PR_SUCCESS;
if (NULL == group) {
group = mw_state->group;
}
PR_ASSERT(NULL != group);
if (NULL == group) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return PR_FAILURE;
}
PR_Lock(group->ml);
if (_prmw_running != group->state) {
PR_SetError(PR_INVALID_STATE_ERROR, 0);
rv = PR_FAILURE;
goto unlock;
}
#ifdef WINNT
if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_INTERRUPT,
(LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
PRFileDesc* bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
PR_ASSERT(NULL != bottom);
if (NULL == bottom) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
goto unlock;
}
bottom->secret->state = _PR_FILEDESC_CLOSED;
# if 0
fprintf(stderr, "cancel wait recv: closing socket\n");
# endif
if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
exit(1);
}
}
#else
if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) {
/* it was in the wait table */
_MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
goto unlock;
}
if (!PR_CLIST_IS_EMPTY(&group->io_ready)) {
/* is it already complete? */
PRCList* head = PR_LIST_HEAD(&group->io_ready);
do {
PRRecvWait* done = (PRRecvWait*)head;
if (done == desc) {
goto unlock;
}
head = PR_NEXT_LINK(head);
} while (head != &group->io_ready);
}
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
rv = PR_FAILURE;
#endif
unlock:
PR_Unlock(group->ml);
return rv;
} /* PR_CancelWaitFileDesc */
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup* group) {
PRRecvWait** desc;
PRRecvWait* recv_wait = NULL;
#ifdef WINNT
_MDOverlapped* overlapped;
PRRecvWait** end;
PRThread* me = _PR_MD_CURRENT_THREAD();
#endif
if (NULL == group) {
group = mw_state->group;
}
PR_ASSERT(NULL != group);
if (NULL == group) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return NULL;
}
PR_Lock(group->ml);
if (_prmw_stopped != group->state) {
if (_prmw_running == group->state) {
group->state = _prmw_stopping; /* so nothing new comes in */
}
if (0 == group->waiting_threads) { /* is there anybody else? */
group->state = _prmw_stopped; /* we can stop right now */
} else {
PR_NotifyAllCondVar(group->new_business);
PR_NotifyAllCondVar(group->io_complete);
}
while (_prmw_stopped != group->state) {
(void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
}
}
#ifdef WINNT
_PR_MD_LOCK(&group->mdlock);
#endif
/* make all the existing descriptors look done/interrupted */
#ifdef WINNT
end = &group->waiter->recv_wait + group->waiter->length;
for (desc = &group->waiter->recv_wait; desc < end; ++desc) {
if (NULL != *desc) {
if (InterlockedCompareExchange(
(LONG*)&(*desc)->outcome, (LONG)PR_MW_INTERRUPT,
(LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
PRFileDesc* bottom =
PR_GetIdentitiesLayer((*desc)->fd, PR_NSPR_IO_LAYER);
PR_ASSERT(NULL != bottom);
if (NULL == bottom) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
goto invalid_arg;
}
bottom->secret->state = _PR_FILEDESC_CLOSED;
# if 0
fprintf(stderr, "cancel wait group: closing socket\n");
# endif
if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
exit(1);
}
}
}
}
while (group->waiter->count > 0) {
_PR_THREAD_LOCK(me);
me->state = _PR_IO_WAIT;
PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
if (!_PR_IS_NATIVE_THREAD(me)) {
_PR_SLEEPQ_LOCK(me->cpu);
_PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
_PR_SLEEPQ_UNLOCK(me->cpu);
}
_PR_THREAD_UNLOCK(me);
_PR_MD_UNLOCK(&group->mdlock);
PR_Unlock(group->ml);
_PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
me->state = _PR_RUNNING;
PR_Lock(group->ml);
_PR_MD_LOCK(&group->mdlock);
}
#else
for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc) {
PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
if (NULL != *desc) {
_MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
}
}
#endif
/* take first element of finished list and return it or NULL */
if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
} else {
PRCList* head = PR_LIST_HEAD(&group->io_ready);
PR_REMOVE_AND_INIT_LINK(head);
#ifdef WINNT
overlapped = (_MDOverlapped*)((char*)head - offsetof(_MDOverlapped, data));
head = &overlapped->data.mw.desc->internal;
if (NULL != overlapped->data.mw.timer) {
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
CancelTimer(overlapped->data.mw.timer);
} else {
PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
}
PR_DELETE(overlapped);
#endif
recv_wait = (PRRecvWait*)head;
}
#ifdef WINNT
invalid_arg:
_PR_MD_UNLOCK(&group->mdlock);
#endif
PR_Unlock(group->ml);
return recv_wait;
} /* PR_CancelWaitGroup */
PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */) {
PRWaitGroup* wg;
if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) {
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
goto failed;
}
/* the wait group itself */
wg->ml = PR_NewLock();
if (NULL == wg->ml) {
goto failed_lock;
}
wg->io_taken = PR_NewCondVar(wg->ml);
if (NULL == wg->io_taken) {
goto failed_cvar0;
}
wg->io_complete = PR_NewCondVar(wg->ml);
if (NULL == wg->io_complete) {
goto failed_cvar1;
}
wg->new_business = PR_NewCondVar(wg->ml);
if (NULL == wg->new_business) {
goto failed_cvar2;
}
wg->mw_manage = PR_NewCondVar(wg->ml);
if (NULL == wg->mw_manage) {
goto failed_cvar3;
}
PR_INIT_CLIST(&wg->group_link);
PR_INIT_CLIST(&wg->io_ready);
/* the waiters sequence */
wg->waiter = (_PRWaiterHash*)PR_CALLOC(
sizeof(_PRWaiterHash) + (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
if (NULL == wg->waiter) {
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
goto failed_waiter;
}
wg->waiter->count = 0;
wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
#ifdef WINNT
_PR_MD_NEW_LOCK(&wg->mdlock);
PR_INIT_CLIST(&wg->wait_list);
#endif /* WINNT */
PR_Lock(mw_lock);
PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
PR_Unlock(mw_lock);
return wg;
failed_waiter:
PR_DestroyCondVar(wg->mw_manage);
failed_cvar3:
PR_DestroyCondVar(wg->new_business);
failed_cvar2:
PR_DestroyCondVar(wg->io_complete);
failed_cvar1:
PR_DestroyCondVar(wg->io_taken);
failed_cvar0:
PR_DestroyLock(wg->ml);
failed_lock:
PR_DELETE(wg);
wg = NULL;
failed:
return wg;
} /* MW_CreateWaitGroup */
PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup* group) {
PRStatus rv = PR_SUCCESS;
if (NULL == group) {
group = mw_state->group;
}
PR_ASSERT(NULL != group);
if (NULL != group) {
PR_Lock(group->ml);
if ((group->waiting_threads == 0) && (group->waiter->count == 0) &&
PR_CLIST_IS_EMPTY(&group->io_ready)) {
group->state = _prmw_stopped;
} else {
PR_SetError(PR_INVALID_STATE_ERROR, 0);
rv = PR_FAILURE;
}
PR_Unlock(group->ml);
if (PR_FAILURE == rv) {
return rv;
}
PR_Lock(mw_lock);
PR_REMOVE_LINK(&group->group_link);
PR_Unlock(mw_lock);
#ifdef WINNT
/*
* XXX make sure wait_list is empty and waiter is empty.
* These must be checked while holding mdlock.
*/
_PR_MD_FREE_LOCK(&group->mdlock);
#endif
PR_DELETE(group->waiter);
PR_DELETE(group->polling_list);
PR_DestroyCondVar(group->mw_manage);
PR_DestroyCondVar(group->new_business);
PR_DestroyCondVar(group->io_complete);
PR_DestroyCondVar(group->io_taken);
PR_DestroyLock(group->ml);
if (group == mw_state->group) {
mw_state->group = NULL;
}
PR_DELETE(group);
} else {
/* The default wait group is not created yet. */
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
rv = PR_FAILURE;
}
return rv;
} /* PR_DestroyWaitGroup */
/**********************************************************************
***********************************************************************
******************** Wait group enumerations **************************
***********************************************************************
**********************************************************************/
PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup* group) {
PRMWaitEnumerator* enumerator = PR_NEWZAP(PRMWaitEnumerator);
if (NULL == enumerator) {
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
} else {
enumerator->group = group;
enumerator->seal = _PR_ENUM_SEALED;
}
return enumerator;
} /* PR_CreateMWaitEnumerator */
PR_IMPLEMENT(PRStatus)
PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator) {
PR_ASSERT(NULL != enumerator);
PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return PR_FAILURE;
}
enumerator->seal = _PR_ENUM_UNSEALED;
PR_Free(enumerator);
return PR_SUCCESS;
} /* PR_DestroyMWaitEnumerator */
PR_IMPLEMENT(PRRecvWait*)
PR_EnumerateWaitGroup(PRMWaitEnumerator* enumerator,
const PRRecvWait* previous) {
PRRecvWait* result = NULL;
/* entry point sanity checking */
PR_ASSERT(NULL != enumerator);
PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
goto bad_argument;
}
/* beginning of enumeration */
if (NULL == previous) {
if (NULL == enumerator->group) {
enumerator->group = mw_state->group;
if (NULL == enumerator->group) {
PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
return NULL;
}
}
enumerator->waiter = &enumerator->group->waiter->recv_wait;
enumerator->p_timestamp = enumerator->group->p_timestamp;
enumerator->thread = PR_GetCurrentThread();
enumerator->index = 0;
}
/* continuing an enumeration */
else {
PRThread* me = PR_GetCurrentThread();
PR_ASSERT(me == enumerator->thread);
if (me != enumerator->thread) {
goto bad_argument;
}
/* need to restart the enumeration */
if (enumerator->p_timestamp != enumerator->group->p_timestamp) {
return PR_EnumerateWaitGroup(enumerator, NULL);
}
}
/* actually progress the enumeration */
#if defined(WINNT)
_PR_MD_LOCK(&enumerator->group->mdlock);
#else
PR_Lock(enumerator->group->ml);
#endif
while (enumerator->index++ < enumerator->group->waiter->length) {
if (NULL != (result = *(enumerator->waiter)++)) {
break;
}
}
#if defined(WINNT)
_PR_MD_UNLOCK(&enumerator->group->mdlock);
#else
PR_Unlock(enumerator->group->ml);
#endif
return result; /* what we live for */
bad_argument:
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return NULL; /* probably ambiguous */
} /* PR_EnumerateWaitGroup */
/* prmwait.c */