9e052883 |
1 | /* MtDec.c -- Multi-thread Decoder\r |
2 | 2021-12-21 : Igor Pavlov : Public domain */\r |
3 | \r |
4 | #include "Precomp.h"\r |
5 | \r |
6 | // #define SHOW_DEBUG_INFO\r |
7 | \r |
8 | // #include <stdio.h>\r |
9 | #include <string.h>\r |
10 | \r |
11 | #ifdef SHOW_DEBUG_INFO\r |
12 | #include <stdio.h>\r |
13 | #endif\r |
14 | \r |
15 | #include "MtDec.h"\r |
16 | \r |
17 | #ifndef _7ZIP_ST\r |
18 | \r |
/* Debug tracing helpers.
   PRF(x) expands to (x) only when SHOW_DEBUG_INFO is defined; otherwise it
   expands to nothing, so trace statements vanish from normal builds. */
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif

/* Traces the string literal (s) followed by the value (d) converted to unsigned.
   (d) is parenthesized so that an expression argument such as (a + b) is
   converted as a whole, and %u matches the (unsigned) argument type. */
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %u\n", (unsigned)(d)))
26 | \r |
27 | void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)\r |
28 | {\r |
29 | p->progress = progress;\r |
30 | p->res = SZ_OK;\r |
31 | p->totalInSize = 0;\r |
32 | p->totalOutSize = 0;\r |
33 | }\r |
34 | \r |
35 | \r |
36 | SRes MtProgress_Progress_ST(CMtProgress *p)\r |
37 | {\r |
38 | if (p->res == SZ_OK && p->progress)\r |
39 | if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)\r |
40 | p->res = SZ_ERROR_PROGRESS;\r |
41 | return p->res;\r |
42 | }\r |
43 | \r |
44 | \r |
45 | SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)\r |
46 | {\r |
47 | SRes res;\r |
48 | CriticalSection_Enter(&p->cs);\r |
49 | \r |
50 | p->totalInSize += inSize;\r |
51 | p->totalOutSize += outSize;\r |
52 | if (p->res == SZ_OK && p->progress)\r |
53 | if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)\r |
54 | p->res = SZ_ERROR_PROGRESS;\r |
55 | res = p->res;\r |
56 | \r |
57 | CriticalSection_Leave(&p->cs);\r |
58 | return res;\r |
59 | }\r |
60 | \r |
61 | \r |
62 | SRes MtProgress_GetError(CMtProgress *p)\r |
63 | {\r |
64 | SRes res;\r |
65 | CriticalSection_Enter(&p->cs);\r |
66 | res = p->res;\r |
67 | CriticalSection_Leave(&p->cs);\r |
68 | return res;\r |
69 | }\r |
70 | \r |
71 | \r |
72 | void MtProgress_SetError(CMtProgress *p, SRes res)\r |
73 | {\r |
74 | CriticalSection_Enter(&p->cs);\r |
75 | if (p->res == SZ_OK)\r |
76 | p->res = res;\r |
77 | CriticalSection_Leave(&p->cs);\r |
78 | }\r |
79 | \r |
80 | \r |
/* Error-check wrapper used around threading calls in this file; it simply
   forwards to RINOK_WRes(x).
   NOTE(review): RINOK_WRes is defined elsewhere - presumably it returns the
   WRes from the enclosing function when (x) is nonzero; confirm in its header. */
#define RINOK_THREAD(x) RINOK_WRes(x)
82 | \r |
83 | \r |
84 | static WRes ArEvent_OptCreate_And_Reset(CEvent *p)\r |
85 | {\r |
86 | if (Event_IsCreated(p))\r |
87 | return Event_Reset(p);\r |
88 | return AutoResetEvent_CreateNotSignaled(p);\r |
89 | }\r |
90 | \r |
91 | \r |
/* Header stored at the start of every input buffer allocation.
   Buffers form a singly linked list through (next); the payload bytes start
   MTDEC__LINK_DATA_OFFSET bytes after the header.
   (pad) widens the header to four pointers - presumably to keep the payload
   aligned; confirm before changing its size.
   The struct tag deliberately avoids a double underscore: such identifiers
   are reserved for the C implementation. */
struct CMtDecBufLink_
{
  struct CMtDecBufLink_ *next;
  void *pad[3];
};

typedef struct CMtDecBufLink_ CMtDecBufLink;

/* Size of the link header == offset of the payload inside an allocation. */
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
/* Converts a pointer to a link header into a pointer to its payload bytes. */
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
102 | \r |
103 | \r |
104 | \r |
/* Thread entry point. It is defined further down in this file and declared
   here because MtDecThread_CreateAndStart() passes it to Thread_Create(). */
