update libchdr
[pcsx_rearmed.git] / deps / libchdr / deps / lzma-22.01 / src / MtDec.c
CommitLineData
9e052883 1/* MtDec.c -- Multi-thread Decoder\r
22021-12-21 : Igor Pavlov : Public domain */\r
3\r
4#include "Precomp.h"\r
5\r
6// #define SHOW_DEBUG_INFO\r
7\r
8// #include <stdio.h>\r
9#include <string.h>\r
10\r
11#ifdef SHOW_DEBUG_INFO\r
12#include <stdio.h>\r
13#endif\r
14\r
15#include "MtDec.h"\r
16\r
17#ifndef _7ZIP_ST\r
18\r
19#ifdef SHOW_DEBUG_INFO\r
20#define PRF(x) x\r
21#else\r
22#define PRF(x)\r
23#endif\r
24\r
25#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))\r
26\r
27void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)\r
28{\r
29 p->progress = progress;\r
30 p->res = SZ_OK;\r
31 p->totalInSize = 0;\r
32 p->totalOutSize = 0;\r
33}\r
34\r
35\r
36SRes MtProgress_Progress_ST(CMtProgress *p)\r
37{\r
38 if (p->res == SZ_OK && p->progress)\r
39 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)\r
40 p->res = SZ_ERROR_PROGRESS;\r
41 return p->res;\r
42}\r
43\r
44\r
45SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)\r
46{\r
47 SRes res;\r
48 CriticalSection_Enter(&p->cs);\r
49 \r
50 p->totalInSize += inSize;\r
51 p->totalOutSize += outSize;\r
52 if (p->res == SZ_OK && p->progress)\r
53 if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)\r
54 p->res = SZ_ERROR_PROGRESS;\r
55 res = p->res;\r
56 \r
57 CriticalSection_Leave(&p->cs);\r
58 return res;\r
59}\r
60\r
61\r
62SRes MtProgress_GetError(CMtProgress *p)\r
63{\r
64 SRes res;\r
65 CriticalSection_Enter(&p->cs);\r
66 res = p->res;\r
67 CriticalSection_Leave(&p->cs);\r
68 return res;\r
69}\r
70\r
71\r
72void MtProgress_SetError(CMtProgress *p, SRes res)\r
73{\r
74 CriticalSection_Enter(&p->cs);\r
75 if (p->res == SZ_OK)\r
76 p->res = res;\r
77 CriticalSection_Leave(&p->cs);\r
78}\r
79\r
80\r
81#define RINOK_THREAD(x) RINOK_WRes(x)\r
82\r
83\r
84static WRes ArEvent_OptCreate_And_Reset(CEvent *p)\r
85{\r
86 if (Event_IsCreated(p))\r
87 return Event_Reset(p);\r
88 return AutoResetEvent_CreateNotSignaled(p);\r
89}\r
90\r
91\r
/*
  Header placed at the start of every input buffer allocation.
  Buffers of one thread form a singly linked list through (next).
  (pad) makes the header four pointers wide; the payload bytes
  start right after the header.
*/
typedef struct __CMtDecBufLink
{
  struct __CMtDecBufLink *next;
  void *pad[3];
} CMtDecBufLink;

