Source code

Revision control

Copy as Markdown

Other Tools

/* MtCoder.c -- Multi-thread Coder↩
2018-02-21 : Igor Pavlov : Public domain */
#include "Precomp.h"
#include "MtCoder.h"
#ifndef _7ZIP_ST↩
SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)↩
{↩
CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);↩
UInt64 inSize2 = 0;↩
UInt64 outSize2 = 0;↩
if (inSize != (UInt64)(Int64)-1)↩
{↩
inSize2 = inSize - thunk->inSize;↩
thunk->inSize = inSize;↩
}↩
if (outSize != (UInt64)(Int64)-1)↩
{↩
outSize2 = outSize - thunk->outSize;↩
thunk->outSize = outSize;↩
}↩
return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);↩
}↩
void MtProgressThunk_CreateVTable(CMtProgressThunk *p)↩
{↩
p->vt.Progress = MtProgressThunk_Progress;↩
}↩
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }↩
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)↩
{↩
if (Event_IsCreated(p))↩
return Event_Reset(p);↩
return AutoResetEvent_CreateNotSignaled(p);↩
}↩
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);↩
static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)↩
{↩
WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);↩
if (wres == 0)↩
{↩
t->stop = False;↩
if (!Thread_WasCreated(&t->thread))↩
wres = Thread_Create(&t->thread, ThreadFunc, t);↩
if (wres == 0)↩
wres = Event_Set(&t->startEvent);↩
}↩
if (wres == 0)↩
return SZ_OK;↩
return MY_SRes_HRESULT_FROM_WRes(wres);↩
}↩
static void MtCoderThread_Destruct(CMtCoderThread *t)↩
{↩
if (Thread_WasCreated(&t->thread))↩
{↩
t->stop = 1;↩
Event_Set(&t->startEvent);↩
Thread_Wait(&t->thread);↩
Thread_Close(&t->thread);↩
}↩
Event_Close(&t->startEvent);↩
if (t->inBuf)↩
{↩
ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);↩
t->inBuf = NULL;↩
}↩
}↩
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)↩
{↩
size_t size = *processedSize;↩
*processedSize = 0;↩
while (size != 0)↩
{↩
size_t cur = size;↩
SRes res = ISeqInStream_Read(stream, data, &cur);↩
*processedSize += cur;↩
data += cur;↩
size -= cur;↩
RINOK(res);↩
if (cur == 0)↩
return SZ_OK;↩
}↩
return SZ_OK;↩
}↩
/*↩
ThreadFunc2() returns:↩
SZ_OK - in all normal cases (even for stream error or memory allocation error)↩
SZ_ERROR_THREAD - in case of failure in system synch function↩
*/
static SRes ThreadFunc2(CMtCoderThread *t)↩
{↩
CMtCoder *mtc = t->mtCoder;↩
for (;;)↩
{↩
unsigned bi;↩
SRes res;↩
SRes res2;↩
Bool finished;↩
unsigned bufIndex;↩
size_t size;↩
const Byte *inData;↩
UInt64 readProcessed = 0;↩
RINOK_THREAD(Event_Wait(&mtc->readEvent))↩
/* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
if (mtc->stopReading)↩
{↩
return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;↩
}↩
res = MtProgress_GetError(&mtc->mtProgress);↩
size = 0;↩
inData = NULL;↩
finished = True;↩
if (res == SZ_OK)↩
{↩
size = mtc->blockSize;↩
if (mtc->inStream)↩
{↩
if (!t->inBuf)↩
{↩
t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);↩
if (!t->inBuf)↩
res = SZ_ERROR_MEM;↩
}↩
if (res == SZ_OK)↩
{↩
res = FullRead(mtc->inStream, t->inBuf, &size);↩
readProcessed = mtc->readProcessed + size;↩
mtc->readProcessed = readProcessed;↩
}↩
if (res != SZ_OK)↩
{↩
mtc->readRes = res;↩
/* after reading error - we can stop encoding of previous blocks */
MtProgress_SetError(&mtc->mtProgress, res);↩
}↩
else
finished = (size != mtc->blockSize);↩
}↩
else
{↩
size_t rem;↩
readProcessed = mtc->readProcessed;↩
rem = mtc->inDataSize - (size_t)readProcessed;↩
if (size > rem)↩
size = rem;↩
inData = mtc->inData + (size_t)readProcessed;↩
readProcessed += size;↩
mtc->readProcessed = readProcessed;↩
finished = (mtc->inDataSize == (size_t)readProcessed);↩
}↩
}↩
/* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
res2 = SZ_OK;↩
if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)↩
{↩
res2 = SZ_ERROR_THREAD;↩
if (res == SZ_OK)↩
{↩
res = res2;↩
// MtProgress_SetError(&mtc->mtProgress, res);↩
}↩
}↩
bi = mtc->blockIndex;↩
if (++mtc->blockIndex >= mtc->numBlocksMax)↩
mtc->blockIndex = 0;↩
bufIndex = (unsigned)(int)-1;↩
if (res == SZ_OK)↩
res = MtProgress_GetError(&mtc->mtProgress);↩
if (res != SZ_OK)↩
finished = True;↩
if (!finished)↩
{↩
if (mtc->numStartedThreads < mtc->numStartedThreadsLimit↩
&& mtc->expectedDataSize != readProcessed)↩
{↩
res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);↩
if (res == SZ_OK)↩
mtc->numStartedThreads++;↩
else
{↩
MtProgress_SetError(&mtc->mtProgress, res);↩
finished = True;↩
}↩
}↩
}↩
if (finished)↩
mtc->stopReading = True;↩
RINOK_THREAD(Event_Set(&mtc->readEvent))↩
if (res2 != SZ_OK)↩
return res2;↩
if (res == SZ_OK)↩
{↩
CriticalSection_Enter(&mtc->cs);↩
bufIndex = mtc->freeBlockHead;↩
mtc->freeBlockHead = mtc->freeBlockList[bufIndex];↩
CriticalSection_Leave(&mtc->cs);↩
res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,↩
mtc->inStream ? t->inBuf : inData, size, finished);↩
// MtProgress_Reinit(&mtc->mtProgress, t->index);↩
if (res != SZ_OK)↩
MtProgress_SetError(&mtc->mtProgress, res);↩
}↩
{↩
CMtCoderBlock *block = &mtc->blocks[bi];↩
block->res = res;↩
block->bufIndex = bufIndex;↩
block->finished = finished;↩
}↩
#ifdef MTCODER__USE_WRITE_THREAD↩
RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))↩
#else
{↩
unsigned wi;↩
{↩
CriticalSection_Enter(&mtc->cs);↩
wi = mtc->writeIndex;↩
if (wi == bi)↩
mtc->writeIndex = (unsigned)(int)-1;↩
else
mtc->ReadyBlocks[bi] = True;↩
CriticalSection_Leave(&mtc->cs);↩
}↩
if (wi != bi)↩
{↩
if (res != SZ_OK || finished)↩
return 0;↩
continue;↩
}↩
if (mtc->writeRes != SZ_OK)↩
res = mtc->writeRes;↩
for (;;)↩
{↩
if (res == SZ_OK && bufIndex != (unsigned)(int)-1)↩
{↩
res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);↩
if (res != SZ_OK)↩
{↩
mtc->writeRes = res;↩
MtProgress_SetError(&mtc->mtProgress, res);↩
}↩
}↩
if (++wi >= mtc->numBlocksMax)↩
wi = 0;↩
{↩
Bool isReady;↩
CriticalSection_Enter(&mtc->cs);↩
if (bufIndex != (unsigned)(int)-1)↩
{↩
mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;↩
mtc->freeBlockHead = bufIndex;↩
}↩
isReady = mtc->ReadyBlocks[wi];↩
if (isReady)↩
mtc->ReadyBlocks[wi] = False;↩
else
mtc->writeIndex = wi;↩
CriticalSection_Leave(&mtc->cs);↩
RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))↩
if (!isReady)↩
break;↩
}↩
{↩
CMtCoderBlock *block = &mtc->blocks[wi];↩
if (res == SZ_OK && block->res != SZ_OK)↩
res = block->res;↩
bufIndex = block->bufIndex;↩
finished = block->finished;↩
}↩
}↩
}↩
#endif
if (finished || res != SZ_OK)↩
return 0;↩
}↩
}↩
static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)↩
{↩
CMtCoderThread *t = (CMtCoderThread *)pp;↩
for (;;)↩
{↩
if (Event_Wait(&t->startEvent) != 0)↩
return SZ_ERROR_THREAD;↩
if (t->stop)↩
return 0;↩
{↩
SRes res = ThreadFunc2(t);↩
CMtCoder *mtc = t->mtCoder;↩
if (res != SZ_OK)↩
{↩
MtProgress_SetError(&mtc->mtProgress, res);↩
}↩
#ifndef MTCODER__USE_WRITE_THREAD↩
{↩
unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);↩
if (numFinished == mtc->numStartedThreads)↩
if (Event_Set(&mtc->finishedEvent) != 0)↩
return SZ_ERROR_THREAD;↩
}↩
#endif
}↩
}↩
}↩
void MtCoder_Construct(CMtCoder *p)↩
{↩
unsigned i;↩
p->blockSize = 0;↩
p->numThreadsMax = 0;↩
p->expectedDataSize = (UInt64)(Int64)-1;↩
p->inStream = NULL;↩
p->inData = NULL;↩
p->inDataSize = 0;↩
p->progress = NULL;↩
p->allocBig = NULL;↩
p->mtCallback = NULL;↩
p->mtCallbackObject = NULL;↩
p->allocatedBufsSize = 0;↩
Event_Construct(&p->readEvent);↩
Semaphore_Construct(&p->blocksSemaphore);↩
for (i = 0; i < MTCODER__THREADS_MAX; i++)↩
{↩
CMtCoderThread *t = &p->threads[i];↩
t->mtCoder = p;↩
t->index = i;↩
t->inBuf = NULL;↩
t->stop = False;↩
Event_Construct(&t->startEvent);↩
Thread_Construct(&t->thread);↩
}↩
#ifdef MTCODER__USE_WRITE_THREAD↩
for (i = 0; i < MTCODER__BLOCKS_MAX; i++)↩
Event_Construct(&p->writeEvents[i]);↩
#else
Event_Construct(&p->finishedEvent);↩
#endif
CriticalSection_Init(&p->cs);↩
CriticalSection_Init(&p->mtProgress.cs);↩
}↩
static void MtCoder_Free(CMtCoder *p)↩
{↩
unsigned i;↩
/*↩
p->stopReading = True;↩
if (Event_IsCreated(&p->readEvent))↩
Event_Set(&p->readEvent);↩
*/
for (i = 0; i < MTCODER__THREADS_MAX; i++)↩
MtCoderThread_Destruct(&p->threads[i]);↩
Event_Close(&p->readEvent);↩
Semaphore_Close(&p->blocksSemaphore);↩
#ifdef MTCODER__USE_WRITE_THREAD↩
for (i = 0; i < MTCODER__BLOCKS_MAX; i++)↩
Event_Close(&p->writeEvents[i]);↩
#else
Event_Close(&p->finishedEvent);↩
#endif
}↩
void MtCoder_Destruct(CMtCoder *p)↩
{↩
MtCoder_Free(p);↩
CriticalSection_Delete(&p->cs);↩
CriticalSection_Delete(&p->mtProgress.cs);↩
}↩
SRes MtCoder_Code(CMtCoder *p)↩
{↩
unsigned numThreads = p->numThreadsMax;↩
unsigned numBlocksMax;↩
unsigned i;↩
SRes res = SZ_OK;↩
if (numThreads > MTCODER__THREADS_MAX)↩
numThreads = MTCODER__THREADS_MAX;↩
numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);↩
if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;↩
if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;↩
if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;↩
if (numBlocksMax > MTCODER__BLOCKS_MAX)↩
numBlocksMax = MTCODER__BLOCKS_MAX;↩
if (p->blockSize != p->allocatedBufsSize)↩
{↩
for (i = 0; i < MTCODER__THREADS_MAX; i++)↩
{↩
CMtCoderThread *t = &p->threads[i];↩
if (t->inBuf)↩
{↩
ISzAlloc_Free(p->allocBig, t->inBuf);↩
t->inBuf = NULL;↩
}↩
}↩
p->allocatedBufsSize = p->blockSize;↩
}↩
p->readRes = SZ_OK;↩
MtProgress_Init(&p->mtProgress, p->progress);↩
#ifdef MTCODER__USE_WRITE_THREAD↩
for (i = 0; i < numBlocksMax; i++)↩
{↩
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));↩
}↩
#else
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));↩
#endif
{↩
RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));↩
if (Semaphore_IsCreated(&p->blocksSemaphore))↩
{↩
RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));↩
}↩
RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));↩
}↩
for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)↩
p->freeBlockList[i] = i + 1;↩
p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;↩
p->freeBlockHead = 0;↩
p->readProcessed = 0;↩
p->blockIndex = 0;↩
p->numBlocksMax = numBlocksMax;↩
p->stopReading = False;↩
#ifndef MTCODER__USE_WRITE_THREAD↩
p->writeIndex = 0;↩
p->writeRes = SZ_OK;↩
for (i = 0; i < MTCODER__BLOCKS_MAX; i++)↩
p->ReadyBlocks[i] = False;↩
p->numFinishedThreads = 0;↩
#endif
p->numStartedThreadsLimit = numThreads;↩
p->numStartedThreads = 0;↩
// for (i = 0; i < numThreads; i++)↩
{↩
CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];↩
RINOK(MtCoderThread_CreateAndStart(nextThread));↩
}↩
RINOK_THREAD(Event_Set(&p->readEvent))↩
#ifdef MTCODER__USE_WRITE_THREAD↩
{↩
unsigned bi = 0;↩
for (;; bi++)↩
{↩
if (bi >= numBlocksMax)↩
bi = 0;↩
RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))↩
{↩
const CMtCoderBlock *block = &p->blocks[bi];↩
unsigned bufIndex = block->bufIndex;↩
Bool finished = block->finished;↩
if (res == SZ_OK && block->res != SZ_OK)↩
res = block->res;↩
if (bufIndex != (unsigned)(int)-1)↩
{↩
if (res == SZ_OK)↩
{↩
res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);↩
if (res != SZ_OK)↩
MtProgress_SetError(&p->mtProgress, res);↩
}↩
CriticalSection_Enter(&p->cs);↩
{↩
p->freeBlockList[bufIndex] = p->freeBlockHead;↩
p->freeBlockHead = bufIndex;↩
}↩
CriticalSection_Leave(&p->cs);↩
}↩
RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))↩
if (finished)↩
break;↩
}↩
}↩
}↩
#else
{↩
WRes wres = Event_Wait(&p->finishedEvent);↩
res = MY_SRes_HRESULT_FROM_WRes(wres);↩
}↩
#endif
if (res == SZ_OK)↩
res = p->readRes;↩
if (res == SZ_OK)↩
res = p->mtProgress.res;↩
#ifndef MTCODER__USE_WRITE_THREAD↩
if (res == SZ_OK)↩
res = p->writeRes;↩
#endif
if (res != SZ_OK)↩
MtCoder_Free(p);↩
return res;↩
}↩
#endif