static THREAD_FUNC_DECL ThreadFunc(void *pp);
106 | \r |
107 | \r |
108 | static WRes MtDecThread_CreateEvents(CMtDecThread *t)\r |
109 | {\r |
110 | WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);\r |
111 | if (wres == 0)\r |
112 | {\r |
113 | wres = ArEvent_OptCreate_And_Reset(&t->canRead);\r |
114 | if (wres == 0)\r |
115 | return SZ_OK;\r |
116 | }\r |
117 | return wres;\r |
118 | }\r |
119 | \r |
120 | \r |
121 | static SRes MtDecThread_CreateAndStart(CMtDecThread *t)\r |
122 | {\r |
123 | WRes wres = MtDecThread_CreateEvents(t);\r |
124 | // wres = 17; // for test\r |
125 | if (wres == 0)\r |
126 | {\r |
127 | if (Thread_WasCreated(&t->thread))\r |
128 | return SZ_OK;\r |
129 | wres = Thread_Create(&t->thread, ThreadFunc, t);\r |
130 | if (wres == 0)\r |
131 | return SZ_OK;\r |
132 | }\r |
133 | return MY_SRes_HRESULT_FROM_WRes(wres);\r |
134 | }\r |
135 | \r |
136 | \r |
137 | void MtDecThread_FreeInBufs(CMtDecThread *t)\r |
138 | {\r |
139 | if (t->inBuf)\r |
140 | {\r |
141 | void *link = t->inBuf;\r |
142 | t->inBuf = NULL;\r |
143 | do\r |
144 | {\r |
145 | void *next = ((CMtDecBufLink *)link)->next;\r |
146 | ISzAlloc_Free(t->mtDec->alloc, link);\r |
147 | link = next;\r |
148 | }\r |
149 | while (link);\r |
150 | }\r |
151 | }\r |
152 | \r |
153 | \r |
154 | static void MtDecThread_CloseThread(CMtDecThread *t)\r |
155 | {\r |
156 | if (Thread_WasCreated(&t->thread))\r |
157 | {\r |
158 | Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */\r |
159 | Event_Set(&t->canRead);\r |
160 | Thread_Wait_Close(&t->thread);\r |
161 | }\r |
162 | \r |
163 | Event_Close(&t->canRead);\r |
164 | Event_Close(&t->canWrite);\r |
165 | }\r |
166 | \r |
167 | static void MtDec_CloseThreads(CMtDec *p)\r |
168 | {\r |
169 | unsigned i;\r |
170 | for (i = 0; i < MTDEC__THREADS_MAX; i++)\r |
171 | MtDecThread_CloseThread(&p->threads[i]);\r |
172 | }\r |
173 | \r |
/* Fully releases thread slot (t): the thread and its events are closed first,
   and only then are the slot's input buffers freed. */
static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);
  MtDecThread_FreeInBufs(t);
}
179 | \r |
180 | \r |
181 | \r |
182 | static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r |
183 | {\r |
184 | size_t size = *processedSize;\r |
185 | *processedSize = 0;\r |
186 | while (size != 0)\r |
187 | {\r |
188 | size_t cur = size;\r |
189 | SRes res = ISeqInStream_Read(stream, data, &cur);\r |
190 | *processedSize += cur;\r |
191 | data += cur;\r |
192 | size -= cur;\r |
193 | RINOK(res);\r |
194 | if (cur == 0)\r |
195 | return SZ_OK;\r |
196 | }\r |
197 | return SZ_OK;\r |
198 | }\r |
199 | \r |
200 | \r |
201 | static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)\r |
202 | {\r |
203 | SRes res;\r |
204 | CriticalSection_Enter(&p->mtProgress.cs);\r |
205 | *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);\r |
206 | res = p->mtProgress.res;\r |
207 | CriticalSection_Leave(&p->mtProgress.cs);\r |
208 | return res;\r |
209 | }\r |
210 | \r |
211 | static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)\r |
212 | {\r |
213 | SRes res;\r |
214 | CriticalSection_Enter(&p->mtProgress.cs);\r |
215 | \r |
216 | p->mtProgress.totalInSize += inSize;\r |
217 | p->mtProgress.totalOutSize += outSize;\r |
218 | if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)\r |
219 | if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)\r |
220 | p->mtProgress.res = SZ_ERROR_PROGRESS;\r |
221 | \r |
222 | *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);\r |
223 | res = p->mtProgress.res;\r |
224 | \r |
225 | CriticalSection_Leave(&p->mtProgress.cs);\r |
226 | \r |
227 | return res;\r |
228 | }\r |
229 | \r |
230 | static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)\r |
231 | {\r |
232 | CriticalSection_Enter(&p->mtProgress.cs);\r |
233 | if (!p->needInterrupt || interruptIndex < p->interruptIndex)\r |
234 | {\r |
235 | p->interruptIndex = interruptIndex;\r |
236 | p->needInterrupt = True;\r |
237 | }\r |
238 | CriticalSection_Leave(&p->mtProgress.cs);\r |
239 | }\r |
240 | \r |
241 | Byte *MtDec_GetCrossBuff(CMtDec *p)\r |
242 | {\r |
243 | Byte *cr = p->crossBlock;\r |
244 | if (!cr)\r |
245 | {\r |
246 | cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);\r |
247 | if (!cr)\r |
248 | return NULL;\r |
249 | p->crossBlock = cr;\r |
250 | }\r |
251 | return MTDEC__DATA_PTR_FROM_LINK(cr);\r |
252 | }\r |
253 | \r |
254 | \r |
/*
  ThreadFunc2() returns:
    0      - in all normal cases (even for a stream error or a memory allocation error)
    (!= 0) - WRes error returned by a system threading function
*/
260 | \r |
// #define MTDEC_ProgessStep (1 << 22)
/* Threshold, in bytes, that the amount of newly read / coded data must reach
   before ThreadFunc2() makes another progress + error check.
   With (1 << 0) any non-zero amount reaches the threshold. */
