Source code
Revision control
Copy as Markdown
Other Tools
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
/*
*
* Notes:
* [1] lth. The call to Sleep() is a hack to get the test case to run
* on Windows 95. Without it, the test case fails with an error
* WSAECONNRESET following a recv() call. The error is caused by the
* server side thread termination without a shutdown() or closesocket()
* call. Windows docmunentation suggests that this is predicted
* behavior; that other platforms get away with it is ... serindipity.
* The test case should shutdown() or closesocket() before
* thread termination. I didn't have time to figure out where or how
* to do it. The Sleep() call inserts enough delay to allow the
* client side to recv() all his data before the server side thread
* terminates. Whew! ...
*
** Modification History:
* 14-May-97 AGarcia- Converted the test to accomodate the debug_mode flag.
* The debug mode will print all of the printfs associated with this
*test. The regress mode will be the default mode. Since the regress tool limits
* the output to a one line status:PASS or FAIL,all of the printf
*statements have been handled with an if (debug_mode) statement.
*/
#include "prclist.h"
#include "prcvar.h"
#include "prerror.h"
#include "prinit.h"
#include "prinrval.h"
#include "prio.h"
#include "prlock.h"
#include "prlog.h"
#include "prtime.h"
#include "prmem.h"
#include "prnetdb.h"
#include "prprf.h"
#include "prthread.h"
#include "pprio.h"
#include "primpl.h"
#include "plstr.h"
#include "plerror.h"
#include "plgetopt.h"
#include <stdlib.h>
#include <string.h>
#if defined(XP_UNIX)
# include <math.h>
#endif
/*
** This is the beginning of the test
*/
#ifdef DEBUG
# define PORT_INC_DO +100
#else
# define PORT_INC_DO
#endif
#ifdef IS_64
# define PORT_INC_3264 +200
#else
# define PORT_INC_3264
#endif
#define RECV_FLAGS 0
#define SEND_FLAGS 0
#define BUFFER_SIZE 1024
#define DEFAULT_BACKLOG 5
#define DEFAULT_PORT 13000 PORT_INC_DO PORT_INC_3264
#define DEFAULT_CLIENTS 1
#define ALLOWED_IN_ACCEPT 1
#define DEFAULT_CLIPPING 1000
#define DEFAULT_WORKERS_MIN 1
#define DEFAULT_WORKERS_MAX 1
#define DEFAULT_SERVER "localhost"
#define DEFAULT_EXECUTION_TIME 10
#define DEFAULT_CLIENT_TIMEOUT 4000
#define DEFAULT_SERVER_TIMEOUT 4000
#define DEFAULT_SERVER_PRIORITY PR_PRIORITY_HIGH
typedef enum CSState_e { cs_init, cs_run, cs_stop, cs_exit } CSState_t;
static void PR_CALLBACK Worker(void* arg);
typedef struct CSPool_s CSPool_t;
typedef struct CSWorker_s CSWorker_t;
typedef struct CSServer_s CSServer_t;
typedef enum Verbosity {
TEST_LOG_ALWAYS,
TEST_LOG_ERROR,
TEST_LOG_WARNING,
TEST_LOG_NOTICE,
TEST_LOG_INFO,
TEST_LOG_STATUS,
TEST_LOG_VERBOSE
} Verbosity;
static enum {
thread_nspr,
thread_pthread,
thread_sproc,
thread_win32
} thread_provider;
static PRInt32 domain = AF_INET;
static PRInt32 protocol = 6; /* TCP */
static PRFileDesc* debug_out = NULL;
static PRBool debug_mode = PR_FALSE;
static PRBool pthread_stats = PR_FALSE;
static Verbosity verbosity = TEST_LOG_ALWAYS;
static PRThreadScope thread_scope = PR_LOCAL_THREAD;
struct CSWorker_s {
PRCList element; /* list of the server's workers */
PRThread* thread; /* this worker objects thread */
CSServer_t* server; /* back pointer to server structure */
};
struct CSPool_s {
PRCondVar* exiting;
PRCondVar* acceptComplete;
PRUint32 accepting, active, workers;
};
struct CSServer_s {
PRCList list; /* head of worker list */
PRLock* ml;
PRThread* thread; /* the main server thread */
PRCondVar* stateChange;
PRUint16 port; /* port we're listening on */
PRUint32 backlog; /* size of our listener backlog */
PRFileDesc* listener; /* the fd accepting connections */
CSPool_t pool; /* statistics on worker threads */
CSState_t state; /* the server's state */
struct /* controlling worker counts */
{
PRUint32 minimum, maximum, accepting;
} workers;
/* statistics */
PRIntervalTime started, stopped;
PRUint32 operations, bytesTransferred;
};
typedef struct CSDescriptor_s {
PRInt32 size; /* size of transfer */
char filename[60]; /* filename, null padded */
} CSDescriptor_t;
typedef struct CSClient_s {
PRLock* ml;
PRThread* thread;
PRCondVar* stateChange;
PRNetAddr serverAddress;
CSState_t state;
/* statistics */
PRIntervalTime started, stopped;
PRUint32 operations, bytesTransferred;
} CSClient_t;
#define TEST_LOG(l, p, a) \
do { \
if (debug_mode || (p <= verbosity)) printf a; \
} while (0)
PRLogModuleInfo* cltsrv_log_file = NULL;
#define MY_ASSERT(_expr) \
((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__))
#define TEST_ASSERT(_expr) \
((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__))
static void _MY_Assert(const char* s, const char* file, PRIntn ln) {
PL_PrintError(NULL);
PR_Assert(s, file, ln);
} /* _MY_Assert */
static PRBool Aborted(PRStatus rv) {
return ((PR_FAILURE == rv) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError()))
? PR_TRUE
: PR_FALSE;
}
static void TimeOfDayMessage(const char* msg, PRThread* me) {
char buffer[100];
PRExplodedTime tod;
PR_ExplodeTime(PR_Now(), PR_LocalTimeParameters, &tod);
(void)PR_FormatTime(buffer, sizeof(buffer), "%H:%M:%S", &tod);
TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
("%s(0x%p): %s\n", msg, me, buffer));
} /* TimeOfDayMessage */
static void PR_CALLBACK Client(void* arg) {
PRStatus rv;
PRIntn index;
char buffer[1024];
PRFileDesc* fd = NULL;
PRUintn clipping = DEFAULT_CLIPPING;
CSClient_t* client = (CSClient_t*)arg;
PRThread* me = client->thread = PR_GetCurrentThread();
CSDescriptor_t* descriptor = PR_NEW(CSDescriptor_t);
PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_CLIENT_TIMEOUT);
for (index = 0; index < sizeof(buffer); ++index) {
buffer[index] = (char)index;
}
client->started = PR_IntervalNow();
PR_Lock(client->ml);
client->state = cs_run;
PR_NotifyCondVar(client->stateChange);
PR_Unlock(client->ml);
TimeOfDayMessage("Client started at", me);
while (cs_run == client->state) {
PRInt32 bytes, descbytes, filebytes, netbytes;
(void)PR_NetAddrToString(&client->serverAddress, buffer, sizeof(buffer));
TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
("\tClient(0x%p): connecting to server at %s\n", me, buffer));
fd = PR_Socket(domain, SOCK_STREAM, protocol);
TEST_ASSERT(NULL != fd);
rv = PR_Connect(fd, &client->serverAddress, timeout);
if (PR_FAILURE == rv) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tClient(0x%p): conection failed\n", me));
goto aborted;
}
memset(descriptor, 0, sizeof(*descriptor));
descriptor->size = PR_htonl(descbytes = rand() % clipping);
PR_snprintf(descriptor->filename, sizeof(descriptor->filename),
"CS%p%p-%p.dat", client->started, me, client->operations);
TEST_LOG(
cltsrv_log_file, TEST_LOG_VERBOSE,
("\tClient(0x%p): sending descriptor for %u bytes\n", me, descbytes));
bytes = PR_Send(fd, descriptor, sizeof(*descriptor), SEND_FLAGS, timeout);
if (sizeof(CSDescriptor_t) != bytes) {
if (Aborted(PR_FAILURE)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tClient(0x%p): send descriptor timeout\n", me));
goto retry;
}
}
TEST_ASSERT(sizeof(*descriptor) == bytes);
netbytes = 0;
while (netbytes < descbytes) {
filebytes = sizeof(buffer);
if ((descbytes - netbytes) < filebytes) {
filebytes = descbytes - netbytes;
}
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\tClient(0x%p): sending %d bytes\n", me, filebytes));
bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
if (filebytes != bytes) {
if (Aborted(PR_FAILURE)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tClient(0x%p): send data timeout\n", me));
goto retry;
}
}
TEST_ASSERT(bytes == filebytes);
netbytes += bytes;
}
filebytes = 0;
while (filebytes < descbytes) {
netbytes = sizeof(buffer);
if ((descbytes - filebytes) < netbytes) {
netbytes = descbytes - filebytes;
}
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\tClient(0x%p): receiving %d bytes\n", me, netbytes));
bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
if (-1 == bytes) {
if (Aborted(PR_FAILURE)) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tClient(0x%p): receive data aborted\n", me));
goto aborted;
} else if (PR_IO_TIMEOUT_ERROR == PR_GetError())
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tClient(0x%p): receive data timeout\n", me));
else
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tClient(0x%p): receive error (%d, %d)\n", me,
PR_GetError(), PR_GetOSError()));
goto retry;
}
if (0 == bytes) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tClient(0x%p): unexpected end of stream\n",
PR_GetCurrentThread()));
break;
}
filebytes += bytes;
}
rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
if (Aborted(rv)) {
goto aborted;
}
TEST_ASSERT(PR_SUCCESS == rv);
retry:
(void)PR_Close(fd);
fd = NULL;
TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
("\tClient(0x%p): disconnected from server\n", me));
PR_Lock(client->ml);
client->operations += 1;
client->bytesTransferred += 2 * descbytes;
rv = PR_WaitCondVar(client->stateChange, rand() % clipping);
PR_Unlock(client->ml);
if (Aborted(rv)) {
break;
}
}
aborted:
client->stopped = PR_IntervalNow();
PR_ClearInterrupt();
if (NULL != fd) {
rv = PR_Close(fd);
}
PR_Lock(client->ml);
client->state = cs_exit;
PR_NotifyCondVar(client->stateChange);
PR_Unlock(client->ml);
PR_DELETE(descriptor);
TEST_LOG(
cltsrv_log_file, TEST_LOG_ALWAYS,
("\tClient(0x%p): stopped after %u operations and %u bytes\n",
PR_GetCurrentThread(), client->operations, client->bytesTransferred));
} /* Client */
static PRStatus ProcessRequest(PRFileDesc* fd, CSServer_t* server) {
PRStatus drv, rv;
char buffer[1024];
PRFileDesc* file = NULL;
PRThread* me = PR_GetCurrentThread();
PRInt32 bytes, descbytes, netbytes, filebytes = 0;
CSDescriptor_t* descriptor = PR_NEW(CSDescriptor_t);
PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_SERVER_TIMEOUT);
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\tProcessRequest(0x%p): receiving desciptor\n", me));
bytes = PR_Recv(fd, descriptor, sizeof(*descriptor), RECV_FLAGS, timeout);
if (-1 == bytes) {
rv = PR_FAILURE;
if (Aborted(rv)) {
goto exit;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tProcessRequest(0x%p): receive timeout\n", me));
}
goto exit;
}
if (0 == bytes) {
rv = PR_FAILURE;
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tProcessRequest(0x%p): unexpected end of file\n", me));
goto exit;
}
descbytes = PR_ntohl(descriptor->size);
TEST_ASSERT(sizeof(*descriptor) == bytes);
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\t\tProcessRequest(0x%p): read descriptor {%d, %s}\n", me,
descbytes, descriptor->filename));
file = PR_Open(descriptor->filename, (PR_CREATE_FILE | PR_WRONLY), 0666);
if (NULL == file) {
rv = PR_FAILURE;
if (Aborted(rv)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\tProcessRequest(0x%p): open file timeout\n", me));
goto aborted;
}
}
TEST_ASSERT(NULL != file);
filebytes = 0;
while (filebytes < descbytes) {
netbytes = sizeof(buffer);
if ((descbytes - filebytes) < netbytes) {
netbytes = descbytes - filebytes;
}
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\tProcessRequest(0x%p): receive %d bytes\n", me, netbytes));
bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
if (-1 == bytes) {
rv = PR_FAILURE;
if (Aborted(rv)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tProcessRequest(0x%p): receive data timeout\n", me));
goto aborted;
}
/*
* XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED)
* on NT here. This is equivalent to ECONNRESET on Unix.
* -wtc
*/
TEST_LOG(cltsrv_log_file, TEST_LOG_WARNING,
("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n", me,
PR_GetError(), PR_GetOSError()));
goto aborted;
}
if (0 == bytes) {
TEST_LOG(cltsrv_log_file, TEST_LOG_WARNING,
("\t\tProcessRequest(0x%p): unexpected end of stream\n", me));
rv = PR_FAILURE;
goto aborted;
}
filebytes += bytes;
netbytes = bytes;
/* The byte count for PR_Write should be positive */
MY_ASSERT(netbytes > 0);
TEST_LOG(
cltsrv_log_file, TEST_LOG_VERBOSE,
("\tProcessRequest(0x%p): write %d bytes to file\n", me, netbytes));
bytes = PR_Write(file, buffer, netbytes);
if (netbytes != bytes) {
rv = PR_FAILURE;
if (Aborted(rv)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tProcessRequest(0x%p): write file timeout\n", me));
goto aborted;
}
}
TEST_ASSERT(bytes > 0);
}
PR_Lock(server->ml);
server->operations += 1;
server->bytesTransferred += filebytes;
PR_Unlock(server->ml);
rv = PR_Close(file);
file = NULL;
if (Aborted(rv)) {
goto aborted;
}
TEST_ASSERT(PR_SUCCESS == rv);
TEST_LOG(
cltsrv_log_file, TEST_LOG_VERBOSE,
("\t\tProcessRequest(0x%p): opening %s\n", me, descriptor->filename));
file = PR_Open(descriptor->filename, PR_RDONLY, 0);
if (NULL == file) {
rv = PR_FAILURE;
if (Aborted(rv)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tProcessRequest(0x%p): open file timeout\n",
PR_GetCurrentThread()));
goto aborted;
}
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n", me,
PR_GetError(), PR_GetOSError()));
goto aborted;
}
TEST_ASSERT(NULL != file);
netbytes = 0;
while (netbytes < descbytes) {
filebytes = sizeof(buffer);
if ((descbytes - netbytes) < filebytes) {
filebytes = descbytes - netbytes;
}
TEST_LOG(
cltsrv_log_file, TEST_LOG_VERBOSE,
("\tProcessRequest(0x%p): read %d bytes from file\n", me, filebytes));
bytes = PR_Read(file, buffer, filebytes);
if (filebytes != bytes) {
rv = PR_FAILURE;
if (Aborted(rv)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError())
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tProcessRequest(0x%p): read file timeout\n", me));
else
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tProcessRequest(0x%p): other file error (%d, %d)\n", me,
PR_GetError(), PR_GetOSError()));
goto aborted;
}
TEST_ASSERT(bytes > 0);
netbytes += bytes;
filebytes = bytes;
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\t\tProcessRequest(0x%p): sending %d bytes\n", me, filebytes));
bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
if (filebytes != bytes) {
rv = PR_FAILURE;
if (Aborted(rv)) {
goto aborted;
}
if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tProcessRequest(0x%p): send data timeout\n", me));
goto aborted;
}
break;
}
TEST_ASSERT(bytes > 0);
}
PR_Lock(server->ml);
server->bytesTransferred += filebytes;
PR_Unlock(server->ml);
rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
if (Aborted(rv)) {
goto aborted;
}
rv = PR_Close(file);
file = NULL;
if (Aborted(rv)) {
goto aborted;
}
TEST_ASSERT(PR_SUCCESS == rv);
aborted:
PR_ClearInterrupt();
if (NULL != file) {
PR_Close(file);
}
drv = PR_Delete(descriptor->filename);
TEST_ASSERT(PR_SUCCESS == drv);
exit:
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\t\tProcessRequest(0x%p): Finished\n", me));
PR_DELETE(descriptor);
#if defined(WIN95)
PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */
#endif
return rv;
} /* ProcessRequest */
typedef void (*StartFn)(void*);
typedef struct StartObject {
StartFn start;
void* arg;
} StartObject;
#if defined(_PR_PTHREADS)
# include "md/_pth.h"
# include <pthread.h>
static void* pthread_start(void* arg) {
StartObject* so = (StartObject*)arg;
StartFn start = so->start;
void* data = so->arg;
PR_Free(so);
start(data);
return NULL;
} /* pthread_start */
#endif /* defined(_PR_PTHREADS) */
#if defined(WIN32)
# include <process.h> /* for _beginthreadex() */
static PRUintn __stdcall windows_start(void* arg) {
StartObject* so = (StartObject*)arg;
StartFn start = so->start;
void* data = so->arg;
PR_Free(so);
start(data);
return 0;
} /* windows_start */
#endif /* defined(WIN32) */
static PRStatus JoinThread(PRThread* thread) {
PRStatus rv;
switch (thread_provider) {
case thread_nspr:
rv = PR_JoinThread(thread);
break;
case thread_pthread:
#if defined(_PR_PTHREADS)
rv = PR_SUCCESS;
break;
#endif /* defined(_PR_PTHREADS) */
case thread_win32:
#if defined(WIN32)
rv = PR_SUCCESS;
break;
#endif
default:
rv = PR_FAILURE;
break;
}
return rv;
} /* JoinThread */
static PRStatus NewThread(StartFn start, void* arg, PRThreadPriority prio,
PRThreadState state) {
PRStatus rv;
switch (thread_provider) {
case thread_nspr: {
PRThread* thread =
PR_CreateThread(PR_USER_THREAD, start, arg, PR_PRIORITY_NORMAL,
PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, 0);
rv = (NULL == thread) ? PR_FAILURE : PR_SUCCESS;
} break;
case thread_pthread:
#if defined(_PR_PTHREADS)
{
int rv;
pthread_t id;
pthread_attr_t tattr;
StartObject* start_object;
start_object = PR_NEW(StartObject);
PR_ASSERT(NULL != start_object);
start_object->start = start;
start_object->arg = arg;
rv = _PT_PTHREAD_ATTR_INIT(&tattr);
PR_ASSERT(0 == rv);
rv = pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
PR_ASSERT(0 == rv);
rv = pthread_attr_setstacksize(&tattr, 64 * 1024);
PR_ASSERT(0 == rv);
rv = _PT_PTHREAD_CREATE(&id, tattr, pthread_start, start_object);
(void)_PT_PTHREAD_ATTR_DESTROY(&tattr);
return (0 == rv) ? PR_SUCCESS : PR_FAILURE;
}
#else
PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
rv = PR_FAILURE;
#endif /* defined(_PR_PTHREADS) */
break;
case thread_sproc:
PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
rv = PR_FAILURE;
break;
case thread_win32:
#if defined(WIN32)
{
void* th;
PRUintn id;
StartObject* start_object;
start_object = PR_NEW(StartObject);
PR_ASSERT(NULL != start_object);
start_object->start = start;
start_object->arg = arg;
th = (void*)_beginthreadex(
NULL, /* LPSECURITY_ATTRIBUTES - pointer to thread security attributes
*/
0U, /* DWORD - initial thread stack size, in bytes */
windows_start, /* LPTHREAD_START_ROUTINE - pointer to thread function
*/
start_object, /* LPVOID - argument for new thread */
STACK_SIZE_PARAM_IS_A_RESERVATION, /*DWORD dwCreationFlags - creation
flags */
&id /* LPDWORD - pointer to returned thread identifier */);
rv = (NULL == th) ? PR_FAILURE : PR_SUCCESS;
}
#else
PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
rv = PR_FAILURE;
#endif
break;
default:
PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
rv = PR_FAILURE;
}
return rv;
} /* NewThread */
static PRStatus CreateWorker(CSServer_t* server, CSPool_t* pool) {
PRStatus rv;
CSWorker_t* worker = PR_NEWZAP(CSWorker_t);
worker->server = server;
PR_INIT_CLIST(&worker->element);
rv = NewThread(Worker, worker, DEFAULT_SERVER_PRIORITY, PR_UNJOINABLE_THREAD);
if (PR_FAILURE == rv) {
PR_DELETE(worker);
}
TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
("\tCreateWorker(0x%p): create new worker (0x%p)\n",
PR_GetCurrentThread(), worker->thread));
return rv;
} /* CreateWorker */
static void PR_CALLBACK Worker(void* arg) {
PRStatus rv;
PRNetAddr from;
PRFileDesc* fd = NULL;
CSWorker_t* worker = (CSWorker_t*)arg;
CSServer_t* server = worker->server;
CSPool_t* pool = &server->pool;
PRThread* me = worker->thread = PR_GetCurrentThread();
TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
("\t\tWorker(0x%p): started [%u]\n", me, pool->workers + 1));
PR_Lock(server->ml);
PR_APPEND_LINK(&worker->element, &server->list);
pool->workers += 1; /* define our existance */
while (cs_run == server->state) {
while (pool->accepting >= server->workers.accepting) {
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\t\tWorker(0x%p): waiting for accept slot[%d]\n", me,
pool->accepting));
rv = PR_WaitCondVar(pool->acceptComplete, PR_INTERVAL_NO_TIMEOUT);
if (Aborted(rv) || (cs_run != server->state)) {
TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
("\tWorker(0x%p): has been %s\n", me,
(Aborted(rv) ? "interrupted" : "stopped")));
goto exit;
}
}
pool->accepting += 1; /* how many are really in accept */
PR_Unlock(server->ml);
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\t\tWorker(0x%p): calling accept\n", me));
fd = PR_Accept(server->listener, &from, PR_INTERVAL_NO_TIMEOUT);
PR_Lock(server->ml);
pool->accepting -= 1;
PR_NotifyCondVar(pool->acceptComplete);
if ((NULL == fd) && Aborted(PR_FAILURE)) {
if (NULL != server->listener) {
PR_Close(server->listener);
server->listener = NULL;
}
goto exit;
}
if (NULL != fd) {
/*
** Create another worker of the total number of workers is
** less than the minimum specified or we have none left in
** accept() AND we're not over the maximum.
** This sort of presumes that the number allowed in accept
** is at least as many as the minimum. Otherwise we'll keep
** creating new threads and deleting them soon after.
*/
PRBool another = ((pool->workers < server->workers.minimum) ||
((0 == pool->accepting) &&
(pool->workers < server->workers.maximum)))
? PR_TRUE
: PR_FALSE;
pool->active += 1;
PR_Unlock(server->ml);
if (another) {
(void)CreateWorker(server, pool);
}
rv = ProcessRequest(fd, server);
if (PR_SUCCESS != rv)
TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
("\t\tWorker(0x%p): server process ended abnormally\n", me));
(void)PR_Close(fd);
fd = NULL;
PR_Lock(server->ml);
pool->active -= 1;
}
}
exit:
PR_ClearInterrupt();
PR_Unlock(server->ml);
if (NULL != fd) {
(void)PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
(void)PR_Close(fd);
}
TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
("\t\tWorker(0x%p): exiting [%u]\n", PR_GetCurrentThread(),
pool->workers));
PR_Lock(server->ml);
pool->workers -= 1; /* undefine our existance */
PR_REMOVE_AND_INIT_LINK(&worker->element);
PR_NotifyCondVar(pool->exiting);
PR_Unlock(server->ml);
PR_DELETE(worker); /* destruction of the "worker" object */
} /* Worker */
static void PR_CALLBACK Server(void* arg) {
PRStatus rv;
PRNetAddr serverAddress;
CSServer_t* server = (CSServer_t*)arg;
PRThread* me = server->thread = PR_GetCurrentThread();
PRSocketOptionData sockOpt;
server->listener = PR_Socket(domain, SOCK_STREAM, protocol);
sockOpt.option = PR_SockOpt_Reuseaddr;
sockOpt.value.reuse_addr = PR_TRUE;
rv = PR_SetSocketOption(server->listener, &sockOpt);
TEST_ASSERT(PR_SUCCESS == rv);
memset(&serverAddress, 0, sizeof(serverAddress));
rv = PR_InitializeNetAddr(PR_IpAddrAny, DEFAULT_PORT, &serverAddress);
rv = PR_Bind(server->listener, &serverAddress);
TEST_ASSERT(PR_SUCCESS == rv);
rv = PR_Listen(server->listener, server->backlog);
TEST_ASSERT(PR_SUCCESS == rv);
server->started = PR_IntervalNow();
TimeOfDayMessage("Server started at", me);
PR_Lock(server->ml);
server->state = cs_run;
PR_NotifyCondVar(server->stateChange);
PR_Unlock(server->ml);
/*
** Create the first worker (actually, a thread that accepts
** connections and then processes the work load as needed).
** From this point on, additional worker threads are created
** as they are needed by existing worker threads.
*/
rv = CreateWorker(server, &server->pool);
TEST_ASSERT(PR_SUCCESS == rv);
/*
** From here on this thread is merely hanging around as the contact
** point for the main test driver. It's just waiting for the driver
** to declare the test complete.
*/
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\tServer(0x%p): waiting for state change\n", me));
PR_Lock(server->ml);
while ((cs_run == server->state) && !Aborted(rv)) {
rv = PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
}
PR_Unlock(server->ml);
PR_ClearInterrupt();
TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
("\tServer(0x%p): shutting down workers\n", me));
/*
** Get all the worker threads to exit. They know how to
** clean up after themselves, so this is just a matter of
** waiting for clorine in the pool to take effect. During
** this stage we're ignoring interrupts.
*/
server->workers.minimum = server->workers.maximum = 0;
PR_Lock(server->ml);
while (!PR_CLIST_IS_EMPTY(&server->list)) {
PRCList* head = PR_LIST_HEAD(&server->list);
CSWorker_t* worker = (CSWorker_t*)head;
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("\tServer(0x%p): interrupting worker(0x%p)\n", me, worker));
rv = PR_Interrupt(worker->thread);
TEST_ASSERT(PR_SUCCESS == rv);
PR_REMOVE_AND_INIT_LINK(head);
}
while (server->pool.workers > 0) {
TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
("\tServer(0x%p): waiting for %u workers to exit\n", me,
server->pool.workers));
(void)PR_WaitCondVar(server->pool.exiting, PR_INTERVAL_NO_TIMEOUT);
}
server->state = cs_exit;
PR_NotifyCondVar(server->stateChange);
PR_Unlock(server->ml);
TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
("\tServer(0x%p): stopped after %u operations and %u bytes\n", me,
server->operations, server->bytesTransferred));
if (NULL != server->listener) {
PR_Close(server->listener);
}
server->stopped = PR_IntervalNow();
} /* Server */
static void WaitForCompletion(PRIntn execution) {
while (execution > 0) {
PRIntn dally = (execution > 30) ? 30 : execution;
PR_Sleep(PR_SecondsToInterval(dally));
if (pthread_stats) {
PT_FPrintStats(debug_out, "\nPThread Statistics\n");
}
execution -= dally;
}
} /* WaitForCompletion */
static void Help(void) {
PR_fprintf(debug_out, "cltsrv test program usage:\n");
PR_fprintf(debug_out,
"\t-a <n> threads allowed in accept (5)\n");
PR_fprintf(debug_out,
"\t-b <n> backlock for listen (5)\n");
PR_fprintf(debug_out,
"\t-c <threads> number of clients to create (1)\n");
PR_fprintf(debug_out,
"\t-w <threads> minimal number of server threads (1)\n");
PR_fprintf(debug_out,
"\t-W <threads> maximum number of server threads (1)\n");
PR_fprintf(debug_out,
"\t-e <seconds> duration of the test in seconds (10)\n");
PR_fprintf(debug_out,
"\t-s <string> dsn name of server (localhost)\n");
PR_fprintf(debug_out,
"\t-G use GLOBAL threads (LOCAL)\n");
PR_fprintf(debug_out,
"\t-T <string> thread provider ('n' | 'p' | 'w')(n)\n");
PR_fprintf(debug_out,
"\t-X use XTP as transport (TCP)\n");
PR_fprintf(debug_out,
"\t-6 Use IPv6 (IPv4)\n");
PR_fprintf(debug_out,
"\t-v verbosity (accumulative) (0)\n");
PR_fprintf(debug_out,
"\t-p pthread statistics (FALSE)\n");
PR_fprintf(debug_out,
"\t-d debug mode (FALSE)\n");
PR_fprintf(debug_out, "\t-h this message\n");
} /* Help */
static Verbosity IncrementVerbosity(void) {
PRIntn verboge = (PRIntn)verbosity + 1;
return (Verbosity)verboge;
} /* IncrementVerbosity */
int main(int argc, char** argv) {
PRUintn index;
PRBool boolean;
CSClient_t* client;
PRStatus rv, joinStatus;
CSServer_t* server = NULL;
char* thread_type;
PRUintn backlog = DEFAULT_BACKLOG;
PRUintn clients = DEFAULT_CLIENTS;
const char* serverName = DEFAULT_SERVER;
PRBool serverIsLocal = PR_TRUE;
PRUintn accepting = ALLOWED_IN_ACCEPT;
PRUintn workersMin = DEFAULT_WORKERS_MIN;
PRUintn workersMax = DEFAULT_WORKERS_MAX;
PRIntn execution = DEFAULT_EXECUTION_TIME;
/*
* -G use global threads
* -a <n> threads allowed in accept
* -b <n> backlock for listen
* -c <threads> number of clients to create
* -w <threads> minimal number of server threads
* -W <threads> maximum number of server threads
* -e <seconds> duration of the test in seconds
* -s <string> dsn name of server (implies no server here)
* -v verbosity
*/
PLOptStatus os;
PLOptState* opt = PL_CreateOptState(argc, argv, "GX6b:a:c:w:W:e:s:T:vdhp");
#if defined(WIN32)
thread_provider = thread_win32;
#elif defined(_PR_PTHREADS)
thread_provider = thread_pthread;
#else
thread_provider = thread_nspr;
#endif
debug_out = PR_GetSpecialFD(PR_StandardError);
while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) {
if (PL_OPT_BAD == os) {
continue;
}
switch (opt->option) {
case 'G': /* use global threads */
thread_scope = PR_GLOBAL_THREAD;
break;
case 'X': /* use XTP as transport */
protocol = 36;
break;
case '6': /* Use IPv6 */
domain = PR_AF_INET6;
break;
case 'a': /* the value for accepting */
accepting = atoi(opt->value);
break;
case 'b': /* the value for backlock */
backlog = atoi(opt->value);
break;
case 'T': /* the thread provider */
if ('n' == *opt->value) {
thread_provider = thread_nspr;
} else if ('p' == *opt->value) {
thread_provider = thread_pthread;
} else if ('w' == *opt->value) {
thread_provider = thread_win32;
} else {
Help();
return 2;
}
break;
case 'c': /* number of client threads */
clients = atoi(opt->value);
break;
case 'w': /* minimum server worker threads */
workersMin = atoi(opt->value);
break;
case 'W': /* maximum server worker threads */
workersMax = atoi(opt->value);
break;
case 'e': /* program execution time in seconds */
execution = atoi(opt->value);
break;
case 's': /* server's address */
serverName = opt->value;
break;
case 'v': /* verbosity */
verbosity = IncrementVerbosity();
break;
case 'd': /* debug mode */
debug_mode = PR_TRUE;
break;
case 'p': /* pthread mode */
pthread_stats = PR_TRUE;
break;
case 'h':
default:
Help();
return 2;
}
}
PL_DestroyOptState(opt);
if (0 != PL_strcmp(serverName, DEFAULT_SERVER)) {
serverIsLocal = PR_FALSE;
}
if (0 == execution) {
execution = DEFAULT_EXECUTION_TIME;
}
if (0 == workersMax) {
workersMax = DEFAULT_WORKERS_MAX;
}
if (0 == workersMin) {
workersMin = DEFAULT_WORKERS_MIN;
}
if (0 == accepting) {
accepting = ALLOWED_IN_ACCEPT;
}
if (0 == backlog) {
backlog = DEFAULT_BACKLOG;
}
if (workersMin > accepting) {
accepting = workersMin;
}
PR_STDIO_INIT();
TimeOfDayMessage("Client/Server started at", PR_GetCurrentThread());
cltsrv_log_file = PR_NewLogModule("cltsrv_log");
MY_ASSERT(NULL != cltsrv_log_file);
boolean = PR_SetLogFile("cltsrv.log");
MY_ASSERT(boolean);
if (serverIsLocal) {
/* Establish the server */
TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
("main(0x%p): starting server\n", PR_GetCurrentThread()));
server = PR_NEWZAP(CSServer_t);
PR_INIT_CLIST(&server->list);
server->state = cs_init;
server->ml = PR_NewLock();
server->backlog = backlog;
server->port = DEFAULT_PORT;
server->workers.minimum = workersMin;
server->workers.maximum = workersMax;
server->workers.accepting = accepting;
server->stateChange = PR_NewCondVar(server->ml);
server->pool.exiting = PR_NewCondVar(server->ml);
server->pool.acceptComplete = PR_NewCondVar(server->ml);
TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
("main(0x%p): creating server thread\n", PR_GetCurrentThread()));
rv = NewThread(Server, server, PR_PRIORITY_HIGH, PR_JOINABLE_THREAD);
TEST_ASSERT(PR_SUCCESS == rv);
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("main(0x%p): waiting for server init\n", PR_GetCurrentThread()));
PR_Lock(server->ml);
while (server->state == cs_init) {
PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
}
PR_Unlock(server->ml);
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("main(0x%p): server init complete (port #%d)\n",
PR_GetCurrentThread(), server->port));
}
if (clients != 0) {
/* Create all of the clients */
PRHostEnt host;
char buffer[BUFFER_SIZE];
client = (CSClient_t*)PR_CALLOC(clients * sizeof(CSClient_t));
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("main(0x%p): creating %d client threads\n", PR_GetCurrentThread(),
clients));
if (!serverIsLocal) {
rv = PR_GetHostByName(serverName, buffer, BUFFER_SIZE, &host);
if (PR_SUCCESS != rv) {
PL_FPrintError(PR_STDERR, "PR_GetHostByName");
return 2;
}
}
for (index = 0; index < clients; ++index) {
client[index].state = cs_init;
client[index].ml = PR_NewLock();
if (serverIsLocal) {
(void)PR_InitializeNetAddr(PR_IpAddrLoopback, DEFAULT_PORT,
&client[index].serverAddress);
} else {
(void)PR_EnumerateHostEnt(0, &host, DEFAULT_PORT,
&client[index].serverAddress);
}
client[index].stateChange = PR_NewCondVar(client[index].ml);
TEST_LOG(
cltsrv_log_file, TEST_LOG_INFO,
("main(0x%p): creating client threads\n", PR_GetCurrentThread()));
rv = NewThread(Client, &client[index], PR_PRIORITY_NORMAL,
PR_JOINABLE_THREAD);
TEST_ASSERT(PR_SUCCESS == rv);
PR_Lock(client[index].ml);
while (cs_init == client[index].state) {
PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
}
PR_Unlock(client[index].ml);
}
}
/* Then just let them go at it for a bit */
TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
("main(0x%p): waiting for execution interval (%d seconds)\n",
PR_GetCurrentThread(), execution));
WaitForCompletion(execution);
TimeOfDayMessage("Shutting down", PR_GetCurrentThread());
if (clients != 0) {
for (index = 0; index < clients; ++index) {
TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
("main(0x%p): notifying client(0x%p) to stop\n",
PR_GetCurrentThread(), client[index].thread));
PR_Lock(client[index].ml);
if (cs_run == client[index].state) {
client[index].state = cs_stop;
PR_Interrupt(client[index].thread);
while (cs_stop == client[index].state)
PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
}
PR_Unlock(client[index].ml);
TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
("main(0x%p): joining client(0x%p)\n", PR_GetCurrentThread(),
client[index].thread));
joinStatus = JoinThread(client[index].thread);
TEST_ASSERT(PR_SUCCESS == joinStatus);
PR_DestroyCondVar(client[index].stateChange);
PR_DestroyLock(client[index].ml);
}
PR_DELETE(client);
}
if (NULL != server) {
/* All clients joined - retrieve the server */
TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
("main(0x%p): notifying server(0x%p) to stop\n",
PR_GetCurrentThread(), server->thread));
PR_Lock(server->ml);
server->state = cs_stop;
PR_Interrupt(server->thread);
while (cs_exit != server->state) {
PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
}
PR_Unlock(server->ml);
TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
("main(0x%p): joining server(0x%p)\n", PR_GetCurrentThread(),
server->thread));
joinStatus = JoinThread(server->thread);
TEST_ASSERT(PR_SUCCESS == joinStatus);
PR_DestroyCondVar(server->stateChange);
PR_DestroyCondVar(server->pool.exiting);
PR_DestroyCondVar(server->pool.acceptComplete);
PR_DestroyLock(server->ml);
PR_DELETE(server);
}
TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
("main(0x%p): test complete\n", PR_GetCurrentThread()));
if (thread_provider == thread_win32) {
thread_type = "\nWin32 Thread Statistics\n";
} else if (thread_provider == thread_pthread) {
thread_type = "\npthread Statistics\n";
} else if (thread_provider == thread_sproc) {
thread_type = "\nsproc Statistics\n";
} else {
PR_ASSERT(thread_provider == thread_nspr);
thread_type = "\nPRThread Statistics\nn";
}
PT_FPrintStats(debug_out, thread_type);
TimeOfDayMessage("Test exiting at", PR_GetCurrentThread());
PR_Cleanup();
return 0;
} /* main */
/* cltsrv.c */