--- /dev/null
+/* MtCoder.c -- Multi-thread Coder\r
+2021-12-21 : Igor Pavlov : Public domain */\r
+\r
+#include "Precomp.h"\r
+\r
+#include "MtCoder.h"\r
+\r
+#ifndef _7ZIP_ST\r
+\r
+static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)\r
+{\r
+ CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);\r
+ UInt64 inSize2 = 0;\r
+ UInt64 outSize2 = 0;\r
+ if (inSize != (UInt64)(Int64)-1)\r
+ {\r
+ inSize2 = inSize - thunk->inSize;\r
+ thunk->inSize = inSize;\r
+ }\r
+ if (outSize != (UInt64)(Int64)-1)\r
+ {\r
+ outSize2 = outSize - thunk->outSize;\r
+ thunk->outSize = outSize;\r
+ }\r
+ return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);\r
+}\r
+\r
+\r
+void MtProgressThunk_CreateVTable(CMtProgressThunk *p)\r
+{\r
+ p->vt.Progress = MtProgressThunk_Progress;\r
+}\r
+\r
+\r
+\r
+#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }\r
+\r
+\r
+static WRes ArEvent_OptCreate_And_Reset(CEvent *p)\r
+{\r
+ if (Event_IsCreated(p))\r
+ return Event_Reset(p);\r
+ return AutoResetEvent_CreateNotSignaled(p);\r
+}\r
+\r
+\r
+static THREAD_FUNC_DECL ThreadFunc(void *pp);\r
+\r
+\r
+static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)\r
+{\r
+ WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);\r
+ if (wres == 0)\r
+ {\r
+ t->stop = False;\r
+ if (!Thread_WasCreated(&t->thread))\r
+ wres = Thread_Create(&t->thread, ThreadFunc, t);\r
+ if (wres == 0)\r
+ wres = Event_Set(&t->startEvent);\r
+ }\r
+ if (wres == 0)\r
+ return SZ_OK;\r
+ return MY_SRes_HRESULT_FROM_WRes(wres);\r
+}\r
+\r
+\r
+static void MtCoderThread_Destruct(CMtCoderThread *t)\r
+{\r
+ if (Thread_WasCreated(&t->thread))\r
+ {\r
+ t->stop = 1;\r
+ Event_Set(&t->startEvent);\r
+ Thread_Wait_Close(&t->thread);\r
+ }\r
+\r
+ Event_Close(&t->startEvent);\r
+\r
+ if (t->inBuf)\r
+ {\r
+ ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);\r
+ t->inBuf = NULL;\r
+ }\r
+}\r
+\r
+\r
+\r
+static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r
+{\r
+ size_t size = *processedSize;\r
+ *processedSize = 0;\r
+ while (size != 0)\r
+ {\r
+ size_t cur = size;\r
+ SRes res = ISeqInStream_Read(stream, data, &cur);\r
+ *processedSize += cur;\r
+ data += cur;\r
+ size -= cur;\r
+ RINOK(res);\r
+ if (cur == 0)\r
+ return SZ_OK;\r
+ }\r
+ return SZ_OK;\r
+}\r
+\r
+\r
+/*\r
+ ThreadFunc2() returns:\r
+ SZ_OK - in all normal cases (even for stream error or memory allocation error)\r
+ SZ_ERROR_THREAD - in case of failure in system synch function\r
+*/\r
+\r
+static SRes ThreadFunc2(CMtCoderThread *t)\r
+{\r
+ CMtCoder *mtc = t->mtCoder;\r
+\r
+ for (;;)\r
+ {\r
+ unsigned bi;\r
+ SRes res;\r
+ SRes res2;\r
+ BoolInt finished;\r
+ unsigned bufIndex;\r
+ size_t size;\r
+ const Byte *inData;\r
+ UInt64 readProcessed = 0;\r
+ \r
+ RINOK_THREAD(Event_Wait(&mtc->readEvent))\r
+\r
+ /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */\r
+\r
+ if (mtc->stopReading)\r
+ {\r
+ return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r
+ }\r
+\r
+ res = MtProgress_GetError(&mtc->mtProgress);\r
+ \r
+ size = 0;\r
+ inData = NULL;\r
+ finished = True;\r
+\r
+ if (res == SZ_OK)\r
+ {\r
+ size = mtc->blockSize;\r
+ if (mtc->inStream)\r
+ {\r
+ if (!t->inBuf)\r
+ {\r
+ t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);\r
+ if (!t->inBuf)\r
+ res = SZ_ERROR_MEM;\r
+ }\r
+ if (res == SZ_OK)\r
+ {\r
+ res = FullRead(mtc->inStream, t->inBuf, &size);\r
+ readProcessed = mtc->readProcessed + size;\r
+ mtc->readProcessed = readProcessed;\r
+ }\r
+ if (res != SZ_OK)\r
+ {\r
+ mtc->readRes = res;\r
+ /* after reading error - we can stop encoding of previous blocks */\r
+ MtProgress_SetError(&mtc->mtProgress, res);\r
+ }\r
+ else\r
+ finished = (size != mtc->blockSize);\r
+ }\r
+ else\r
+ {\r
+ size_t rem;\r
+ readProcessed = mtc->readProcessed;\r
+ rem = mtc->inDataSize - (size_t)readProcessed;\r
+ if (size > rem)\r
+ size = rem;\r
+ inData = mtc->inData + (size_t)readProcessed;\r
+ readProcessed += size;\r
+ mtc->readProcessed = readProcessed;\r
+ finished = (mtc->inDataSize == (size_t)readProcessed);\r
+ }\r
+ }\r
+\r
+ /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */\r
+\r
+ res2 = SZ_OK;\r
+\r
+ if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)\r
+ {\r
+ res2 = SZ_ERROR_THREAD;\r
+ if (res == SZ_OK)\r
+ {\r
+ res = res2;\r
+ // MtProgress_SetError(&mtc->mtProgress, res);\r
+ }\r
+ }\r
+\r
+ bi = mtc->blockIndex;\r
+\r
+ if (++mtc->blockIndex >= mtc->numBlocksMax)\r
+ mtc->blockIndex = 0;\r
+\r
+ bufIndex = (unsigned)(int)-1;\r
+\r
+ if (res == SZ_OK)\r
+ res = MtProgress_GetError(&mtc->mtProgress);\r
+\r
+ if (res != SZ_OK)\r
+ finished = True;\r
+\r
+ if (!finished)\r
+ {\r
+ if (mtc->numStartedThreads < mtc->numStartedThreadsLimit\r
+ && mtc->expectedDataSize != readProcessed)\r
+ {\r
+ res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);\r
+ if (res == SZ_OK)\r
+ mtc->numStartedThreads++;\r
+ else\r
+ {\r
+ MtProgress_SetError(&mtc->mtProgress, res);\r
+ finished = True;\r
+ }\r
+ }\r
+ }\r
+\r
+ if (finished)\r
+ mtc->stopReading = True;\r
+\r
+ RINOK_THREAD(Event_Set(&mtc->readEvent))\r
+\r
+ if (res2 != SZ_OK)\r
+ return res2;\r
+\r
+ if (res == SZ_OK)\r
+ {\r
+ CriticalSection_Enter(&mtc->cs);\r
+ bufIndex = mtc->freeBlockHead;\r
+ mtc->freeBlockHead = mtc->freeBlockList[bufIndex];\r
+ CriticalSection_Leave(&mtc->cs);\r
+ \r
+ res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,\r
+ mtc->inStream ? t->inBuf : inData, size, finished);\r
+ \r
+ // MtProgress_Reinit(&mtc->mtProgress, t->index);\r
+\r
+ if (res != SZ_OK)\r
+ MtProgress_SetError(&mtc->mtProgress, res);\r
+ }\r
+\r
+ {\r
+ CMtCoderBlock *block = &mtc->blocks[bi];\r
+ block->res = res;\r
+ block->bufIndex = bufIndex;\r
+ block->finished = finished;\r
+ }\r
+ \r
+ #ifdef MTCODER__USE_WRITE_THREAD\r
+ RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))\r
+ #else\r
+ {\r
+ unsigned wi;\r
+ {\r
+ CriticalSection_Enter(&mtc->cs);\r
+ wi = mtc->writeIndex;\r
+ if (wi == bi)\r
+ mtc->writeIndex = (unsigned)(int)-1;\r
+ else\r
+ mtc->ReadyBlocks[bi] = True;\r
+ CriticalSection_Leave(&mtc->cs);\r
+ }\r
+\r
+ if (wi != bi)\r
+ {\r
+ if (res != SZ_OK || finished)\r
+ return 0;\r
+ continue;\r
+ }\r
+\r
+ if (mtc->writeRes != SZ_OK)\r
+ res = mtc->writeRes;\r
+\r
+ for (;;)\r
+ {\r
+ if (res == SZ_OK && bufIndex != (unsigned)(int)-1)\r
+ {\r
+ res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);\r
+ if (res != SZ_OK)\r
+ {\r
+ mtc->writeRes = res;\r
+ MtProgress_SetError(&mtc->mtProgress, res);\r
+ }\r
+ }\r
+\r
+ if (++wi >= mtc->numBlocksMax)\r
+ wi = 0;\r
+ {\r
+ BoolInt isReady;\r
+\r
+ CriticalSection_Enter(&mtc->cs);\r
+ \r
+ if (bufIndex != (unsigned)(int)-1)\r
+ {\r
+ mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;\r
+ mtc->freeBlockHead = bufIndex;\r
+ }\r
+ \r
+ isReady = mtc->ReadyBlocks[wi];\r
+ \r
+ if (isReady)\r
+ mtc->ReadyBlocks[wi] = False;\r
+ else\r
+ mtc->writeIndex = wi;\r
+ \r
+ CriticalSection_Leave(&mtc->cs);\r
+\r
+ RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))\r
+\r
+ if (!isReady)\r
+ break;\r
+ }\r
+\r
+ {\r
+ CMtCoderBlock *block = &mtc->blocks[wi];\r
+ if (res == SZ_OK && block->res != SZ_OK)\r
+ res = block->res;\r
+ bufIndex = block->bufIndex;\r
+ finished = block->finished;\r
+ }\r
+ }\r
+ }\r
+ #endif\r
+ \r
+ if (finished || res != SZ_OK)\r
+ return 0;\r
+ }\r
+}\r
+\r
+\r
+static THREAD_FUNC_DECL ThreadFunc(void *pp)\r
+{\r
+ CMtCoderThread *t = (CMtCoderThread *)pp;\r
+ for (;;)\r
+ {\r
+ if (Event_Wait(&t->startEvent) != 0)\r
+ return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;\r
+ if (t->stop)\r
+ return 0;\r
+ {\r
+ SRes res = ThreadFunc2(t);\r
+ CMtCoder *mtc = t->mtCoder;\r
+ if (res != SZ_OK)\r
+ {\r
+ MtProgress_SetError(&mtc->mtProgress, res);\r
+ }\r
+ \r
+ #ifndef MTCODER__USE_WRITE_THREAD\r
+ {\r
+ unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);\r
+ if (numFinished == mtc->numStartedThreads)\r
+ if (Event_Set(&mtc->finishedEvent) != 0)\r
+ return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;\r
+ }\r
+ #endif\r
+ }\r
+ }\r
+}\r
+\r
+\r
+\r
+void MtCoder_Construct(CMtCoder *p)\r
+{\r
+ unsigned i;\r
+ \r
+ p->blockSize = 0;\r
+ p->numThreadsMax = 0;\r
+ p->expectedDataSize = (UInt64)(Int64)-1;\r
+\r
+ p->inStream = NULL;\r
+ p->inData = NULL;\r
+ p->inDataSize = 0;\r
+\r
+ p->progress = NULL;\r
+ p->allocBig = NULL;\r
+\r
+ p->mtCallback = NULL;\r
+ p->mtCallbackObject = NULL;\r
+\r
+ p->allocatedBufsSize = 0;\r
+\r
+ Event_Construct(&p->readEvent);\r
+ Semaphore_Construct(&p->blocksSemaphore);\r
+\r
+ for (i = 0; i < MTCODER__THREADS_MAX; i++)\r
+ {\r
+ CMtCoderThread *t = &p->threads[i];\r
+ t->mtCoder = p;\r
+ t->index = i;\r
+ t->inBuf = NULL;\r
+ t->stop = False;\r
+ Event_Construct(&t->startEvent);\r
+ Thread_Construct(&t->thread);\r
+ }\r
+\r
+ #ifdef MTCODER__USE_WRITE_THREAD\r
+ for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r
+ Event_Construct(&p->writeEvents[i]);\r
+ #else\r
+ Event_Construct(&p->finishedEvent);\r
+ #endif\r
+\r
+ CriticalSection_Init(&p->cs);\r
+ CriticalSection_Init(&p->mtProgress.cs);\r
+}\r
+\r
+\r
+\r
+\r
+static void MtCoder_Free(CMtCoder *p)\r
+{\r
+ unsigned i;\r
+\r
+ /*\r
+ p->stopReading = True;\r
+ if (Event_IsCreated(&p->readEvent))\r
+ Event_Set(&p->readEvent);\r
+ */\r
+\r
+ for (i = 0; i < MTCODER__THREADS_MAX; i++)\r
+ MtCoderThread_Destruct(&p->threads[i]);\r
+\r
+ Event_Close(&p->readEvent);\r
+ Semaphore_Close(&p->blocksSemaphore);\r
+\r
+ #ifdef MTCODER__USE_WRITE_THREAD\r
+ for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r
+ Event_Close(&p->writeEvents[i]);\r
+ #else\r
+ Event_Close(&p->finishedEvent);\r
+ #endif\r
+}\r
+\r
+\r
+void MtCoder_Destruct(CMtCoder *p)\r
+{\r
+ MtCoder_Free(p);\r
+\r
+ CriticalSection_Delete(&p->cs);\r
+ CriticalSection_Delete(&p->mtProgress.cs);\r
+}\r
+\r
+\r
+SRes MtCoder_Code(CMtCoder *p)\r
+{\r
+ unsigned numThreads = p->numThreadsMax;\r
+ unsigned numBlocksMax;\r
+ unsigned i;\r
+ SRes res = SZ_OK;\r
+\r
+ if (numThreads > MTCODER__THREADS_MAX)\r
+ numThreads = MTCODER__THREADS_MAX;\r
+ numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);\r
+ \r
+ if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;\r
+ if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;\r
+ if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;\r
+\r
+ if (numBlocksMax > MTCODER__BLOCKS_MAX)\r
+ numBlocksMax = MTCODER__BLOCKS_MAX;\r
+\r
+ if (p->blockSize != p->allocatedBufsSize)\r
+ {\r
+ for (i = 0; i < MTCODER__THREADS_MAX; i++)\r
+ {\r
+ CMtCoderThread *t = &p->threads[i];\r
+ if (t->inBuf)\r
+ {\r
+ ISzAlloc_Free(p->allocBig, t->inBuf);\r
+ t->inBuf = NULL;\r
+ }\r
+ }\r
+ p->allocatedBufsSize = p->blockSize;\r
+ }\r
+\r
+ p->readRes = SZ_OK;\r
+\r
+ MtProgress_Init(&p->mtProgress, p->progress);\r
+\r
+ #ifdef MTCODER__USE_WRITE_THREAD\r
+ for (i = 0; i < numBlocksMax; i++)\r
+ {\r
+ RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));\r
+ }\r
+ #else\r
+ RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));\r
+ #endif\r
+\r
+ {\r
+ RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));\r
+ RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax));\r
+ }\r
+\r
+ for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)\r
+ p->freeBlockList[i] = i + 1;\r
+ p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;\r
+ p->freeBlockHead = 0;\r
+\r
+ p->readProcessed = 0;\r
+ p->blockIndex = 0;\r
+ p->numBlocksMax = numBlocksMax;\r
+ p->stopReading = False;\r
+\r
+ #ifndef MTCODER__USE_WRITE_THREAD\r
+ p->writeIndex = 0;\r
+ p->writeRes = SZ_OK;\r
+ for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r
+ p->ReadyBlocks[i] = False;\r
+ p->numFinishedThreads = 0;\r
+ #endif\r
+\r
+ p->numStartedThreadsLimit = numThreads;\r
+ p->numStartedThreads = 0;\r
+\r
+ // for (i = 0; i < numThreads; i++)\r
+ {\r
+ CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];\r
+ RINOK(MtCoderThread_CreateAndStart(nextThread));\r
+ }\r
+\r
+ RINOK_THREAD(Event_Set(&p->readEvent))\r
+\r
+ #ifdef MTCODER__USE_WRITE_THREAD\r
+ {\r
+ unsigned bi = 0;\r
+\r
+ for (;; bi++)\r
+ {\r
+ if (bi >= numBlocksMax)\r
+ bi = 0;\r
+\r
+ RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))\r
+\r
+ {\r
+ const CMtCoderBlock *block = &p->blocks[bi];\r
+ unsigned bufIndex = block->bufIndex;\r
+ BoolInt finished = block->finished;\r
+ if (res == SZ_OK && block->res != SZ_OK)\r
+ res = block->res;\r
+\r
+ if (bufIndex != (unsigned)(int)-1)\r
+ {\r
+ if (res == SZ_OK)\r
+ {\r
+ res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);\r
+ if (res != SZ_OK)\r
+ MtProgress_SetError(&p->mtProgress, res);\r
+ }\r
+ \r
+ CriticalSection_Enter(&p->cs);\r
+ {\r
+ p->freeBlockList[bufIndex] = p->freeBlockHead;\r
+ p->freeBlockHead = bufIndex;\r
+ }\r
+ CriticalSection_Leave(&p->cs);\r
+ }\r
+ \r
+ RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))\r
+\r
+ if (finished)\r
+ break;\r
+ }\r
+ }\r
+ }\r
+ #else\r
+ {\r
+ WRes wres = Event_Wait(&p->finishedEvent);\r
+ res = MY_SRes_HRESULT_FROM_WRes(wres);\r
+ }\r
+ #endif\r
+\r
+ if (res == SZ_OK)\r
+ res = p->readRes;\r
+\r
+ if (res == SZ_OK)\r
+ res = p->mtProgress.res;\r
+\r
+ #ifndef MTCODER__USE_WRITE_THREAD\r
+ if (res == SZ_OK)\r
+ res = p->writeRes;\r
+ #endif\r
+\r
+ if (res != SZ_OK)\r
+ MtCoder_Free(p);\r
+ return res;\r
+}\r
+\r
+#endif\r