1 /* MtCoder.c -- Multi-thread Coder
\r
2 2021-12-21 : Igor Pavlov : Public domain */
\r
10 static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
\r
12 CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
\r
14 UInt64 outSize2 = 0;
\r
15 if (inSize != (UInt64)(Int64)-1)
\r
17 inSize2 = inSize - thunk->inSize;
\r
18 thunk->inSize = inSize;
\r
20 if (outSize != (UInt64)(Int64)-1)
\r
22 outSize2 = outSize - thunk->outSize;
\r
23 thunk->outSize = outSize;
\r
25 return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
\r
29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
\r
31 p->vt.Progress = MtProgressThunk_Progress;
\r
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
\r
39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
\r
41 if (Event_IsCreated(p))
\r
42 return Event_Reset(p);
\r
43 return AutoResetEvent_CreateNotSignaled(p);
\r
47 static THREAD_FUNC_DECL ThreadFunc(void *pp);
\r
50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
\r
52 WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
\r
56 if (!Thread_WasCreated(&t->thread))
\r
57 wres = Thread_Create(&t->thread, ThreadFunc, t);
\r
59 wres = Event_Set(&t->startEvent);
\r
63 return MY_SRes_HRESULT_FROM_WRes(wres);
\r
67 static void MtCoderThread_Destruct(CMtCoderThread *t)
\r
69 if (Thread_WasCreated(&t->thread))
\r
72 Event_Set(&t->startEvent);
\r
73 Thread_Wait_Close(&t->thread);
\r
76 Event_Close(&t->startEvent);
\r
80 ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
\r
87 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
\r
89 size_t size = *processedSize;
\r
94 SRes res = ISeqInStream_Read(stream, data, &cur);
\r
95 *processedSize += cur;
\r
107 ThreadFunc2() returns:
\r
108 SZ_OK - in all normal cases (even for stream error or memory allocation error)
\r
109 SZ_ERROR_THREAD - in case of failure in system synch function
\r
112 static SRes ThreadFunc2(CMtCoderThread *t)
\r
114 CMtCoder *mtc = t->mtCoder;
\r
124 const Byte *inData;
\r
125 UInt64 readProcessed = 0;
\r
127 RINOK_THREAD(Event_Wait(&mtc->readEvent))
\r
129 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
\r
131 if (mtc->stopReading)
\r
133 return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
\r
136 res = MtProgress_GetError(&mtc->mtProgress);
\r
144 size = mtc->blockSize;
\r
149 t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
\r
151 res = SZ_ERROR_MEM;
\r
155 res = FullRead(mtc->inStream, t->inBuf, &size);
\r
156 readProcessed = mtc->readProcessed + size;
\r
157 mtc->readProcessed = readProcessed;
\r
161 mtc->readRes = res;
\r
162 /* after reading error - we can stop encoding of previous blocks */
\r
163 MtProgress_SetError(&mtc->mtProgress, res);
\r
166 finished = (size != mtc->blockSize);
\r
171 readProcessed = mtc->readProcessed;
\r
172 rem = mtc->inDataSize - (size_t)readProcessed;
\r
175 inData = mtc->inData + (size_t)readProcessed;
\r
176 readProcessed += size;
\r
177 mtc->readProcessed = readProcessed;
\r
178 finished = (mtc->inDataSize == (size_t)readProcessed);
\r
182 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
\r
186 if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
\r
188 res2 = SZ_ERROR_THREAD;
\r
192 // MtProgress_SetError(&mtc->mtProgress, res);
\r
196 bi = mtc->blockIndex;
\r
198 if (++mtc->blockIndex >= mtc->numBlocksMax)
\r
199 mtc->blockIndex = 0;
\r
201 bufIndex = (unsigned)(int)-1;
\r
204 res = MtProgress_GetError(&mtc->mtProgress);
\r
211 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
\r
212 && mtc->expectedDataSize != readProcessed)
\r
214 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
\r
216 mtc->numStartedThreads++;
\r
219 MtProgress_SetError(&mtc->mtProgress, res);
\r
226 mtc->stopReading = True;
\r
228 RINOK_THREAD(Event_Set(&mtc->readEvent))
\r
235 CriticalSection_Enter(&mtc->cs);
\r
236 bufIndex = mtc->freeBlockHead;
\r
237 mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
\r
238 CriticalSection_Leave(&mtc->cs);
\r
240 res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
\r
241 mtc->inStream ? t->inBuf : inData, size, finished);
\r
243 // MtProgress_Reinit(&mtc->mtProgress, t->index);
\r
246 MtProgress_SetError(&mtc->mtProgress, res);
\r
250 CMtCoderBlock *block = &mtc->blocks[bi];
\r
252 block->bufIndex = bufIndex;
\r
253 block->finished = finished;
\r
256 #ifdef MTCODER__USE_WRITE_THREAD
\r
257 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
\r
262 CriticalSection_Enter(&mtc->cs);
\r
263 wi = mtc->writeIndex;
\r
265 mtc->writeIndex = (unsigned)(int)-1;
\r
267 mtc->ReadyBlocks[bi] = True;
\r
268 CriticalSection_Leave(&mtc->cs);
\r
273 if (res != SZ_OK || finished)
\r
278 if (mtc->writeRes != SZ_OK)
\r
279 res = mtc->writeRes;
\r
283 if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
\r
285 res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
\r
288 mtc->writeRes = res;
\r
289 MtProgress_SetError(&mtc->mtProgress, res);
\r
293 if (++wi >= mtc->numBlocksMax)
\r
298 CriticalSection_Enter(&mtc->cs);
\r
300 if (bufIndex != (unsigned)(int)-1)
\r
302 mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
\r
303 mtc->freeBlockHead = bufIndex;
\r
306 isReady = mtc->ReadyBlocks[wi];
\r
309 mtc->ReadyBlocks[wi] = False;
\r
311 mtc->writeIndex = wi;
\r
313 CriticalSection_Leave(&mtc->cs);
\r
315 RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
\r
322 CMtCoderBlock *block = &mtc->blocks[wi];
\r
323 if (res == SZ_OK && block->res != SZ_OK)
\r
325 bufIndex = block->bufIndex;
\r
326 finished = block->finished;
\r
332 if (finished || res != SZ_OK)
\r
338 static THREAD_FUNC_DECL ThreadFunc(void *pp)
\r
340 CMtCoderThread *t = (CMtCoderThread *)pp;
\r
343 if (Event_Wait(&t->startEvent) != 0)
\r
344 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
\r
348 SRes res = ThreadFunc2(t);
\r
349 CMtCoder *mtc = t->mtCoder;
\r
352 MtProgress_SetError(&mtc->mtProgress, res);
\r
355 #ifndef MTCODER__USE_WRITE_THREAD
\r
357 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
\r
358 if (numFinished == mtc->numStartedThreads)
\r
359 if (Event_Set(&mtc->finishedEvent) != 0)
\r
360 return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;
\r
369 void MtCoder_Construct(CMtCoder *p)
\r
374 p->numThreadsMax = 0;
\r
375 p->expectedDataSize = (UInt64)(Int64)-1;
\r
377 p->inStream = NULL;
\r
381 p->progress = NULL;
\r
382 p->allocBig = NULL;
\r
384 p->mtCallback = NULL;
\r
385 p->mtCallbackObject = NULL;
\r
387 p->allocatedBufsSize = 0;
\r
389 Event_Construct(&p->readEvent);
\r
390 Semaphore_Construct(&p->blocksSemaphore);
\r
392 for (i = 0; i < MTCODER__THREADS_MAX; i++)
\r
394 CMtCoderThread *t = &p->threads[i];
\r
399 Event_Construct(&t->startEvent);
\r
400 Thread_Construct(&t->thread);
\r
403 #ifdef MTCODER__USE_WRITE_THREAD
\r
404 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
\r
405 Event_Construct(&p->writeEvents[i]);
\r
407 Event_Construct(&p->finishedEvent);
\r
410 CriticalSection_Init(&p->cs);
\r
411 CriticalSection_Init(&p->mtProgress.cs);
\r
417 static void MtCoder_Free(CMtCoder *p)
\r
422 p->stopReading = True;
\r
423 if (Event_IsCreated(&p->readEvent))
\r
424 Event_Set(&p->readEvent);
\r
427 for (i = 0; i < MTCODER__THREADS_MAX; i++)
\r
428 MtCoderThread_Destruct(&p->threads[i]);
\r
430 Event_Close(&p->readEvent);
\r
431 Semaphore_Close(&p->blocksSemaphore);
\r
433 #ifdef MTCODER__USE_WRITE_THREAD
\r
434 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
\r
435 Event_Close(&p->writeEvents[i]);
\r
437 Event_Close(&p->finishedEvent);
\r
442 void MtCoder_Destruct(CMtCoder *p)
\r
446 CriticalSection_Delete(&p->cs);
\r
447 CriticalSection_Delete(&p->mtProgress.cs);
\r
451 SRes MtCoder_Code(CMtCoder *p)
\r
453 unsigned numThreads = p->numThreadsMax;
\r
454 unsigned numBlocksMax;
\r
458 if (numThreads > MTCODER__THREADS_MAX)
\r
459 numThreads = MTCODER__THREADS_MAX;
\r
460 numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
\r
462 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
\r
463 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
\r
464 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
\r
466 if (numBlocksMax > MTCODER__BLOCKS_MAX)
\r
467 numBlocksMax = MTCODER__BLOCKS_MAX;
\r
469 if (p->blockSize != p->allocatedBufsSize)
\r
471 for (i = 0; i < MTCODER__THREADS_MAX; i++)
\r
473 CMtCoderThread *t = &p->threads[i];
\r
476 ISzAlloc_Free(p->allocBig, t->inBuf);
\r
480 p->allocatedBufsSize = p->blockSize;
\r
483 p->readRes = SZ_OK;
\r
485 MtProgress_Init(&p->mtProgress, p->progress);
\r
487 #ifdef MTCODER__USE_WRITE_THREAD
\r
488 for (i = 0; i < numBlocksMax; i++)
\r
490 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
\r
493 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
\r
497 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
\r
498 RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
\r
501 for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
\r
502 p->freeBlockList[i] = i + 1;
\r
503 p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
\r
504 p->freeBlockHead = 0;
\r
506 p->readProcessed = 0;
\r
508 p->numBlocksMax = numBlocksMax;
\r
509 p->stopReading = False;
\r
511 #ifndef MTCODER__USE_WRITE_THREAD
\r
513 p->writeRes = SZ_OK;
\r
514 for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
\r
515 p->ReadyBlocks[i] = False;
\r
516 p->numFinishedThreads = 0;
\r
519 p->numStartedThreadsLimit = numThreads;
\r
520 p->numStartedThreads = 0;
\r
522 // for (i = 0; i < numThreads; i++)
\r
524 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
\r
525 RINOK(MtCoderThread_CreateAndStart(nextThread));
\r
528 RINOK_THREAD(Event_Set(&p->readEvent))
\r
530 #ifdef MTCODER__USE_WRITE_THREAD
\r
536 if (bi >= numBlocksMax)
\r
539 RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
\r
542 const CMtCoderBlock *block = &p->blocks[bi];
\r
543 unsigned bufIndex = block->bufIndex;
\r
544 BoolInt finished = block->finished;
\r
545 if (res == SZ_OK && block->res != SZ_OK)
\r
548 if (bufIndex != (unsigned)(int)-1)
\r
552 res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
\r
554 MtProgress_SetError(&p->mtProgress, res);
\r
557 CriticalSection_Enter(&p->cs);
\r
559 p->freeBlockList[bufIndex] = p->freeBlockHead;
\r
560 p->freeBlockHead = bufIndex;
\r
562 CriticalSection_Leave(&p->cs);
\r
565 RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
\r
574 WRes wres = Event_Wait(&p->finishedEvent);
\r
575 res = MY_SRes_HRESULT_FROM_WRes(wres);
\r
583 res = p->mtProgress.res;
\r
585 #ifndef MTCODER__USE_WRITE_THREAD
\r