#define MTDEC_ProgessStep (1 << 0)
263 | \r |
264 | static WRes ThreadFunc2(CMtDecThread *t)\r |
265 | {\r |
266 | CMtDec *p = t->mtDec;\r |
267 | \r |
268 | PRF_STR_INT("ThreadFunc2", t->index);\r |
269 | \r |
270 | // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);\r |
271 | \r |
272 | for (;;)\r |
273 | {\r |
274 | SRes res, codeRes;\r |
275 | BoolInt wasInterrupted, isAllocError, overflow, finish;\r |
276 | SRes threadingErrorSRes;\r |
277 | BoolInt needCode, needWrite, needContinue;\r |
278 | \r |
279 | size_t inDataSize_Start;\r |
280 | UInt64 inDataSize;\r |
281 | // UInt64 inDataSize_Full;\r |
282 | \r |
283 | UInt64 blockIndex;\r |
284 | \r |
285 | UInt64 inPrev = 0;\r |
286 | UInt64 outPrev = 0;\r |
287 | UInt64 inCodePos;\r |
288 | UInt64 outCodePos;\r |
289 | \r |
290 | Byte *afterEndData = NULL;\r |
291 | size_t afterEndData_Size = 0;\r |
292 | BoolInt afterEndData_IsCross = False;\r |
293 | \r |
294 | BoolInt canCreateNewThread = False;\r |
295 | // CMtDecCallbackInfo parse;\r |
296 | CMtDecThread *nextThread;\r |
297 | \r |
298 | PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);\r |
299 | \r |
300 | RINOK_THREAD(Event_Wait(&t->canRead));\r |
301 | if (p->exitThread)\r |
302 | return 0;\r |
303 | \r |
304 | PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);\r |
305 | \r |
306 | // if (t->index == 3) return 19; // for test\r |
307 | \r |
308 | blockIndex = p->blockIndex++;\r |
309 | \r |
310 | // PRF(printf("\ncanRead\n"))\r |
311 | \r |
312 | res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);\r |
313 | \r |
314 | finish = p->readWasFinished;\r |
315 | needCode = False;\r |
316 | needWrite = False;\r |
317 | isAllocError = False;\r |
318 | overflow = False;\r |
319 | \r |
320 | inDataSize_Start = 0;\r |
321 | inDataSize = 0;\r |
322 | // inDataSize_Full = 0;\r |
323 | \r |
324 | if (res == SZ_OK && !wasInterrupted)\r |
325 | {\r |
326 | // if (p->inStream)\r |
327 | {\r |
328 | CMtDecBufLink *prev = NULL;\r |
329 | CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;\r |
330 | size_t crossSize = p->crossEnd - p->crossStart;\r |
331 | \r |
332 | PRF(printf("\ncrossSize = %d\n", crossSize));\r |
333 | \r |
334 | for (;;)\r |
335 | {\r |
336 | if (!link)\r |
337 | {\r |
338 | link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);\r |
339 | if (!link)\r |
340 | {\r |
341 | finish = True;\r |
342 | // p->allocError_for_Read_BlockIndex = blockIndex;\r |
343 | isAllocError = True;\r |
344 | break;\r |
345 | }\r |
346 | link->next = NULL;\r |
347 | if (prev)\r |
348 | {\r |
349 | // static unsigned g_num = 0;\r |
350 | // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));\r |
351 | prev->next = link;\r |
352 | }\r |
353 | else\r |
354 | t->inBuf = (void *)link;\r |
355 | }\r |
356 | \r |
357 | {\r |
358 | Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);\r |
359 | Byte *parseData = data;\r |
360 | size_t size;\r |
361 | \r |
362 | if (crossSize != 0)\r |
363 | {\r |
364 | inDataSize = crossSize;\r |
365 | // inDataSize_Full = inDataSize;\r |
366 | inDataSize_Start = crossSize;\r |
367 | size = crossSize;\r |
368 | parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;\r |
369 | PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",\r |
370 | (int)p->crossStart, (int)p->crossEnd, (int)finish));\r |
371 | }\r |
372 | else\r |
373 | {\r |
374 | size = p->inBufSize;\r |
375 | \r |
376 | res = FullRead(p->inStream, data, &size);\r |
377 | \r |
378 | // size = 10; // test\r |
379 | \r |
380 | inDataSize += size;\r |
381 | // inDataSize_Full = inDataSize;\r |
382 | if (!prev)\r |
383 | inDataSize_Start = size;\r |
384 | \r |
385 | p->readProcessed += size;\r |
386 | finish = (size != p->inBufSize);\r |
387 | if (finish)\r |
388 | p->readWasFinished = True;\r |
389 | \r |
390 | // res = E_INVALIDARG; // test\r |
391 | \r |
392 | if (res != SZ_OK)\r |
393 | {\r |
394 | // PRF(printf("\nRead error = %d\n", res))\r |
395 | // we want to decode all data before error\r |
396 | p->readRes = res;\r |
397 | // p->readError_BlockIndex = blockIndex;\r |
398 | p->readWasFinished = True;\r |
399 | finish = True;\r |
400 | res = SZ_OK;\r |
401 | // break;\r |
402 | }\r |
403 | \r |
404 | if (inDataSize - inPrev >= MTDEC_ProgessStep)\r |
405 | {\r |
406 | res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);\r |
407 | if (res != SZ_OK || wasInterrupted)\r |
408 | break;\r |
409 | inPrev = inDataSize;\r |
410 | }\r |
411 | }\r |
412 | \r |
413 | {\r |
414 | CMtDecCallbackInfo parse;\r |
415 | \r |
416 | parse.startCall = (prev == NULL);\r |
417 | parse.src = parseData;\r |
418 | parse.srcSize = size;\r |
419 | parse.srcFinished = finish;\r |
420 | parse.canCreateNewThread = True;\r |
421 | \r |
422 | PRF(printf("\nParse size = %d\n", (unsigned)size));\r |
423 | \r |
424 | p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);\r |
425 | \r |
426 | PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));\r |
427 | \r |
428 | needWrite = True;\r |
429 | canCreateNewThread = parse.canCreateNewThread;\r |
430 | \r |
431 | // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);\r |
432 | \r |
433 | if (\r |
434 | // parseRes != SZ_OK ||\r |
435 | // inDataSize - (size - parse.srcSize) > p->inBlockMax\r |
436 | // ||\r |
437 | parse.state == MTDEC_PARSE_OVERFLOW\r |
438 | // || wasInterrupted\r |
439 | )\r |
440 | {\r |
441 | // Overflow or Parse error - switch from MT decoding to ST decoding\r |
442 | finish = True;\r |
443 | overflow = True;\r |
444 | \r |
445 | {\r |
446 | PRF(printf("\n Overflow"));\r |
447 | // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));\r |
448 | PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));\r |
449 | }\r |
450 | \r |
451 | if (crossSize != 0)\r |
452 | memcpy(data, parseData, size);\r |
453 | p->crossStart = 0;\r |
454 | p->crossEnd = 0;\r |
455 | break;\r |
456 | }\r |
457 | \r |
458 | if (crossSize != 0)\r |
459 | {\r |
460 | memcpy(data, parseData, parse.srcSize);\r |
461 | p->crossStart += parse.srcSize;\r |
462 | }\r |
463 | \r |
464 | if (parse.state != MTDEC_PARSE_CONTINUE || finish)\r |
465 | {\r |
466 | // we don't need to parse in current thread anymore\r |
467 | \r |
468 | if (parse.state == MTDEC_PARSE_END)\r |
469 | finish = True;\r |
470 | \r |
471 | needCode = True;\r |
472 | // p->crossFinished = finish;\r |
473 | \r |
474 | if (parse.srcSize == size)\r |
475 | {\r |
476 | // full parsed - no cross transfer\r |
477 | p->crossStart = 0;\r |
478 | p->crossEnd = 0;\r |
479 | break;\r |
480 | }\r |
481 | \r |
482 | if (parse.state == MTDEC_PARSE_END)\r |
483 | {\r |
484 | afterEndData = parseData + parse.srcSize;\r |
485 | afterEndData_Size = size - parse.srcSize;\r |
486 | if (crossSize != 0)\r |
487 | afterEndData_IsCross = True;\r |
488 | // we reduce data size to required bytes (parsed only)\r |
489 | inDataSize -= afterEndData_Size;\r |
490 | if (!prev)\r |
491 | inDataSize_Start = parse.srcSize;\r |
492 | break;\r |
493 | }\r |
494 | \r |
495 | {\r |
496 | // partial parsed - need cross transfer\r |
497 | if (crossSize != 0)\r |
498 | inDataSize = parse.srcSize; // it's only parsed now\r |
499 | else\r |
500 | {\r |
501 | // partial parsed - is not in initial cross block - we need to copy new data to cross block\r |
502 | Byte *cr = MtDec_GetCrossBuff(p);\r |
503 | if (!cr)\r |
504 | {\r |
505 | {\r |
506 | PRF(printf("\ncross alloc error error\n"));\r |
507 | // res = SZ_ERROR_MEM;\r |
508 | finish = True;\r |
509 | // p->allocError_for_Read_BlockIndex = blockIndex;\r |
510 | isAllocError = True;\r |
511 | break;\r |
512 | }\r |
513 | }\r |
514 | \r |
515 | {\r |
516 | size_t crSize = size - parse.srcSize;\r |
517 | inDataSize -= crSize;\r |
518 | p->crossEnd = crSize;\r |
519 | p->crossStart = 0;\r |
520 | memcpy(cr, parseData + parse.srcSize, crSize);\r |
521 | }\r |
522 | }\r |
523 | \r |
524 | // inDataSize_Full = inDataSize;\r |
525 | if (!prev)\r |
526 | inDataSize_Start = parse.srcSize; // it's partial size (parsed only)\r |
527 | \r |
528 | finish = False;\r |
529 | break;\r |
530 | }\r |
531 | }\r |
532 | \r |
533 | if (parse.srcSize != size)\r |
534 | {\r |
535 | res = SZ_ERROR_FAIL;\r |
536 | PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));\r |
537 | break;\r |
538 | }\r |
539 | }\r |
540 | }\r |
541 | \r |
542 | prev = link;\r |
543 | link = link->next;\r |
544 | \r |
545 | if (crossSize != 0)\r |
546 | {\r |
547 | crossSize = 0;\r |
548 | p->crossStart = 0;\r |
549 | p->crossEnd = 0;\r |
550 | }\r |
551 | }\r |
552 | }\r |
553 | \r |
554 | if (res == SZ_OK)\r |
555 | res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);\r |
556 | }\r |
557 | \r |
558 | codeRes = SZ_OK;\r |
559 | \r |
560 | if (res == SZ_OK && needCode && !wasInterrupted)\r |
561 | {\r |
562 | codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);\r |
563 | if (codeRes != SZ_OK)\r |
564 | {\r |
565 | needCode = False;\r |
566 | finish = True;\r |
567 | // SZ_ERROR_MEM is expected error here.\r |
568 | // if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.\r |
569 | // if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.\r |
570 | }\r |
571 | }\r |
572 | \r |
573 | if (res != SZ_OK || wasInterrupted)\r |
574 | finish = True;\r |
575 | \r |
576 | nextThread = NULL;\r |
577 | threadingErrorSRes = SZ_OK;\r |
578 | \r |
579 | if (!finish)\r |
580 | {\r |
581 | if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)\r |
582 | {\r |
583 | SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);\r |
584 | if (res2 == SZ_OK)\r |
585 | {\r |
586 | // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));\r |
587 | p->numStartedThreads++;\r |
588 | }\r |
589 | else\r |
590 | {\r |
591 | PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));\r |
592 | if (p->numStartedThreads == 1)\r |
593 | {\r |
594 | // if only one thread is possible, we leave muti-threading code\r |
595 | finish = True;\r |
596 | needCode = False;\r |
597 | threadingErrorSRes = res2;\r |
598 | }\r |
599 | else\r |
600 | p->numStartedThreads_Limit = p->numStartedThreads;\r |
601 | }\r |
602 | }\r |
603 | \r |
604 | if (!finish)\r |
605 | {\r |
606 | unsigned nextIndex = t->index + 1;\r |
607 | nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];\r |
608 | RINOK_THREAD(Event_Set(&nextThread->canRead))\r |
609 | // We have started executing for new iteration (with next thread)\r |
610 | // And that next thread now is responsible for possible exit from decoding (threading_code)\r |
611 | }\r |
612 | }\r |
613 | \r |
614 | // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)\r |
615 | // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case\r |
616 | // if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):\r |
617 | // - if (needContinue) after Write(&needContinue), we restore decoding with new iteration\r |
618 | // - otherwise we stop decoding and exit from ThreadFunc2()\r |
619 | \r |
620 | // Don't change (finish) variable in the further code\r |
621 | \r |
622 | \r |
623 | // ---------- CODE ----------\r |
624 | \r |
625 | inPrev = 0;\r |
626 | outPrev = 0;\r |
627 | inCodePos = 0;\r |
628 | outCodePos = 0;\r |
629 | \r |
630 | if (res == SZ_OK && needCode && codeRes == SZ_OK)\r |
631 | {\r |
632 | BoolInt isStartBlock = True;\r |
633 | CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;\r |
634 | \r |
635 | for (;;)\r |
636 | {\r |
637 | size_t inSize;\r |
638 | int stop;\r |
639 | \r |
640 | if (isStartBlock)\r |
641 | inSize = inDataSize_Start;\r |
642 | else\r |
643 | {\r |
644 | UInt64 rem = inDataSize - inCodePos;\r |
645 | inSize = p->inBufSize;\r |
646 | if (inSize > rem)\r |
647 | inSize = (size_t)rem;\r |
648 | }\r |
649 | \r |
650 | inCodePos += inSize;\r |
651 | stop = True;\r |
652 | \r |
653 | codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,\r |
654 | (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,\r |
655 | (inCodePos == inDataSize), // srcFinished\r |
656 | &inCodePos, &outCodePos, &stop);\r |
657 | \r |
658 | if (codeRes != SZ_OK)\r |
659 | {\r |
660 | PRF(printf("\nCode Interrupt error = %x\n", codeRes));\r |
661 | // we interrupt only later blocks\r |
662 | MtDec_Interrupt(p, blockIndex);\r |
663 | break;\r |
664 | }\r |
665 | \r |
666 | if (stop || inCodePos == inDataSize)\r |
667 | break;\r |
668 | \r |
669 | {\r |
670 | const UInt64 inDelta = inCodePos - inPrev;\r |
671 | const UInt64 outDelta = outCodePos - outPrev;\r |
672 | if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)\r |
673 | {\r |
674 | // Sleep(1);\r |
675 | res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);\r |
676 | if (res != SZ_OK || wasInterrupted)\r |
677 | break;\r |
678 | inPrev = inCodePos;\r |
679 | outPrev = outCodePos;\r |
680 | }\r |
681 | }\r |
682 | \r |
683 | link = link->next;\r |
684 | isStartBlock = False;\r |
685 | }\r |
686 | }\r |
687 | \r |
688 | \r |
689 | // ---------- WRITE ----------\r |
690 | \r |
691 | RINOK_THREAD(Event_Wait(&t->canWrite));\r |
692 | \r |
693 | {\r |
694 | BoolInt isErrorMode = False;\r |
695 | BoolInt canRecode = True;\r |
696 | BoolInt needWriteToStream = needWrite;\r |
697 | \r |
698 | if (p->exitThread) return 0; // it's never executed in normal cases\r |
699 | \r |
700 | if (p->wasInterrupted)\r |
701 | wasInterrupted = True;\r |
702 | else\r |
703 | {\r |
704 | if (codeRes != SZ_OK) // || !needCode // check it !!!\r |
705 | {\r |
706 | p->wasInterrupted = True;\r |
707 | p->codeRes = codeRes;\r |
708 | if (codeRes == SZ_ERROR_MEM)\r |
709 | isAllocError = True;\r |
710 | }\r |
711 | \r |
712 | if (threadingErrorSRes)\r |
713 | {\r |
714 | p->wasInterrupted = True;\r |
715 | p->threadingErrorSRes = threadingErrorSRes;\r |
716 | needWriteToStream = False;\r |
717 | }\r |
718 | if (isAllocError)\r |
719 | {\r |
720 | p->wasInterrupted = True;\r |
721 | p->isAllocError = True;\r |
722 | needWriteToStream = False;\r |
723 | }\r |
724 | if (overflow)\r |
725 | {\r |
726 | p->wasInterrupted = True;\r |
727 | p->overflow = True;\r |
728 | needWriteToStream = False;\r |
729 | }\r |
730 | }\r |
731 | \r |
732 | if (needCode)\r |
733 | {\r |
734 | if (wasInterrupted)\r |
735 | {\r |
736 | inCodePos = 0;\r |
737 | outCodePos = 0;\r |
738 | }\r |
739 | {\r |
740 | const UInt64 inDelta = inCodePos - inPrev;\r |
741 | const UInt64 outDelta = outCodePos - outPrev;\r |
742 | // if (inDelta != 0 || outDelta != 0)\r |
743 | res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);\r |
744 | }\r |
745 | }\r |
746 | \r |
747 | needContinue = (!finish);\r |
748 | \r |
749 | // if (res == SZ_OK && needWrite && !wasInterrupted)\r |
750 | if (needWrite)\r |
751 | {\r |
752 | // p->inProcessed += inCodePos;\r |
753 | \r |
754 | PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));\r |
755 | \r |
756 | res = p->mtCallback->Write(p->mtCallbackObject, t->index,\r |
757 | res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite\r |
758 | afterEndData, afterEndData_Size, afterEndData_IsCross,\r |
759 | &needContinue,\r |
760 | &canRecode);\r |
761 | \r |
762 | // res = SZ_ERROR_FAIL; // for test\r |
763 | \r |
764 | PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));\r |
765 | PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));\r |
766 | \r |
767 | if (res != SZ_OK)\r |
768 | {\r |
769 | PRF(printf("\nWrite error = %d\n", res));\r |
770 | isErrorMode = True;\r |
771 | p->wasInterrupted = True;\r |
772 | }\r |
773 | if (res != SZ_OK\r |
774 | || (!needContinue && !finish))\r |
775 | {\r |
776 | PRF(printf("\nWrite Interrupt error = %x\n", res));\r |
777 | MtDec_Interrupt(p, blockIndex);\r |
778 | }\r |
779 | }\r |
780 | \r |
781 | if (canRecode)\r |
782 | if (!needCode\r |
783 | || res != SZ_OK\r |
784 | || p->wasInterrupted\r |
785 | || codeRes != SZ_OK\r |
786 | || wasInterrupted\r |
787 | || p->numFilledThreads != 0\r |
788 | || isErrorMode)\r |
789 | {\r |
790 | if (p->numFilledThreads == 0)\r |
791 | p->filledThreadStart = t->index;\r |
792 | if (inDataSize != 0 || !finish)\r |
793 | {\r |
794 | t->inDataSize_Start = inDataSize_Start;\r |
795 | t->inDataSize = inDataSize;\r |
796 | p->numFilledThreads++;\r |
797 | }\r |
798 | PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));\r |
799 | PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));\r |
800 | }\r |
801 | \r |
802 | if (!finish)\r |
803 | {\r |
804 | RINOK_THREAD(Event_Set(&nextThread->canWrite));\r |
805 | }\r |
806 | else\r |
807 | {\r |
808 | if (needContinue)\r |
809 | {\r |
810 | // we restore decoding with new iteration\r |
811 | RINOK_THREAD(Event_Set(&p->threads[0].canWrite));\r |
812 | }\r |
813 | else\r |
814 | {\r |
815 | // we exit from decoding\r |
816 | if (t->index == 0)\r |
817 | return SZ_OK;\r |
818 | p->exitThread = True;\r |
819 | }\r |
820 | RINOK_THREAD(Event_Set(&p->threads[0].canRead));\r |
821 | }\r |
822 | }\r |
823 | }\r |
824 | }\r |
825 | \r |
826 | #ifdef _WIN32\r |
827 | #define USE_ALLOCA\r |
828 | #endif\r |
829 | \r |
830 | #ifdef USE_ALLOCA\r |
831 | #ifdef _WIN32\r |
832 | #include <malloc.h>\r |
833 | #else\r |
834 | #include <stdlib.h>\r |
835 | #endif\r |
836 | #endif\r |
837 | \r |
838 | \r |
839 | static THREAD_FUNC_DECL ThreadFunc1(void *pp)\r |
840 | {\r |
841 | WRes res;\r |
842 | \r |
843 | CMtDecThread *t = (CMtDecThread *)pp;\r |
844 | CMtDec *p;\r |
845 | \r |
846 | // fprintf(stdout, "\n%d = %p\n", t->index, &t);\r |
847 | \r |
848 | res = ThreadFunc2(t);\r |
849 | p = t->mtDec;\r |
850 | if (res == 0)\r |
851 | return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;\r |
852 | {\r |
853 | // it's unexpected situation for some threading function error\r |
854 | if (p->exitThreadWRes == 0)\r |
855 | p->exitThreadWRes = res;\r |
856 | PRF(printf("\nthread exit error = %d\n", res));\r |
857 | p->exitThread = True;\r |
858 | Event_Set(&p->threads[0].canRead);\r |
859 | Event_Set(&p->threads[0].canWrite);\r |
860 | MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));\r |
861 | }\r |
862 | return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;\r |
863 | }\r |
864 | \r |
865 | static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp)\r |
866 | {\r |
867 | #ifdef USE_ALLOCA\r |
868 | CMtDecThread *t = (CMtDecThread *)pp;\r |
869 | // fprintf(stderr, "\n%d = %p - before", t->index, &t);\r |
870 | t->allocaPtr = alloca(t->index * 128);\r |
871 | #endif\r |
872 | return ThreadFunc1(pp);\r |
873 | }\r |
874 | \r |
875 | \r |
876 | int MtDec_PrepareRead(CMtDec *p)\r |
877 | {\r |
878 | if (p->crossBlock && p->crossStart == p->crossEnd)\r |
879 | {\r |
880 | ISzAlloc_Free(p->alloc, p->crossBlock);\r |
881 | p->crossBlock = NULL;\r |
882 | }\r |
883 | \r |
884 | {\r |
885 | unsigned i;\r |
886 | for (i = 0; i < MTDEC__THREADS_MAX; i++)\r |
887 | if (i > p->numStartedThreads\r |
888 | || p->numFilledThreads <=\r |
889 | (i >= p->filledThreadStart ?\r |
890 | i - p->filledThreadStart :\r |
891 | i + p->numStartedThreads - p->filledThreadStart))\r |
892 | MtDecThread_FreeInBufs(&p->threads[i]);\r |
893 | }\r |
894 | \r |
895 | return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);\r |
896 | }\r |
897 | \r |
898 | \r |
899 | const Byte *MtDec_Read(CMtDec *p, size_t *inLim)\r |
900 | {\r |
901 | while (p->numFilledThreads != 0)\r |
902 | {\r |
903 | CMtDecThread *t = &p->threads[p->filledThreadStart];\r |
904 | \r |
905 | if (*inLim != 0)\r |
906 | {\r |
907 | {\r |
908 | void *link = t->inBuf;\r |
909 | void *next = ((CMtDecBufLink *)link)->next;\r |
910 | ISzAlloc_Free(p->alloc, link);\r |
911 | t->inBuf = next;\r |
912 | }\r |
913 | \r |
914 | if (t->inDataSize == 0)\r |
915 | {\r |
916 | MtDecThread_FreeInBufs(t);\r |
917 | if (--p->numFilledThreads == 0)\r |
918 | break;\r |
919 | if (++p->filledThreadStart == p->numStartedThreads)\r |
920 | p->filledThreadStart = 0;\r |
921 | t = &p->threads[p->filledThreadStart];\r |
922 | }\r |
923 | }\r |
924 | \r |
925 | {\r |
926 | size_t lim = t->inDataSize_Start;\r |
927 | if (lim != 0)\r |
928 | t->inDataSize_Start = 0;\r |
929 | else\r |
930 | {\r |
931 | UInt64 rem = t->inDataSize;\r |
932 | lim = p->inBufSize;\r |
933 | if (lim > rem)\r |
934 | lim = (size_t)rem;\r |
935 | }\r |
936 | t->inDataSize -= lim;\r |
937 | *inLim = lim;\r |
938 | return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);\r |
939 | }\r |
940 | }\r |
941 | \r |
942 | {\r |
943 | size_t crossSize = p->crossEnd - p->crossStart;\r |
944 | if (crossSize != 0)\r |
945 | {\r |
946 | const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;\r |
947 | *inLim = crossSize;\r |
948 | p->crossStart = 0;\r |
949 | p->crossEnd = 0;\r |
950 | return data;\r |
951 | }\r |
952 | *inLim = 0;\r |
953 | if (p->crossBlock)\r |
954 | {\r |
955 | ISzAlloc_Free(p->alloc, p->crossBlock);\r |
956 | p->crossBlock = NULL;\r |
957 | }\r |
958 | return NULL;\r |
959 | }\r |
960 | }\r |
961 | \r |
962 | \r |
963 | void MtDec_Construct(CMtDec *p)\r |
964 | {\r |
965 | unsigned i;\r |
966 | \r |
967 | p->inBufSize = (size_t)1 << 18;\r |
968 | \r |
969 | p->numThreadsMax = 0;\r |
970 | \r |
971 | p->inStream = NULL;\r |
972 | \r |
973 | // p->inData = NULL;\r |
974 | // p->inDataSize = 0;\r |
975 | \r |
976 | p->crossBlock = NULL;\r |
977 | p->crossStart = 0;\r |
978 | p->crossEnd = 0;\r |
979 | \r |
980 | p->numFilledThreads = 0;\r |
981 | \r |
982 | p->progress = NULL;\r |
983 | p->alloc = NULL;\r |
984 | \r |
985 | p->mtCallback = NULL;\r |
986 | p->mtCallbackObject = NULL;\r |
987 | \r |
988 | p->allocatedBufsSize = 0;\r |
989 | \r |
990 | for (i = 0; i < MTDEC__THREADS_MAX; i++)\r |
991 | {\r |
992 | CMtDecThread *t = &p->threads[i];\r |
993 | t->mtDec = p;\r |
994 | t->index = i;\r |
995 | t->inBuf = NULL;\r |
996 | Event_Construct(&t->canRead);\r |
997 | Event_Construct(&t->canWrite);\r |
998 | Thread_Construct(&t->thread);\r |
999 | }\r |
1000 | \r |
1001 | // Event_Construct(&p->finishedEvent);\r |
1002 | \r |
1003 | CriticalSection_Init(&p->mtProgress.cs);\r |
1004 | }\r |
1005 | \r |
1006 | \r |
1007 | static void MtDec_Free(CMtDec *p)\r |
1008 | {\r |
1009 | unsigned i;\r |
1010 | \r |
1011 | p->exitThread = True;\r |
1012 | \r |
1013 | for (i = 0; i < MTDEC__THREADS_MAX; i++)\r |
1014 | MtDecThread_Destruct(&p->threads[i]);\r |
1015 | \r |
1016 | // Event_Close(&p->finishedEvent);\r |
1017 | \r |
1018 | if (p->crossBlock)\r |
1019 | {\r |
1020 | ISzAlloc_Free(p->alloc, p->crossBlock);\r |
1021 | p->crossBlock = NULL;\r |
1022 | }\r |
1023 | }\r |
1024 | \r |
1025 | \r |
/* Destroys a decoder object: releases threads, events and buffers with
   MtDec_Free(), then deletes the progress critical section. */