/* byte offset of the payload inside a buffer allocation */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
/* payload pointer for a buffer allocation that starts at (link) */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
102\r
103\r
104\r
105static THREAD_FUNC_DECL ThreadFunc(void *pp);\r
106\r
107\r
108static WRes MtDecThread_CreateEvents(CMtDecThread *t)\r
109{\r
110 WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);\r
111 if (wres == 0)\r
112 {\r
113 wres = ArEvent_OptCreate_And_Reset(&t->canRead);\r
114 if (wres == 0)\r
115 return SZ_OK;\r
116 }\r
117 return wres;\r
118}\r
119\r
120\r
121static SRes MtDecThread_CreateAndStart(CMtDecThread *t)\r
122{\r
123 WRes wres = MtDecThread_CreateEvents(t);\r
124 // wres = 17; // for test\r
125 if (wres == 0)\r
126 {\r
127 if (Thread_WasCreated(&t->thread))\r
128 return SZ_OK;\r
129 wres = Thread_Create(&t->thread, ThreadFunc, t);\r
130 if (wres == 0)\r
131 return SZ_OK;\r
132 }\r
133 return MY_SRes_HRESULT_FROM_WRes(wres);\r
134}\r
135\r
136\r
137void MtDecThread_FreeInBufs(CMtDecThread *t)\r
138{\r
139 if (t->inBuf)\r
140 {\r
141 void *link = t->inBuf;\r
142 t->inBuf = NULL;\r
143 do\r
144 {\r
145 void *next = ((CMtDecBufLink *)link)->next;\r
146 ISzAlloc_Free(t->mtDec->alloc, link);\r
147 link = next;\r
148 }\r
149 while (link);\r
150 }\r
151}\r
152\r
153\r
154static void MtDecThread_CloseThread(CMtDecThread *t)\r
155{\r
156 if (Thread_WasCreated(&t->thread))\r
157 {\r
158 Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */\r
159 Event_Set(&t->canRead);\r
160 Thread_Wait_Close(&t->thread);\r
161 }\r
162\r
163 Event_Close(&t->canRead);\r
164 Event_Close(&t->canWrite);\r
165}\r
166\r
167static void MtDec_CloseThreads(CMtDec *p)\r
168{\r
169 unsigned i;\r
170 for (i = 0; i < MTDEC__THREADS_MAX; i++)\r
171 MtDecThread_CloseThread(&p->threads[i]);\r
172}\r
173\r
174static void MtDecThread_Destruct(CMtDecThread *t)\r
175{\r
176 MtDecThread_CloseThread(t);\r
177 MtDecThread_FreeInBufs(t);\r
178}\r
179\r
180\r
181\r
182static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r
183{\r
184 size_t size = *processedSize;\r
185 *processedSize = 0;\r
186 while (size != 0)\r
187 {\r
188 size_t cur = size;\r
189 SRes res = ISeqInStream_Read(stream, data, &cur);\r
190 *processedSize += cur;\r
191 data += cur;\r
192 size -= cur;\r
193 RINOK(res);\r
194 if (cur == 0)\r
195 return SZ_OK;\r
196 }\r
197 return SZ_OK;\r
198}\r
199\r
200\r
201static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)\r
202{\r
203 SRes res;\r
204 CriticalSection_Enter(&p->mtProgress.cs);\r
205 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);\r
206 res = p->mtProgress.res;\r
207 CriticalSection_Leave(&p->mtProgress.cs);\r
208 return res;\r
209}\r
210\r
211static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)\r
212{\r
213 SRes res;\r
214 CriticalSection_Enter(&p->mtProgress.cs);\r
215\r
216 p->mtProgress.totalInSize += inSize;\r
217 p->mtProgress.totalOutSize += outSize;\r
218 if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)\r
219 if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)\r
220 p->mtProgress.res = SZ_ERROR_PROGRESS;\r
221\r
222 *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);\r
223 res = p->mtProgress.res;\r
224 \r
225 CriticalSection_Leave(&p->mtProgress.cs);\r
226\r
227 return res;\r
228}\r
229\r
230static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)\r
231{\r
232 CriticalSection_Enter(&p->mtProgress.cs);\r
233 if (!p->needInterrupt || interruptIndex < p->interruptIndex)\r
234 {\r
235 p->interruptIndex = interruptIndex;\r
236 p->needInterrupt = True;\r
237 }\r
238 CriticalSection_Leave(&p->mtProgress.cs);\r
239}\r
240\r
241Byte *MtDec_GetCrossBuff(CMtDec *p)\r
242{\r
243 Byte *cr = p->crossBlock;\r
244 if (!cr)\r
245 {\r
246 cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);\r
247 if (!cr)\r
248 return NULL;\r
249 p->crossBlock = cr;\r
250 }\r
251 return MTDEC__DATA_PTR_FROM_LINK(cr);\r
252}\r
253\r
254\r
255/*\r
256 ThreadFunc2() returns:\r
257 0 - in all normal cases (even for stream error or memory allocation error)\r
258 (!= 0) - WRes error return by system threading function\r
259*/\r
260\r
261// #define MTDEC_ProgessStep (1 << 22)\r
262#define MTDEC_ProgessStep (1 << 0)\r
263\r
/*
  ThreadFunc2() is the main loop of one decoder thread (t).
  One loop iteration handles one block and has three phases:

    READ:  wait for t->canRead, take the next block index, then fill the
           thread's linked list of input buffers. Data comes from the
           cross block (bytes left over by the previous block) or from
           p->inStream through FullRead(). Each chunk is passed to the
           Parse() callback, which decides where the block ends.
           Unparsed bytes after the block end are moved to the cross block
           or reported to Write() as (afterEndData).
           If decoding goes on (!finish), a new thread may be started and
           canRead of the next thread is set, so that it reads the next
           block while this thread decodes.

    CODE:  the Code() callback is called for each filled input buffer;
           progress is reported between the calls.

    WRITE: wait for t->canWrite, publish error / overflow / interrupt
           state to (p), call the Write() callback, and record this
           thread's buffers as "filled" when the data may have to be
           decoded again (see MtDec_Read()). Then canWrite of the next
           thread is set, or, if (finish), thread 0 is woken up.

  Return value: see the comment above this function. The non-zero results
  come from the RINOK_THREAD() checks of Event_Wait() / Event_Set().
*/
264static WRes ThreadFunc2(CMtDecThread *t)\r
265{\r
266 CMtDec *p = t->mtDec;\r
267\r
268 PRF_STR_INT("ThreadFunc2", t->index);\r
269\r
270 // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);\r
271\r
272 for (;;)\r
273 {\r
274 SRes res, codeRes;\r
275 BoolInt wasInterrupted, isAllocError, overflow, finish;\r
276 SRes threadingErrorSRes;\r
277 BoolInt needCode, needWrite, needContinue;\r
278 \r
279 size_t inDataSize_Start;\r
280 UInt64 inDataSize;\r
281 // UInt64 inDataSize_Full;\r
282 \r
283 UInt64 blockIndex;\r
284\r
285 UInt64 inPrev = 0;\r
286 UInt64 outPrev = 0;\r
287 UInt64 inCodePos;\r
288 UInt64 outCodePos;\r
289 \r
290 Byte *afterEndData = NULL;\r
291 size_t afterEndData_Size = 0;\r
292 BoolInt afterEndData_IsCross = False;\r
293\r
294 BoolInt canCreateNewThread = False;\r
295 // CMtDecCallbackInfo parse;\r
296 CMtDecThread *nextThread;\r
297\r
298 PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);\r
299\r
/* ---------- READ ---------- */
/* block here until this thread is allowed to read; exitThread is checked right after wake-up */
300 RINOK_THREAD(Event_Wait(&t->canRead));\r
301 if (p->exitThread)\r
302 return 0;\r
303\r
304 PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);\r
305\r
306 // if (t->index == 3) return 19; // for test\r
307\r
/* blocks are numbered in the order in which threads pass the canRead gate */
308 blockIndex = p->blockIndex++;\r
309\r
310 // PRF(printf("\ncanRead\n"))\r
311\r
312 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);\r
313\r
314 finish = p->readWasFinished;\r
315 needCode = False;\r
316 needWrite = False;\r
317 isAllocError = False;\r
318 overflow = False;\r
319\r
320 inDataSize_Start = 0;\r
321 inDataSize = 0;\r
322 // inDataSize_Full = 0;\r
323\r
324 if (res == SZ_OK && !wasInterrupted)\r
325 {\r
326 // if (p->inStream)\r
327 {\r
328 CMtDecBufLink *prev = NULL;\r
329 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;\r
330 size_t crossSize = p->crossEnd - p->crossStart;\r
331\r
332 PRF(printf("\ncrossSize = %d\n", crossSize));\r
333\r
/* one pass of this loop fills and parses one input buffer of the current block */
334 for (;;)\r
335 {\r
336 if (!link)\r
337 {\r
338 link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);\r
339 if (!link)\r
340 {\r
341 finish = True;\r
342 // p->allocError_for_Read_BlockIndex = blockIndex;\r
343 isAllocError = True;\r
344 break;\r
345 }\r
346 link->next = NULL;\r
347 if (prev)\r
348 {\r
349 // static unsigned g_num = 0;\r
350 // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));\r
351 prev->next = link;\r
352 }\r
353 else\r
354 t->inBuf = (void *)link;\r
355 }\r
356\r
357 {\r
358 Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);\r
359 Byte *parseData = data;\r
360 size_t size;\r
361\r
/* pending cross-block bytes are parsed in place first; they are copied into (data) only after Parse() */
362 if (crossSize != 0)\r
363 {\r
364 inDataSize = crossSize;\r
365 // inDataSize_Full = inDataSize;\r
366 inDataSize_Start = crossSize;\r
367 size = crossSize;\r
368 parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;\r
369 PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",\r
370 (int)p->crossStart, (int)p->crossEnd, (int)finish));\r
371 }\r
372 else\r
373 {\r
374 size = p->inBufSize;\r
375 \r
376 res = FullRead(p->inStream, data, &size);\r
377 \r
378 // size = 10; // test\r
379\r
380 inDataSize += size;\r
381 // inDataSize_Full = inDataSize;\r
382 if (!prev)\r
383 inDataSize_Start = size;\r
384\r
385 p->readProcessed += size;\r
/* a short read means that the input stream has ended */
386 finish = (size != p->inBufSize);\r
387 if (finish)\r
388 p->readWasFinished = True;\r
389 \r
390 // res = E_INVALIDARG; // test\r
391\r
392 if (res != SZ_OK)\r
393 {\r
394 // PRF(printf("\nRead error = %d\n", res))\r
395 // we want to decode all data before error\r
396 p->readRes = res;\r
397 // p->readError_BlockIndex = blockIndex;\r
398 p->readWasFinished = True;\r
399 finish = True;\r
400 res = SZ_OK;\r
401 // break;\r
402 }\r
403\r
404 if (inDataSize - inPrev >= MTDEC_ProgessStep)\r
405 {\r
406 res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);\r
407 if (res != SZ_OK || wasInterrupted)\r
408 break;\r
409 inPrev = inDataSize;\r
410 }\r
411 }\r
412\r
413 {\r
414 CMtDecCallbackInfo parse;\r
415\r
416 parse.startCall = (prev == NULL);\r
417 parse.src = parseData;\r
418 parse.srcSize = size;\r
419 parse.srcFinished = finish;\r
420 parse.canCreateNewThread = True;\r
421\r
422 PRF(printf("\nParse size = %d\n", (unsigned)size));\r
423\r
424 p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);\r
425\r
426 PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));\r
427\r
428 needWrite = True;\r
429 canCreateNewThread = parse.canCreateNewThread;\r
430\r
431 // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);\r
432 \r
433 if (\r
434 // parseRes != SZ_OK ||\r
435 // inDataSize - (size - parse.srcSize) > p->inBlockMax\r
436 // ||\r
437 parse.state == MTDEC_PARSE_OVERFLOW\r
438 // || wasInterrupted\r
439 )\r
440 {\r
441 // Overflow or Parse error - switch from MT decoding to ST decoding\r
442 finish = True;\r
443 overflow = True;\r
444\r
445 {\r
446 PRF(printf("\n Overflow"));\r
447 // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));\r
448 PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));\r
449 }\r
450 \r
451 if (crossSize != 0)\r
452 memcpy(data, parseData, size);\r
453 p->crossStart = 0;\r
454 p->crossEnd = 0;\r
455 break;\r
456 }\r
457\r
458 if (crossSize != 0)\r
459 {\r
460 memcpy(data, parseData, parse.srcSize);\r
461 p->crossStart += parse.srcSize;\r
462 }\r
463\r
464 if (parse.state != MTDEC_PARSE_CONTINUE || finish)\r
465 {\r
466 // we don't need to parse in current thread anymore\r
467\r
468 if (parse.state == MTDEC_PARSE_END)\r
469 finish = True;\r
470\r
471 needCode = True;\r
472 // p->crossFinished = finish;\r
473\r
474 if (parse.srcSize == size)\r
475 {\r
476 // full parsed - no cross transfer\r
477 p->crossStart = 0;\r
478 p->crossEnd = 0;\r
479 break;\r
480 }\r
481\r
482 if (parse.state == MTDEC_PARSE_END)\r
483 {\r
484 afterEndData = parseData + parse.srcSize;\r
485 afterEndData_Size = size - parse.srcSize;\r
486 if (crossSize != 0)\r
487 afterEndData_IsCross = True;\r
488 // we reduce data size to required bytes (parsed only)\r
489 inDataSize -= afterEndData_Size;\r
490 if (!prev)\r
491 inDataSize_Start = parse.srcSize;\r
492 break;\r
493 }\r
494\r
495 {\r
496 // partial parsed - need cross transfer\r
497 if (crossSize != 0)\r
498 inDataSize = parse.srcSize; // it's only parsed now\r
499 else\r
500 {\r
501 // partial parsed - is not in initial cross block - we need to copy new data to cross block\r
502 Byte *cr = MtDec_GetCrossBuff(p);\r
503 if (!cr)\r
504 {\r
505 {\r
506 PRF(printf("\ncross alloc error error\n"));\r
507 // res = SZ_ERROR_MEM;\r
508 finish = True;\r
509 // p->allocError_for_Read_BlockIndex = blockIndex;\r
510 isAllocError = True;\r
511 break;\r
512 }\r
513 }\r
514\r
515 {\r
516 size_t crSize = size - parse.srcSize;\r
517 inDataSize -= crSize;\r
518 p->crossEnd = crSize;\r
519 p->crossStart = 0;\r
520 memcpy(cr, parseData + parse.srcSize, crSize);\r
521 }\r
522 }\r
523\r
524 // inDataSize_Full = inDataSize;\r
525 if (!prev)\r
526 inDataSize_Start = parse.srcSize; // it's partial size (parsed only)\r
527\r
528 finish = False;\r
529 break;\r
530 }\r
531 }\r
532\r
/* state is MTDEC_PARSE_CONTINUE here: Parse() must have consumed the whole chunk */
533 if (parse.srcSize != size)\r
534 {\r
535 res = SZ_ERROR_FAIL;\r
536 PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));\r
537 break;\r
538 }\r
539 }\r
540 }\r
541 \r
542 prev = link;\r
543 link = link->next;\r
544\r
545 if (crossSize != 0)\r
546 {\r
547 crossSize = 0;\r
548 p->crossStart = 0;\r
549 p->crossEnd = 0;\r
550 }\r
551 }\r
552 }\r
553\r
554 if (res == SZ_OK)\r
555 res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);\r
556 }\r
557\r
558 codeRes = SZ_OK;\r
559\r
560 if (res == SZ_OK && needCode && !wasInterrupted)\r
561 {\r
562 codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);\r
563 if (codeRes != SZ_OK)\r
564 {\r
565 needCode = False;\r
566 finish = True;\r
567 // SZ_ERROR_MEM is expected error here.\r
568 // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.\r
569 // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.\r
570 }\r
571 }\r
572 \r
573 if (res != SZ_OK || wasInterrupted)\r
574 finish = True;\r
575 \r
576 nextThread = NULL;\r
577 threadingErrorSRes = SZ_OK;\r
578\r
/* hand the READ phase over to the next thread (starting a new one if allowed) before decoding this block */
579 if (!finish)\r
580 {\r
581 if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)\r
582 {\r
583 SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);\r
584 if (res2 == SZ_OK)\r
585 {\r
586 // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));\r
587 p->numStartedThreads++;\r
588 }\r
589 else\r
590 {\r
591 PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));\r
592 if (p->numStartedThreads == 1)\r
593 {\r
594 // if only one thread is possible, we leave muti-threading code\r
595 finish = True;\r
596 needCode = False;\r
597 threadingErrorSRes = res2;\r
598 }\r
599 else\r
600 p->numStartedThreads_Limit = p->numStartedThreads;\r
601 }\r
602 }\r
603 \r
604 if (!finish)\r
605 {\r
/* threads are used round-robin: the slot after the last started thread wraps to 0 */
606 unsigned nextIndex = t->index + 1;\r
607 nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];\r
608 RINOK_THREAD(Event_Set(&nextThread->canRead))\r
609 // We have started executing for new iteration (with next thread)\r
610 // And that next thread now is responsible for possible exit from decoding (threading_code)\r
611 }\r
612 }\r
613\r
614 // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)\r
615 // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case\r
616 // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):\r
617 // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration\r
618 // - otherwise we stop decoding and exit from ThreadFunc2()\r
619\r
620 // Don't change (finish) variable in the further code\r
621\r
622\r
623 // ---------- CODE ----------\r
624\r
625 inPrev = 0;\r
626 outPrev = 0;\r
627 inCodePos = 0;\r
628 outCodePos = 0;\r
629\r
630 if (res == SZ_OK && needCode && codeRes == SZ_OK)\r
631 {\r
632 BoolInt isStartBlock = True;\r
633 CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;\r
634\r
635 for (;;)\r
636 {\r
637 size_t inSize;\r
638 int stop;\r
639\r
/* the first buffer holds inDataSize_Start bytes; later buffers are full except possibly the last one */
640 if (isStartBlock)\r
641 inSize = inDataSize_Start;\r
642 else\r
643 {\r
644 UInt64 rem = inDataSize - inCodePos;\r
645 inSize = p->inBufSize;\r
646 if (inSize > rem)\r
647 inSize = (size_t)rem;\r
648 }\r
649\r
650 inCodePos += inSize;\r
651 stop = True;\r
652\r
653 codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,\r
654 (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,\r
655 (inCodePos == inDataSize), // srcFinished\r
656 &inCodePos, &outCodePos, &stop);\r
657 \r
658 if (codeRes != SZ_OK)\r
659 {\r
660 PRF(printf("\nCode Interrupt error = %x\n", codeRes));\r
661 // we interrupt only later blocks\r
662 MtDec_Interrupt(p, blockIndex);\r
663 break;\r
664 }\r
665\r
666 if (stop || inCodePos == inDataSize)\r
667 break;\r
668 \r
669 {\r
670 const UInt64 inDelta = inCodePos - inPrev;\r
671 const UInt64 outDelta = outCodePos - outPrev;\r
672 if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)\r
673 {\r
674 // Sleep(1);\r
675 res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);\r
676 if (res != SZ_OK || wasInterrupted)\r
677 break;\r
678 inPrev = inCodePos;\r
679 outPrev = outCodePos;\r
680 }\r
681 }\r
682\r
683 link = link->next;\r
684 isStartBlock = False;\r
685 }\r
686 }\r
687\r
688\r
689 // ---------- WRITE ----------\r
690 \r
691 RINOK_THREAD(Event_Wait(&t->canWrite));\r
692\r
693 {\r
694 BoolInt isErrorMode = False;\r
695 BoolInt canRecode = True;\r
696 BoolInt needWriteToStream = needWrite;\r
697\r
698 if (p->exitThread) return 0; // it's never executed in normal cases\r
699\r
/* publish this block's failure state to (p); a failure recorded by an earlier block takes precedence */
700 if (p->wasInterrupted)\r
701 wasInterrupted = True;\r
702 else\r
703 {\r
704 if (codeRes != SZ_OK) // || !needCode // check it !!!\r
705 {\r
706 p->wasInterrupted = True;\r
707 p->codeRes = codeRes;\r
708 if (codeRes == SZ_ERROR_MEM)\r
709 isAllocError = True;\r
710 }\r
711 \r
712 if (threadingErrorSRes)\r
713 {\r
714 p->wasInterrupted = True;\r
715 p->threadingErrorSRes = threadingErrorSRes;\r
716 needWriteToStream = False;\r
717 }\r
718 if (isAllocError)\r
719 {\r
720 p->wasInterrupted = True;\r
721 p->isAllocError = True;\r
722 needWriteToStream = False;\r
723 }\r
724 if (overflow)\r
725 {\r
726 p->wasInterrupted = True;\r
727 p->overflow = True;\r
728 needWriteToStream = False;\r
729 }\r
730 }\r
731\r
732 if (needCode)\r
733 {\r
734 if (wasInterrupted)\r
735 {\r
736 inCodePos = 0;\r
737 outCodePos = 0;\r
738 }\r
739 {\r
740 const UInt64 inDelta = inCodePos - inPrev;\r
741 const UInt64 outDelta = outCodePos - outPrev;\r
742 // if (inDelta != 0 || outDelta != 0)\r
743 res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);\r
744 }\r
745 }\r
746\r
747 needContinue = (!finish);\r
748\r
749 // if (res == SZ_OK && needWrite && !wasInterrupted)\r
750 if (needWrite)\r
751 {\r
752 // p->inProcessed += inCodePos;\r
753\r
754 PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));\r
755\r
756 res = p->mtCallback->Write(p->mtCallbackObject, t->index,\r
757 res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite\r
758 afterEndData, afterEndData_Size, afterEndData_IsCross,\r
759 &needContinue,\r
760 &canRecode);\r
761\r
762 // res = SZ_ERROR_FAIL; // for test\r
763\r
764 PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));\r
765 PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));\r
766\r
767 if (res != SZ_OK)\r
768 {\r
769 PRF(printf("\nWrite error = %d\n", res));\r
770 isErrorMode = True;\r
771 p->wasInterrupted = True;\r
772 }\r
773 if (res != SZ_OK\r
774 || (!needContinue && !finish))\r
775 {\r
776 PRF(printf("\nWrite Interrupt error = %x\n", res));\r
777 MtDec_Interrupt(p, blockIndex);\r
778 }\r
779 }\r
780\r
/* keep this thread's input buffers as "filled" so that MtDec_Read() can hand the data out again */
781 if (canRecode)\r
782 if (!needCode\r
783 || res != SZ_OK\r
784 || p->wasInterrupted\r
785 || codeRes != SZ_OK\r
786 || wasInterrupted\r
787 || p->numFilledThreads != 0\r
788 || isErrorMode)\r
789 {\r
790 if (p->numFilledThreads == 0)\r
791 p->filledThreadStart = t->index;\r
792 if (inDataSize != 0 || !finish)\r
793 {\r
794 t->inDataSize_Start = inDataSize_Start;\r
795 t->inDataSize = inDataSize;\r
796 p->numFilledThreads++;\r
797 }\r
798 PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));\r
799 PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));\r
800 }\r
801\r
802 if (!finish)\r
803 {\r
804 RINOK_THREAD(Event_Set(&nextThread->canWrite));\r
805 }\r
806 else\r
807 {\r
808 if (needContinue)\r
809 {\r
810 // we restore decoding with new iteration\r
811 RINOK_THREAD(Event_Set(&p->threads[0].canWrite));\r
812 }\r
813 else\r
814 {\r
815 // we exit from decoding\r
816 if (t->index == 0)\r
817 return SZ_OK;\r
818 p->exitThread = True;\r
819 }\r
820 RINOK_THREAD(Event_Set(&p->threads[0].canRead));\r
821 }\r
822 }\r
823 }\r
824}\r
825\r
826#ifdef _WIN32\r
827#define USE_ALLOCA\r
828#endif\r
829\r
830#ifdef USE_ALLOCA\r
831#ifdef _WIN32\r
832#include <malloc.h>\r
833#else\r
834#include <stdlib.h>\r
835#endif\r
836#endif\r
837\r
838\r
839static THREAD_FUNC_DECL ThreadFunc1(void *pp)\r
840{\r
841 WRes res;\r
842\r
843 CMtDecThread *t = (CMtDecThread *)pp;\r
844 CMtDec *p;\r
845\r
846 // fprintf(stdout, "\n%d = %p\n", t->index, &t);\r
847\r
848 res = ThreadFunc2(t);\r
849 p = t->mtDec;\r
850 if (res == 0)\r
851 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;\r
852 {\r
853 // it's unexpected situation for some threading function error\r
854 if (p->exitThreadWRes == 0)\r
855 p->exitThreadWRes = res;\r
856 PRF(printf("\nthread exit error = %d\n", res));\r
857 p->exitThread = True;\r
858 Event_Set(&p->threads[0].canRead);\r
859 Event_Set(&p->threads[0].canWrite);\r
860 MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));\r
861 }\r
862 return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;\r
863}\r
864\r
865static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp)\r
866{\r
867 #ifdef USE_ALLOCA\r
868 CMtDecThread *t = (CMtDecThread *)pp;\r
869 // fprintf(stderr, "\n%d = %p - before", t->index, &t);\r
870 t->allocaPtr = alloca(t->index * 128);\r
871 #endif\r
872 return ThreadFunc1(pp);\r
873}\r
874\r
875\r
876int MtDec_PrepareRead(CMtDec *p)\r
877{\r
878 if (p->crossBlock && p->crossStart == p->crossEnd)\r
879 {\r
880 ISzAlloc_Free(p->alloc, p->crossBlock);\r
881 p->crossBlock = NULL;\r
882 }\r
883 \r
884 {\r
885 unsigned i;\r
886 for (i = 0; i < MTDEC__THREADS_MAX; i++)\r
887 if (i > p->numStartedThreads\r
888 || p->numFilledThreads <=\r
889 (i >= p->filledThreadStart ?\r
890 i - p->filledThreadStart :\r
891 i + p->numStartedThreads - p->filledThreadStart))\r
892 MtDecThread_FreeInBufs(&p->threads[i]);\r
893 }\r
894\r
895 return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);\r
896}\r
897\r
898 \r
899const Byte *MtDec_Read(CMtDec *p, size_t *inLim)\r
900{\r
901 while (p->numFilledThreads != 0)\r
902 {\r
903 CMtDecThread *t = &p->threads[p->filledThreadStart];\r
904 \r
905 if (*inLim != 0)\r
906 {\r
907 {\r
908 void *link = t->inBuf;\r
909 void *next = ((CMtDecBufLink *)link)->next;\r
910 ISzAlloc_Free(p->alloc, link);\r
911 t->inBuf = next;\r
912 }\r
913 \r
914 if (t->inDataSize == 0)\r
915 {\r
916 MtDecThread_FreeInBufs(t);\r
917 if (--p->numFilledThreads == 0)\r
918 break;\r
919 if (++p->filledThreadStart == p->numStartedThreads)\r
920 p->filledThreadStart = 0;\r
921 t = &p->threads[p->filledThreadStart];\r
922 }\r
923 }\r
924 \r
925 {\r
926 size_t lim = t->inDataSize_Start;\r
927 if (lim != 0)\r
928 t->inDataSize_Start = 0;\r
929 else\r
930 {\r
931 UInt64 rem = t->inDataSize;\r
932 lim = p->inBufSize;\r
933 if (lim > rem)\r
934 lim = (size_t)rem;\r
935 }\r
936 t->inDataSize -= lim;\r
937 *inLim = lim;\r
938 return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);\r
939 }\r
940 }\r
941\r
942 {\r
943 size_t crossSize = p->crossEnd - p->crossStart;\r
944 if (crossSize != 0)\r
945 {\r
946 const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;\r
947 *inLim = crossSize;\r
948 p->crossStart = 0;\r
949 p->crossEnd = 0;\r
950 return data;\r
951 }\r
952 *inLim = 0;\r
953 if (p->crossBlock)\r
954 {\r
955 ISzAlloc_Free(p->alloc, p->crossBlock);\r
956 p->crossBlock = NULL;\r
957 }\r
958 return NULL;\r
959 }\r
960}\r
961\r
962\r
963void MtDec_Construct(CMtDec *p)\r
964{\r
965 unsigned i;\r
966 \r
967 p->inBufSize = (size_t)1 << 18;\r
968\r
969 p->numThreadsMax = 0;\r
970\r
971 p->inStream = NULL;\r
972 \r
973 // p->inData = NULL;\r
974 // p->inDataSize = 0;\r
975\r
976 p->crossBlock = NULL;\r
977 p->crossStart = 0;\r
978 p->crossEnd = 0;\r
979\r
980 p->numFilledThreads = 0;\r
981\r
982 p->progress = NULL;\r
983 p->alloc = NULL;\r
984\r
985 p->mtCallback = NULL;\r
986 p->mtCallbackObject = NULL;\r
987\r
988 p->allocatedBufsSize = 0;\r
989\r
990 for (i = 0; i < MTDEC__THREADS_MAX; i++)\r
991 {\r
992 CMtDecThread *t = &p->threads[i];\r
993 t->mtDec = p;\r
994 t->index = i;\r
995 t->inBuf = NULL;\r
996 Event_Construct(&t->canRead);\r
997 Event_Construct(&t->canWrite);\r
998 Thread_Construct(&t->thread);\r
999 }\r
1000\r
1001 // Event_Construct(&p->finishedEvent);\r
1002\r
1003 CriticalSection_Init(&p->mtProgress.cs);\r
1004}\r
1005\r
1006\r
1007static void MtDec_Free(CMtDec *p)\r
1008{\r
1009 unsigned i;\r
1010\r
1011 p->exitThread = True;\r
1012\r
1013 for (i = 0; i < MTDEC__THREADS_MAX; i++)\r
1014 MtDecThread_Destruct(&p->threads[i]);\r
1015\r
1016 // Event_Close(&p->finishedEvent);\r
1017\r
1018 if (p->crossBlock)\r
1019 {\r
1020 ISzAlloc_Free(p->alloc, p->crossBlock);\r
1021 p->crossBlock = NULL;\r
1022 }\r
1023}\r
1024\r
1025\r
1026void MtDec_Destruct(CMtDec *p)\r
1027{\r
1028 MtDec_Free(p);\r
1029\r
1030 CriticalSection_Delete(&p->mtProgress.cs);\r
1031}\r
1032\r
1033\r
1034SRes MtDec_Code(CMtDec *p)\r
1035{\r
1036 unsigned i;\r
1037\r
1038 p->inProcessed = 0;\r
1039\r
1040 p->blockIndex = 1; // it must be larger than not_defined index (0)\r
1041 p->isAllocError = False;\r
1042 p->overflow = False;\r
1043 p->threadingErrorSRes = SZ_OK;\r
1044\r
1045 p->needContinue = True;\r
1046\r
1047 p->readWasFinished = False;\r
1048 p->needInterrupt = False;\r
1049 p->interruptIndex = (UInt64)(Int64)-1;\r
1050\r
1051 p->readProcessed = 0;\r
1052 p->readRes = SZ_OK;\r
1053 p->codeRes = SZ_OK;\r
1054 p->wasInterrupted = False;\r
1055\r
1056 p->crossStart = 0;\r
1057 p->crossEnd = 0;\r
1058\r
1059 p->filledThreadStart = 0;\r
1060 p->numFilledThreads = 0;\r
1061\r
1062 {\r
1063 unsigned numThreads = p->numThreadsMax;\r
1064 if (numThreads > MTDEC__THREADS_MAX)\r
1065 numThreads = MTDEC__THREADS_MAX;\r
1066 p->numStartedThreads_Limit = numThreads;\r
1067 p->numStartedThreads = 0;\r
1068 }\r
1069\r
1070 if (p->inBufSize != p->allocatedBufsSize)\r
1071 {\r
1072 for (i = 0; i < MTDEC__THREADS_MAX; i++)\r
1073 {\r
1074 CMtDecThread *t = &p->threads[i];\r
1075 if (t->inBuf)\r
1076 MtDecThread_FreeInBufs(t);\r
1077 }\r
1078 if (p->crossBlock)\r
1079 {\r
1080 ISzAlloc_Free(p->alloc, p->crossBlock);\r
1081 p->crossBlock = NULL;\r
1082 }\r
1083\r
1084 p->allocatedBufsSize = p->inBufSize;\r
1085 }\r
1086\r
1087 MtProgress_Init(&p->mtProgress, p->progress);\r
1088\r
1089 // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));\r
1090 p->exitThread = False;\r
1091 p->exitThreadWRes = 0;\r
1092\r
1093 {\r
1094 WRes wres;\r
1095 SRes sres;\r
1096 CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];\r
1097 // wres = MtDecThread_CreateAndStart(nextThread);\r
1098 wres = MtDecThread_CreateEvents(nextThread);\r
1099 if (wres == 0) { wres = Event_Set(&nextThread->canWrite);\r
1100 if (wres == 0) { wres = Event_Set(&nextThread->canRead);\r
1101 if (wres == 0) { THREAD_FUNC_RET_TYPE res = ThreadFunc(nextThread);\r
1102 wres = (WRes)(UINT_PTR)res;\r
1103 if (wres != 0)\r
1104 {\r
1105 p->needContinue = False;\r
1106 MtDec_CloseThreads(p);\r
1107 }}}}\r
1108\r
1109 // wres = 17; // for test\r
1110 // wres = Event_Wait(&p->finishedEvent);\r
1111\r
1112 sres = MY_SRes_HRESULT_FROM_WRes(wres);\r
1113\r
1114 if (sres != 0)\r
1115 p->threadingErrorSRes = sres;\r
1116\r
1117 if (\r
1118 // wres == 0\r
1119 // wres != 0\r
1120 // || p->mtc.codeRes == SZ_ERROR_MEM\r
1121 p->isAllocError\r
1122 || p->threadingErrorSRes != SZ_OK\r
1123 || p->overflow)\r
1124 {\r
1125 // p->needContinue = True;\r
1126 }\r
1127 else\r
1128 p->needContinue = False;\r
1129 \r
1130 if (p->needContinue)\r
1131 return SZ_OK;\r
1132\r
1133 // if (sres != SZ_OK)\r
1134 return sres;\r
1135 // return SZ_ERROR_FAIL;\r
1136 }\r
1137}\r
1138\r
1139#endif\r