1 /* MtDec.c -- Multi-thread Decoder
\r
2 2021-12-21 : Igor Pavlov : Public domain */
\r
6 // #define SHOW_DEBUG_INFO
\r
8 // #include <stdio.h>
\r
11 #ifdef SHOW_DEBUG_INFO
\r
19 #ifdef SHOW_DEBUG_INFO
\r
25 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
\r
27 void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
\r
29 p->progress = progress;
\r
32 p->totalOutSize = 0;
\r
36 SRes MtProgress_Progress_ST(CMtProgress *p)
\r
38 if (p->res == SZ_OK && p->progress)
\r
39 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
\r
40 p->res = SZ_ERROR_PROGRESS;
\r
45 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
\r
48 CriticalSection_Enter(&p->cs);
\r
50 p->totalInSize += inSize;
\r
51 p->totalOutSize += outSize;
\r
52 if (p->res == SZ_OK && p->progress)
\r
53 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
\r
54 p->res = SZ_ERROR_PROGRESS;
\r
57 CriticalSection_Leave(&p->cs);
\r
62 SRes MtProgress_GetError(CMtProgress *p)
\r
65 CriticalSection_Enter(&p->cs);
\r
67 CriticalSection_Leave(&p->cs);
\r
72 void MtProgress_SetError(CMtProgress *p, SRes res)
\r
74 CriticalSection_Enter(&p->cs);
\r
75 if (p->res == SZ_OK)
\r
77 CriticalSection_Leave(&p->cs);
\r
81 #define RINOK_THREAD(x) RINOK_WRes(x)
\r
84 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
\r
86 if (Event_IsCreated(p))
\r
87 return Event_Reset(p);
\r
88 return AutoResetEvent_CreateNotSignaled(p);
\r
92 struct __CMtDecBufLink
\r
94 struct __CMtDecBufLink *next;
\r
98 typedef struct __CMtDecBufLink CMtDecBufLink;
\r
100 #define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
\r
101 #define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
\r
105 static THREAD_FUNC_DECL ThreadFunc(void *pp);
\r
108 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
\r
110 WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
\r
113 wres = ArEvent_OptCreate_And_Reset(&t->canRead);
\r
121 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
\r
123 WRes wres = MtDecThread_CreateEvents(t);
\r
124 // wres = 17; // for test
\r
127 if (Thread_WasCreated(&t->thread))
\r
129 wres = Thread_Create(&t->thread, ThreadFunc, t);
\r
133 return MY_SRes_HRESULT_FROM_WRes(wres);
\r
137 void MtDecThread_FreeInBufs(CMtDecThread *t)
\r
141 void *link = t->inBuf;
\r
145 void *next = ((CMtDecBufLink *)link)->next;
\r
146 ISzAlloc_Free(t->mtDec->alloc, link);
\r
154 static void MtDecThread_CloseThread(CMtDecThread *t)
\r
156 if (Thread_WasCreated(&t->thread))
\r
158 Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
\r
159 Event_Set(&t->canRead);
\r
160 Thread_Wait_Close(&t->thread);
\r
163 Event_Close(&t->canRead);
\r
164 Event_Close(&t->canWrite);
\r
167 static void MtDec_CloseThreads(CMtDec *p)
\r
170 for (i = 0; i < MTDEC__THREADS_MAX; i++)
\r
171 MtDecThread_CloseThread(&p->threads[i]);
\r
174 static void MtDecThread_Destruct(CMtDecThread *t)
\r
176 MtDecThread_CloseThread(t);
\r
177 MtDecThread_FreeInBufs(t);
\r
182 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
\r
184 size_t size = *processedSize;
\r
185 *processedSize = 0;
\r
189 SRes res = ISeqInStream_Read(stream, data, &cur);
\r
190 *processedSize += cur;
\r
201 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
\r
204 CriticalSection_Enter(&p->mtProgress.cs);
\r
205 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
\r
206 res = p->mtProgress.res;
\r
207 CriticalSection_Leave(&p->mtProgress.cs);
\r
211 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
\r
214 CriticalSection_Enter(&p->mtProgress.cs);
\r
216 p->mtProgress.totalInSize += inSize;
\r
217 p->mtProgress.totalOutSize += outSize;
\r
218 if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
\r
219 if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
\r
220 p->mtProgress.res = SZ_ERROR_PROGRESS;
\r
222 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
\r
223 res = p->mtProgress.res;
\r
225 CriticalSection_Leave(&p->mtProgress.cs);
\r
230 static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
\r
232 CriticalSection_Enter(&p->mtProgress.cs);
\r
233 if (!p->needInterrupt || interruptIndex < p->interruptIndex)
\r
235 p->interruptIndex = interruptIndex;
\r
236 p->needInterrupt = True;
\r
238 CriticalSection_Leave(&p->mtProgress.cs);
\r
241 Byte *MtDec_GetCrossBuff(CMtDec *p)
\r
243 Byte *cr = p->crossBlock;
\r
246 cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
\r
249 p->crossBlock = cr;
\r
251 return MTDEC__DATA_PTR_FROM_LINK(cr);
\r
256 ThreadFunc2() returns:
\r
257 0 - in all normal cases (even for stream error or memory allocation error)
\r
258 (!= 0) - WRes error returned by system threading function
\r
261 // #define MTDEC_ProgessStep (1 << 22)
\r
262 #define MTDEC_ProgessStep (1 << 0)
\r
264 static WRes ThreadFunc2(CMtDecThread *t)
\r
266 CMtDec *p = t->mtDec;
\r
268 PRF_STR_INT("ThreadFunc2", t->index);
\r
270 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
\r
275 BoolInt wasInterrupted, isAllocError, overflow, finish;
\r
276 SRes threadingErrorSRes;
\r
277 BoolInt needCode, needWrite, needContinue;
\r
279 size_t inDataSize_Start;
\r
281 // UInt64 inDataSize_Full;
\r
286 UInt64 outPrev = 0;
\r
290 Byte *afterEndData = NULL;
\r
291 size_t afterEndData_Size = 0;
\r
292 BoolInt afterEndData_IsCross = False;
\r
294 BoolInt canCreateNewThread = False;
\r
295 // CMtDecCallbackInfo parse;
\r
296 CMtDecThread *nextThread;
\r
298 PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);
\r
300 RINOK_THREAD(Event_Wait(&t->canRead));
\r
304 PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
\r
306 // if (t->index == 3) return 19; // for test
\r
308 blockIndex = p->blockIndex++;
\r
310 // PRF(printf("\ncanRead\n"))
\r
312 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
\r
314 finish = p->readWasFinished;
\r
317 isAllocError = False;
\r
320 inDataSize_Start = 0;
\r
322 // inDataSize_Full = 0;
\r
324 if (res == SZ_OK && !wasInterrupted)
\r
326 // if (p->inStream)
\r
328 CMtDecBufLink *prev = NULL;
\r
329 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
\r
330 size_t crossSize = p->crossEnd - p->crossStart;
\r
332 PRF(printf("\ncrossSize = %d\n", crossSize));
\r
338 link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
\r
342 // p->allocError_for_Read_BlockIndex = blockIndex;
\r
343 isAllocError = True;
\r
349 // static unsigned g_num = 0;
\r
350 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
\r
354 t->inBuf = (void *)link;
\r
358 Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
\r
359 Byte *parseData = data;
\r
362 if (crossSize != 0)
\r
364 inDataSize = crossSize;
\r
365 // inDataSize_Full = inDataSize;
\r
366 inDataSize_Start = crossSize;
\r
368 parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
\r
369 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
\r
370 (int)p->crossStart, (int)p->crossEnd, (int)finish));
\r
374 size = p->inBufSize;
\r
376 res = FullRead(p->inStream, data, &size);
\r
378 // size = 10; // test
\r
380 inDataSize += size;
\r
381 // inDataSize_Full = inDataSize;
\r
383 inDataSize_Start = size;
\r
385 p->readProcessed += size;
\r
386 finish = (size != p->inBufSize);
\r
388 p->readWasFinished = True;
\r
390 // res = E_INVALIDARG; // test
\r
394 // PRF(printf("\nRead error = %d\n", res))
\r
395 // we want to decode all data before error
\r
397 // p->readError_BlockIndex = blockIndex;
\r
398 p->readWasFinished = True;
\r
404 if (inDataSize - inPrev >= MTDEC_ProgessStep)
\r
406 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
\r
407 if (res != SZ_OK || wasInterrupted)
\r
409 inPrev = inDataSize;
\r
414 CMtDecCallbackInfo parse;
\r
416 parse.startCall = (prev == NULL);
\r
417 parse.src = parseData;
\r
418 parse.srcSize = size;
\r
419 parse.srcFinished = finish;
\r
420 parse.canCreateNewThread = True;
\r
422 PRF(printf("\nParse size = %d\n", (unsigned)size));
\r
424 p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
\r
426 PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
\r
429 canCreateNewThread = parse.canCreateNewThread;
\r
431 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
\r
434 // parseRes != SZ_OK ||
\r
435 // inDataSize - (size - parse.srcSize) > p->inBlockMax
\r
437 parse.state == MTDEC_PARSE_OVERFLOW
\r
438 // || wasInterrupted
\r
441 // Overflow or Parse error - switch from MT decoding to ST decoding
\r
446 PRF(printf("\n Overflow"));
\r
447 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
\r
448 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
\r
451 if (crossSize != 0)
\r
452 memcpy(data, parseData, size);
\r
458 if (crossSize != 0)
\r
460 memcpy(data, parseData, parse.srcSize);
\r
461 p->crossStart += parse.srcSize;
\r
464 if (parse.state != MTDEC_PARSE_CONTINUE || finish)
\r
466 // we don't need to parse in current thread anymore
\r
468 if (parse.state == MTDEC_PARSE_END)
\r
472 // p->crossFinished = finish;
\r
474 if (parse.srcSize == size)
\r
476 // full parsed - no cross transfer
\r
482 if (parse.state == MTDEC_PARSE_END)
\r
484 afterEndData = parseData + parse.srcSize;
\r
485 afterEndData_Size = size - parse.srcSize;
\r
486 if (crossSize != 0)
\r
487 afterEndData_IsCross = True;
\r
488 // we reduce data size to required bytes (parsed only)
\r
489 inDataSize -= afterEndData_Size;
\r
491 inDataSize_Start = parse.srcSize;
\r
496 // partial parsed - need cross transfer
\r
497 if (crossSize != 0)
\r
498 inDataSize = parse.srcSize; // it's only parsed now
\r
501 // partial parsed - is not in initial cross block - we need to copy new data to cross block
\r
502 Byte *cr = MtDec_GetCrossBuff(p);
\r
506 PRF(printf("\ncross alloc error error\n"));
\r
507 // res = SZ_ERROR_MEM;
\r
509 // p->allocError_for_Read_BlockIndex = blockIndex;
\r
510 isAllocError = True;
\r
516 size_t crSize = size - parse.srcSize;
\r
517 inDataSize -= crSize;
\r
518 p->crossEnd = crSize;
\r
520 memcpy(cr, parseData + parse.srcSize, crSize);
\r
524 // inDataSize_Full = inDataSize;
\r
526 inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
\r
533 if (parse.srcSize != size)
\r
535 res = SZ_ERROR_FAIL;
\r
536 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
\r
545 if (crossSize != 0)
\r
555 res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
\r
560 if (res == SZ_OK && needCode && !wasInterrupted)
\r
562 codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
\r
563 if (codeRes != SZ_OK)
\r
567 // SZ_ERROR_MEM is expected error here.
\r
568 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
\r
569 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
\r
573 if (res != SZ_OK || wasInterrupted)
\r
577 threadingErrorSRes = SZ_OK;
\r
581 if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
\r
583 SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
\r
586 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
\r
587 p->numStartedThreads++;
\r
591 PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
\r
592 if (p->numStartedThreads == 1)
\r
594 // if only one thread is possible, we leave multi-threading code
\r
597 threadingErrorSRes = res2;
\r
600 p->numStartedThreads_Limit = p->numStartedThreads;
\r
606 unsigned nextIndex = t->index + 1;
\r
607 nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
\r
608 RINOK_THREAD(Event_Set(&nextThread->canRead))
\r
609 // We have started executing for new iteration (with next thread)
\r
610 // And that next thread now is responsible for possible exit from decoding (threading_code)
\r
614 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
\r
615 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
\r
616 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
\r
617 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
\r
618 // - otherwise we stop decoding and exit from ThreadFunc2()
\r
620 // Don't change (finish) variable in the further code
\r
623 // ---------- CODE ----------
\r
630 if (res == SZ_OK && needCode && codeRes == SZ_OK)
\r
632 BoolInt isStartBlock = True;
\r
633 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
\r
641 inSize = inDataSize_Start;
\r
644 UInt64 rem = inDataSize - inCodePos;
\r
645 inSize = p->inBufSize;
\r
647 inSize = (size_t)rem;
\r
650 inCodePos += inSize;
\r
653 codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
\r
654 (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
\r
655 (inCodePos == inDataSize), // srcFinished
\r
656 &inCodePos, &outCodePos, &stop);
\r
658 if (codeRes != SZ_OK)
\r
660 PRF(printf("\nCode Interrupt error = %x\n", codeRes));
\r
661 // we interrupt only later blocks
\r
662 MtDec_Interrupt(p, blockIndex);
\r
666 if (stop || inCodePos == inDataSize)
\r
670 const UInt64 inDelta = inCodePos - inPrev;
\r
671 const UInt64 outDelta = outCodePos - outPrev;
\r
672 if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
\r
675 res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
\r
676 if (res != SZ_OK || wasInterrupted)
\r
678 inPrev = inCodePos;
\r
679 outPrev = outCodePos;
\r
684 isStartBlock = False;
\r
689 // ---------- WRITE ----------
\r
691 RINOK_THREAD(Event_Wait(&t->canWrite));
\r
694 BoolInt isErrorMode = False;
\r
695 BoolInt canRecode = True;
\r
696 BoolInt needWriteToStream = needWrite;
\r
698 if (p->exitThread) return 0; // it's never executed in normal cases
\r
700 if (p->wasInterrupted)
\r
701 wasInterrupted = True;
\r
704 if (codeRes != SZ_OK) // || !needCode // check it !!!
\r
706 p->wasInterrupted = True;
\r
707 p->codeRes = codeRes;
\r
708 if (codeRes == SZ_ERROR_MEM)
\r
709 isAllocError = True;
\r
712 if (threadingErrorSRes)
\r
714 p->wasInterrupted = True;
\r
715 p->threadingErrorSRes = threadingErrorSRes;
\r
716 needWriteToStream = False;
\r
720 p->wasInterrupted = True;
\r
721 p->isAllocError = True;
\r
722 needWriteToStream = False;
\r
726 p->wasInterrupted = True;
\r
727 p->overflow = True;
\r
728 needWriteToStream = False;
\r
734 if (wasInterrupted)
\r
740 const UInt64 inDelta = inCodePos - inPrev;
\r
741 const UInt64 outDelta = outCodePos - outPrev;
\r
742 // if (inDelta != 0 || outDelta != 0)
\r
743 res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
\r
747 needContinue = (!finish);
\r
749 // if (res == SZ_OK && needWrite && !wasInterrupted)
\r
752 // p->inProcessed += inCodePos;
\r
754 PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
\r
756 res = p->mtCallback->Write(p->mtCallbackObject, t->index,
\r
757 res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
\r
758 afterEndData, afterEndData_Size, afterEndData_IsCross,
\r
762 // res = SZ_ERROR_FAIL; // for test
\r
764 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
\r
765 PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
\r
769 PRF(printf("\nWrite error = %d\n", res));
\r
770 isErrorMode = True;
\r
771 p->wasInterrupted = True;
\r
774 || (!needContinue && !finish))
\r
776 PRF(printf("\nWrite Interrupt error = %x\n", res));
\r
777 MtDec_Interrupt(p, blockIndex);
\r
784 || p->wasInterrupted
\r
785 || codeRes != SZ_OK
\r
787 || p->numFilledThreads != 0
\r
790 if (p->numFilledThreads == 0)
\r
791 p->filledThreadStart = t->index;
\r
792 if (inDataSize != 0 || !finish)
\r
794 t->inDataSize_Start = inDataSize_Start;
\r
795 t->inDataSize = inDataSize;
\r
796 p->numFilledThreads++;
\r
798 PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
\r
799 PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
\r
804 RINOK_THREAD(Event_Set(&nextThread->canWrite));
\r
810 // we restore decoding with new iteration
\r
811 RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
\r
815 // we exit from decoding
\r
818 p->exitThread = True;
\r
820 RINOK_THREAD(Event_Set(&p->threads[0].canRead));
\r
832 #include <malloc.h>
\r
834 #include <stdlib.h>
\r
839 static THREAD_FUNC_DECL ThreadFunc1(void *pp)
\r
843 CMtDecThread *t = (CMtDecThread *)pp;
\r
846 // fprintf(stdout, "\n%d = %p\n", t->index, &t);
\r
848 res = ThreadFunc2(t);
\r
851 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
\r
853 // it's an unexpected situation: some threading function returned an error
\r
854 if (p->exitThreadWRes == 0)
\r
855 p->exitThreadWRes = res;
\r
856 PRF(printf("\nthread exit error = %d\n", res));
\r
857 p->exitThread = True;
\r
858 Event_Set(&p->threads[0].canRead);
\r
859 Event_Set(&p->threads[0].canWrite);
\r
860 MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
\r
862 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
\r
865 static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp)
\r
868 CMtDecThread *t = (CMtDecThread *)pp;
\r
869 // fprintf(stderr, "\n%d = %p - before", t->index, &t);
\r
870 t->allocaPtr = alloca(t->index * 128);
\r
872 return ThreadFunc1(pp);
\r
876 int MtDec_PrepareRead(CMtDec *p)
\r
878 if (p->crossBlock && p->crossStart == p->crossEnd)
\r
880 ISzAlloc_Free(p->alloc, p->crossBlock);
\r
881 p->crossBlock = NULL;
\r
886 for (i = 0; i < MTDEC__THREADS_MAX; i++)
\r
887 if (i > p->numStartedThreads
\r
888 || p->numFilledThreads <=
\r
889 (i >= p->filledThreadStart ?
\r
890 i - p->filledThreadStart :
\r
891 i + p->numStartedThreads - p->filledThreadStart))
\r
892 MtDecThread_FreeInBufs(&p->threads[i]);
\r
895 return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
\r
899 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
\r
901 while (p->numFilledThreads != 0)
\r
903 CMtDecThread *t = &p->threads[p->filledThreadStart];
\r
908 void *link = t->inBuf;
\r
909 void *next = ((CMtDecBufLink *)link)->next;
\r
910 ISzAlloc_Free(p->alloc, link);
\r
914 if (t->inDataSize == 0)
\r
916 MtDecThread_FreeInBufs(t);
\r
917 if (--p->numFilledThreads == 0)
\r
919 if (++p->filledThreadStart == p->numStartedThreads)
\r
920 p->filledThreadStart = 0;
\r
921 t = &p->threads[p->filledThreadStart];
\r
926 size_t lim = t->inDataSize_Start;
\r
928 t->inDataSize_Start = 0;
\r
931 UInt64 rem = t->inDataSize;
\r
932 lim = p->inBufSize;
\r
936 t->inDataSize -= lim;
\r
938 return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
\r
943 size_t crossSize = p->crossEnd - p->crossStart;
\r
944 if (crossSize != 0)
\r
946 const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
\r
947 *inLim = crossSize;
\r
955 ISzAlloc_Free(p->alloc, p->crossBlock);
\r
956 p->crossBlock = NULL;
\r
963 void MtDec_Construct(CMtDec *p)
\r
967 p->inBufSize = (size_t)1 << 18;
\r
969 p->numThreadsMax = 0;
\r
971 p->inStream = NULL;
\r
973 // p->inData = NULL;
\r
974 // p->inDataSize = 0;
\r
976 p->crossBlock = NULL;
\r
980 p->numFilledThreads = 0;
\r
982 p->progress = NULL;
\r
985 p->mtCallback = NULL;
\r
986 p->mtCallbackObject = NULL;
\r
988 p->allocatedBufsSize = 0;
\r
990 for (i = 0; i < MTDEC__THREADS_MAX; i++)
\r
992 CMtDecThread *t = &p->threads[i];
\r
996 Event_Construct(&t->canRead);
\r
997 Event_Construct(&t->canWrite);
\r
998 Thread_Construct(&t->thread);
\r
1001 // Event_Construct(&p->finishedEvent);
\r
1003 CriticalSection_Init(&p->mtProgress.cs);
\r
1007 static void MtDec_Free(CMtDec *p)
\r
1011 p->exitThread = True;
\r
1013 for (i = 0; i < MTDEC__THREADS_MAX; i++)
\r
1014 MtDecThread_Destruct(&p->threads[i]);
\r
1016 // Event_Close(&p->finishedEvent);
\r
1018 if (p->crossBlock)
\r
1020 ISzAlloc_Free(p->alloc, p->crossBlock);
\r
1021 p->crossBlock = NULL;
\r
1026 void MtDec_Destruct(CMtDec *p)
\r
1030 CriticalSection_Delete(&p->mtProgress.cs);
\r
1034 SRes MtDec_Code(CMtDec *p)
\r
1038 p->inProcessed = 0;
\r
1040 p->blockIndex = 1; // it must be larger than not_defined index (0)
\r
1041 p->isAllocError = False;
\r
1042 p->overflow = False;
\r
1043 p->threadingErrorSRes = SZ_OK;
\r
1045 p->needContinue = True;
\r
1047 p->readWasFinished = False;
\r
1048 p->needInterrupt = False;
\r
1049 p->interruptIndex = (UInt64)(Int64)-1;
\r
1051 p->readProcessed = 0;
\r
1052 p->readRes = SZ_OK;
\r
1053 p->codeRes = SZ_OK;
\r
1054 p->wasInterrupted = False;
\r
1056 p->crossStart = 0;
\r
1059 p->filledThreadStart = 0;
\r
1060 p->numFilledThreads = 0;
\r
1063 unsigned numThreads = p->numThreadsMax;
\r
1064 if (numThreads > MTDEC__THREADS_MAX)
\r
1065 numThreads = MTDEC__THREADS_MAX;
\r
1066 p->numStartedThreads_Limit = numThreads;
\r
1067 p->numStartedThreads = 0;
\r
1070 if (p->inBufSize != p->allocatedBufsSize)
\r
1072 for (i = 0; i < MTDEC__THREADS_MAX; i++)
\r
1074 CMtDecThread *t = &p->threads[i];
\r
1076 MtDecThread_FreeInBufs(t);
\r
1078 if (p->crossBlock)
\r
1080 ISzAlloc_Free(p->alloc, p->crossBlock);
\r
1081 p->crossBlock = NULL;
\r
1084 p->allocatedBufsSize = p->inBufSize;
\r
1087 MtProgress_Init(&p->mtProgress, p->progress);
\r
1089 // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
\r
1090 p->exitThread = False;
\r
1091 p->exitThreadWRes = 0;
\r
1096 CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
\r
1097 // wres = MtDecThread_CreateAndStart(nextThread);
\r
1098 wres = MtDecThread_CreateEvents(nextThread);
\r
1099 if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
\r
1100 if (wres == 0) { wres = Event_Set(&nextThread->canRead);
\r
1101 if (wres == 0) { THREAD_FUNC_RET_TYPE res = ThreadFunc(nextThread);
\r
1102 wres = (WRes)(UINT_PTR)res;
\r
1105 p->needContinue = False;
\r
1106 MtDec_CloseThreads(p);
\r
1109 // wres = 17; // for test
\r
1110 // wres = Event_Wait(&p->finishedEvent);
\r
1112 sres = MY_SRes_HRESULT_FROM_WRes(wres);
\r
1115 p->threadingErrorSRes = sres;
\r
1120 // || p->mtc.codeRes == SZ_ERROR_MEM
\r
1122 || p->threadingErrorSRes != SZ_OK
\r
1125 // p->needContinue = True;
\r
1128 p->needContinue = False;
\r
1130 if (p->needContinue)
\r
1133 // if (sres != SZ_OK)
\r
1135 // return SZ_ERROR_FAIL;
\r