void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);

  // deleted last: MtDec_Free() runs first and may still wait for threads
  CriticalSection_Delete(&p->mtProgress.cs);
}
1032 | \r |
1033 | \r |
1034 | SRes MtDec_Code(CMtDec *p)\r |
1035 | {\r |
1036 | unsigned i;\r |
1037 | \r |
1038 | p->inProcessed = 0;\r |
1039 | \r |
1040 | p->blockIndex = 1; // it must be larger than not_defined index (0)\r |
1041 | p->isAllocError = False;\r |
1042 | p->overflow = False;\r |
1043 | p->threadingErrorSRes = SZ_OK;\r |
1044 | \r |
1045 | p->needContinue = True;\r |
1046 | \r |
1047 | p->readWasFinished = False;\r |
1048 | p->needInterrupt = False;\r |
1049 | p->interruptIndex = (UInt64)(Int64)-1;\r |
1050 | \r |
1051 | p->readProcessed = 0;\r |
1052 | p->readRes = SZ_OK;\r |
1053 | p->codeRes = SZ_OK;\r |
1054 | p->wasInterrupted = False;\r |
1055 | \r |
1056 | p->crossStart = 0;\r |
1057 | p->crossEnd = 0;\r |
1058 | \r |
1059 | p->filledThreadStart = 0;\r |
1060 | p->numFilledThreads = 0;\r |
1061 | \r |
1062 | {\r |
1063 | unsigned numThreads = p->numThreadsMax;\r |
1064 | if (numThreads > MTDEC__THREADS_MAX)\r |
1065 | numThreads = MTDEC__THREADS_MAX;\r |
1066 | p->numStartedThreads_Limit = numThreads;\r |
1067 | p->numStartedThreads = 0;\r |
1068 | }\r |
1069 | \r |
1070 | if (p->inBufSize != p->allocatedBufsSize)\r |
1071 | {\r |
1072 | for (i = 0; i < MTDEC__THREADS_MAX; i++)\r |
1073 | {\r |
1074 | CMtDecThread *t = &p->threads[i];\r |
1075 | if (t->inBuf)\r |
1076 | MtDecThread_FreeInBufs(t);\r |
1077 | }\r |
1078 | if (p->crossBlock)\r |
1079 | {\r |
1080 | ISzAlloc_Free(p->alloc, p->crossBlock);\r |
1081 | p->crossBlock = NULL;\r |
1082 | }\r |
1083 | \r |
1084 | p->allocatedBufsSize = p->inBufSize;\r |
1085 | }\r |
1086 | \r |
1087 | MtProgress_Init(&p->mtProgress, p->progress);\r |
1088 | \r |
1089 | // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));\r |
1090 | p->exitThread = False;\r |
1091 | p->exitThreadWRes = 0;\r |
1092 | \r |
1093 | {\r |
1094 | WRes wres;\r |
1095 | SRes sres;\r |
1096 | CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];\r |
1097 | // wres = MtDecThread_CreateAndStart(nextThread);\r |
1098 | wres = MtDecThread_CreateEvents(nextThread);\r |
1099 | if (wres == 0) { wres = Event_Set(&nextThread->canWrite);\r |
1100 | if (wres == 0) { wres = Event_Set(&nextThread->canRead);\r |
1101 | if (wres == 0) { THREAD_FUNC_RET_TYPE res = ThreadFunc(nextThread);\r |
1102 | wres = (WRes)(UINT_PTR)res;\r |
1103 | if (wres != 0)\r |
1104 | {\r |
1105 | p->needContinue = False;\r |
1106 | MtDec_CloseThreads(p);\r |
1107 | }}}}\r |
1108 | \r |
1109 | // wres = 17; // for test\r |
1110 | // wres = Event_Wait(&p->finishedEvent);\r |
1111 | \r |
1112 | sres = MY_SRes_HRESULT_FROM_WRes(wres);\r |
1113 | \r |
1114 | if (sres != 0)\r |
1115 | p->threadingErrorSRes = sres;\r |
1116 | \r |
1117 | if (\r |
1118 | // wres == 0\r |
1119 | // wres != 0\r |
1120 | // || p->mtc.codeRes == SZ_ERROR_MEM\r |
1121 | p->isAllocError\r |
1122 | || p->threadingErrorSRes != SZ_OK\r |
1123 | || p->overflow)\r |
1124 | {\r |
1125 | // p->needContinue = True;\r |
1126 | }\r |
1127 | else\r |
1128 | p->needContinue = False;\r |
1129 | \r |
1130 | if (p->needContinue)\r |
1131 | return SZ_OK;\r |
1132 | \r |
1133 | // if (sres != SZ_OK)\r |
1134 | return sres;\r |
1135 | // return SZ_ERROR_FAIL;\r |
1136 | }\r |
1137 | }\r |
1138 | \r |
1139 | #endif\r |