9e052883 |
1 | /* MtCoder.c -- Multi-thread Coder\r |
2 | 2021-12-21 : Igor Pavlov : Public domain */\r |
3 | \r |
4 | #include "Precomp.h"\r |
5 | \r |
6 | #include "MtCoder.h"\r |
7 | \r |
8 | #ifndef _7ZIP_ST\r |
9 | \r |
10 | static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)\r |
11 | {\r |
12 | CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);\r |
13 | UInt64 inSize2 = 0;\r |
14 | UInt64 outSize2 = 0;\r |
15 | if (inSize != (UInt64)(Int64)-1)\r |
16 | {\r |
17 | inSize2 = inSize - thunk->inSize;\r |
18 | thunk->inSize = inSize;\r |
19 | }\r |
20 | if (outSize != (UInt64)(Int64)-1)\r |
21 | {\r |
22 | outSize2 = outSize - thunk->outSize;\r |
23 | thunk->outSize = outSize;\r |
24 | }\r |
25 | return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);\r |
26 | }\r |
27 | \r |
28 | \r |
29 | void MtProgressThunk_CreateVTable(CMtProgressThunk *p)\r |
30 | {\r |
31 | p->vt.Progress = MtProgressThunk_Progress;\r |
32 | }\r |
33 | \r |
34 | \r |
35 | \r |
36 | #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }\r |
37 | \r |
38 | \r |
39 | static WRes ArEvent_OptCreate_And_Reset(CEvent *p)\r |
40 | {\r |
41 | if (Event_IsCreated(p))\r |
42 | return Event_Reset(p);\r |
43 | return AutoResetEvent_CreateNotSignaled(p);\r |
44 | }\r |
45 | \r |
46 | \r |
47 | static THREAD_FUNC_DECL ThreadFunc(void *pp);\r |
48 | \r |
49 | \r |
50 | static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)\r |
51 | {\r |
52 | WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);\r |
53 | if (wres == 0)\r |
54 | {\r |
55 | t->stop = False;\r |
56 | if (!Thread_WasCreated(&t->thread))\r |
57 | wres = Thread_Create(&t->thread, ThreadFunc, t);\r |
58 | if (wres == 0)\r |
59 | wres = Event_Set(&t->startEvent);\r |
60 | }\r |
61 | if (wres == 0)\r |
62 | return SZ_OK;\r |
63 | return MY_SRes_HRESULT_FROM_WRes(wres);\r |
64 | }\r |
65 | \r |
66 | \r |
67 | static void MtCoderThread_Destruct(CMtCoderThread *t)\r |
68 | {\r |
69 | if (Thread_WasCreated(&t->thread))\r |
70 | {\r |
71 | t->stop = 1;\r |
72 | Event_Set(&t->startEvent);\r |
73 | Thread_Wait_Close(&t->thread);\r |
74 | }\r |
75 | \r |
76 | Event_Close(&t->startEvent);\r |
77 | \r |
78 | if (t->inBuf)\r |
79 | {\r |
80 | ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);\r |
81 | t->inBuf = NULL;\r |
82 | }\r |
83 | }\r |
84 | \r |
85 | \r |
86 | \r |
87 | static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r |
88 | {\r |
89 | size_t size = *processedSize;\r |
90 | *processedSize = 0;\r |
91 | while (size != 0)\r |
92 | {\r |
93 | size_t cur = size;\r |
94 | SRes res = ISeqInStream_Read(stream, data, &cur);\r |
95 | *processedSize += cur;\r |
96 | data += cur;\r |
97 | size -= cur;\r |
98 | RINOK(res);\r |
99 | if (cur == 0)\r |
100 | return SZ_OK;\r |
101 | }\r |
102 | return SZ_OK;\r |
103 | }\r |
104 | \r |
105 | \r |
106 | /*\r |
107 | ThreadFunc2() returns:\r |
108 | SZ_OK - in all normal cases (even for stream error or memory allocation error)\r |
109 | SZ_ERROR_THREAD - in case of failure in system synch function\r |
110 | */\r |
111 | \r |
112 | static SRes ThreadFunc2(CMtCoderThread *t)\r |
113 | {\r |
114 | CMtCoder *mtc = t->mtCoder;\r |
115 | \r |
116 | for (;;)\r |
117 | {\r |
118 | unsigned bi;\r |
119 | SRes res;\r |
120 | SRes res2;\r |
121 | BoolInt finished;\r |
122 | unsigned bufIndex;\r |
123 | size_t size;\r |
124 | const Byte *inData;\r |
125 | UInt64 readProcessed = 0;\r |
126 | \r |
127 | RINOK_THREAD(Event_Wait(&mtc->readEvent))\r |
128 | \r |
129 | /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */\r |
130 | \r |
131 | if (mtc->stopReading)\r |
132 | {\r |
133 | return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r |
134 | }\r |
135 | \r |
136 | res = MtProgress_GetError(&mtc->mtProgress);\r |
137 | \r |
138 | size = 0;\r |
139 | inData = NULL;\r |
140 | finished = True;\r |
141 | \r |
142 | if (res == SZ_OK)\r |
143 | {\r |
144 | size = mtc->blockSize;\r |
145 | if (mtc->inStream)\r |
146 | {\r |
147 | if (!t->inBuf)\r |
148 | {\r |
149 | t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);\r |
150 | if (!t->inBuf)\r |
151 | res = SZ_ERROR_MEM;\r |
152 | }\r |
153 | if (res == SZ_OK)\r |
154 | {\r |
155 | res = FullRead(mtc->inStream, t->inBuf, &size);\r |
156 | readProcessed = mtc->readProcessed + size;\r |
157 | mtc->readProcessed = readProcessed;\r |
158 | }\r |
159 | if (res != SZ_OK)\r |
160 | {\r |
161 | mtc->readRes = res;\r |
162 | /* after reading error - we can stop encoding of previous blocks */\r |
163 | MtProgress_SetError(&mtc->mtProgress, res);\r |
164 | }\r |
165 | else\r |
166 | finished = (size != mtc->blockSize);\r |
167 | }\r |
168 | else\r |
169 | {\r |
170 | size_t rem;\r |
171 | readProcessed = mtc->readProcessed;\r |
172 | rem = mtc->inDataSize - (size_t)readProcessed;\r |
173 | if (size > rem)\r |
174 | size = rem;\r |
175 | inData = mtc->inData + (size_t)readProcessed;\r |
176 | readProcessed += size;\r |
177 | mtc->readProcessed = readProcessed;\r |
178 | finished = (mtc->inDataSize == (size_t)readProcessed);\r |
179 | }\r |
180 | }\r |
181 | \r |
182 | /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */\r |
183 | \r |
184 | res2 = SZ_OK;\r |
185 | \r |
186 | if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)\r |
187 | {\r |
188 | res2 = SZ_ERROR_THREAD;\r |
189 | if (res == SZ_OK)\r |
190 | {\r |
191 | res = res2;\r |
192 | // MtProgress_SetError(&mtc->mtProgress, res);\r |
193 | }\r |
194 | }\r |
195 | \r |
196 | bi = mtc->blockIndex;\r |
197 | \r |
198 | if (++mtc->blockIndex >= mtc->numBlocksMax)\r |
199 | mtc->blockIndex = 0;\r |
200 | \r |
201 | bufIndex = (unsigned)(int)-1;\r |
202 | \r |
203 | if (res == SZ_OK)\r |
204 | res = MtProgress_GetError(&mtc->mtProgress);\r |
205 | \r |
206 | if (res != SZ_OK)\r |
207 | finished = True;\r |
208 | \r |
209 | if (!finished)\r |
210 | {\r |
211 | if (mtc->numStartedThreads < mtc->numStartedThreadsLimit\r |
212 | && mtc->expectedDataSize != readProcessed)\r |
213 | {\r |
214 | res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);\r |
215 | if (res == SZ_OK)\r |
216 | mtc->numStartedThreads++;\r |
217 | else\r |
218 | {\r |
219 | MtProgress_SetError(&mtc->mtProgress, res);\r |
220 | finished = True;\r |
221 | }\r |
222 | }\r |
223 | }\r |
224 | \r |
225 | if (finished)\r |
226 | mtc->stopReading = True;\r |
227 | \r |
228 | RINOK_THREAD(Event_Set(&mtc->readEvent))\r |
229 | \r |
230 | if (res2 != SZ_OK)\r |
231 | return res2;\r |
232 | \r |
233 | if (res == SZ_OK)\r |
234 | {\r |
235 | CriticalSection_Enter(&mtc->cs);\r |
236 | bufIndex = mtc->freeBlockHead;\r |
237 | mtc->freeBlockHead = mtc->freeBlockList[bufIndex];\r |
238 | CriticalSection_Leave(&mtc->cs);\r |
239 | \r |
240 | res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,\r |
241 | mtc->inStream ? t->inBuf : inData, size, finished);\r |
242 | \r |
243 | // MtProgress_Reinit(&mtc->mtProgress, t->index);\r |
244 | \r |
245 | if (res != SZ_OK)\r |
246 | MtProgress_SetError(&mtc->mtProgress, res);\r |
247 | }\r |
248 | \r |
249 | {\r |
250 | CMtCoderBlock *block = &mtc->blocks[bi];\r |
251 | block->res = res;\r |
252 | block->bufIndex = bufIndex;\r |
253 | block->finished = finished;\r |
254 | }\r |
255 | \r |
256 | #ifdef MTCODER__USE_WRITE_THREAD\r |
257 | RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))\r |
258 | #else\r |
259 | {\r |
260 | unsigned wi;\r |
261 | {\r |
262 | CriticalSection_Enter(&mtc->cs);\r |
263 | wi = mtc->writeIndex;\r |
264 | if (wi == bi)\r |
265 | mtc->writeIndex = (unsigned)(int)-1;\r |
266 | else\r |
267 | mtc->ReadyBlocks[bi] = True;\r |
268 | CriticalSection_Leave(&mtc->cs);\r |
269 | }\r |
270 | \r |
271 | if (wi != bi)\r |
272 | {\r |
273 | if (res != SZ_OK || finished)\r |
274 | return 0;\r |
275 | continue;\r |
276 | }\r |
277 | \r |
278 | if (mtc->writeRes != SZ_OK)\r |
279 | res = mtc->writeRes;\r |
280 | \r |
281 | for (;;)\r |
282 | {\r |
283 | if (res == SZ_OK && bufIndex != (unsigned)(int)-1)\r |
284 | {\r |
285 | res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);\r |
286 | if (res != SZ_OK)\r |
287 | {\r |
288 | mtc->writeRes = res;\r |
289 | MtProgress_SetError(&mtc->mtProgress, res);\r |
290 | }\r |
291 | }\r |
292 | \r |
293 | if (++wi >= mtc->numBlocksMax)\r |
294 | wi = 0;\r |
295 | {\r |
296 | BoolInt isReady;\r |
297 | \r |
298 | CriticalSection_Enter(&mtc->cs);\r |
299 | \r |
300 | if (bufIndex != (unsigned)(int)-1)\r |
301 | {\r |
302 | mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;\r |
303 | mtc->freeBlockHead = bufIndex;\r |
304 | }\r |
305 | \r |
306 | isReady = mtc->ReadyBlocks[wi];\r |
307 | \r |
308 | if (isReady)\r |
309 | mtc->ReadyBlocks[wi] = False;\r |
310 | else\r |
311 | mtc->writeIndex = wi;\r |
312 | \r |
313 | CriticalSection_Leave(&mtc->cs);\r |
314 | \r |
315 | RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))\r |
316 | \r |
317 | if (!isReady)\r |
318 | break;\r |
319 | }\r |
320 | \r |
321 | {\r |
322 | CMtCoderBlock *block = &mtc->blocks[wi];\r |
323 | if (res == SZ_OK && block->res != SZ_OK)\r |
324 | res = block->res;\r |
325 | bufIndex = block->bufIndex;\r |
326 | finished = block->finished;\r |
327 | }\r |
328 | }\r |
329 | }\r |
330 | #endif\r |
331 | \r |
332 | if (finished || res != SZ_OK)\r |
333 | return 0;\r |
334 | }\r |
335 | }\r |
336 | \r |
337 | \r |
338 | static THREAD_FUNC_DECL ThreadFunc(void *pp)\r |
339 | {\r |
340 | CMtCoderThread *t = (CMtCoderThread *)pp;\r |
341 | for (;;)\r |
342 | {\r |
343 | if (Event_Wait(&t->startEvent) != 0)\r |
344 | return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;\r |
345 | if (t->stop)\r |
346 | return 0;\r |
347 | {\r |
348 | SRes res = ThreadFunc2(t);\r |
349 | CMtCoder *mtc = t->mtCoder;\r |
350 | if (res != SZ_OK)\r |
351 | {\r |
352 | MtProgress_SetError(&mtc->mtProgress, res);\r |
353 | }\r |
354 | \r |
355 | #ifndef MTCODER__USE_WRITE_THREAD\r |
356 | {\r |
357 | unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);\r |
358 | if (numFinished == mtc->numStartedThreads)\r |
359 | if (Event_Set(&mtc->finishedEvent) != 0)\r |
360 | return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;\r |
361 | }\r |
362 | #endif\r |
363 | }\r |
364 | }\r |
365 | }\r |
366 | \r |
367 | \r |
368 | \r |
369 | void MtCoder_Construct(CMtCoder *p)\r |
370 | {\r |
371 | unsigned i;\r |
372 | \r |
373 | p->blockSize = 0;\r |
374 | p->numThreadsMax = 0;\r |
375 | p->expectedDataSize = (UInt64)(Int64)-1;\r |
376 | \r |
377 | p->inStream = NULL;\r |
378 | p->inData = NULL;\r |
379 | p->inDataSize = 0;\r |
380 | \r |
381 | p->progress = NULL;\r |
382 | p->allocBig = NULL;\r |
383 | \r |
384 | p->mtCallback = NULL;\r |
385 | p->mtCallbackObject = NULL;\r |
386 | \r |
387 | p->allocatedBufsSize = 0;\r |
388 | \r |
389 | Event_Construct(&p->readEvent);\r |
390 | Semaphore_Construct(&p->blocksSemaphore);\r |
391 | \r |
392 | for (i = 0; i < MTCODER__THREADS_MAX; i++)\r |
393 | {\r |
394 | CMtCoderThread *t = &p->threads[i];\r |
395 | t->mtCoder = p;\r |
396 | t->index = i;\r |
397 | t->inBuf = NULL;\r |
398 | t->stop = False;\r |
399 | Event_Construct(&t->startEvent);\r |
400 | Thread_Construct(&t->thread);\r |
401 | }\r |
402 | \r |
403 | #ifdef MTCODER__USE_WRITE_THREAD\r |
404 | for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r |
405 | Event_Construct(&p->writeEvents[i]);\r |
406 | #else\r |
407 | Event_Construct(&p->finishedEvent);\r |
408 | #endif\r |
409 | \r |
410 | CriticalSection_Init(&p->cs);\r |
411 | CriticalSection_Init(&p->mtProgress.cs);\r |
412 | }\r |
413 | \r |
414 | \r |
415 | \r |
416 | \r |
417 | static void MtCoder_Free(CMtCoder *p)\r |
418 | {\r |
419 | unsigned i;\r |
420 | \r |
421 | /*\r |
422 | p->stopReading = True;\r |
423 | if (Event_IsCreated(&p->readEvent))\r |
424 | Event_Set(&p->readEvent);\r |
425 | */\r |
426 | \r |
427 | for (i = 0; i < MTCODER__THREADS_MAX; i++)\r |
428 | MtCoderThread_Destruct(&p->threads[i]);\r |
429 | \r |
430 | Event_Close(&p->readEvent);\r |
431 | Semaphore_Close(&p->blocksSemaphore);\r |
432 | \r |
433 | #ifdef MTCODER__USE_WRITE_THREAD\r |
434 | for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r |
435 | Event_Close(&p->writeEvents[i]);\r |
436 | #else\r |
437 | Event_Close(&p->finishedEvent);\r |
438 | #endif\r |
439 | }\r |
440 | \r |
441 | \r |
442 | void MtCoder_Destruct(CMtCoder *p)\r |
443 | {\r |
444 | MtCoder_Free(p);\r |
445 | \r |
446 | CriticalSection_Delete(&p->cs);\r |
447 | CriticalSection_Delete(&p->mtProgress.cs);\r |
448 | }\r |
449 | \r |
450 | \r |
451 | SRes MtCoder_Code(CMtCoder *p)\r |
452 | {\r |
453 | unsigned numThreads = p->numThreadsMax;\r |
454 | unsigned numBlocksMax;\r |
455 | unsigned i;\r |
456 | SRes res = SZ_OK;\r |
457 | \r |
458 | if (numThreads > MTCODER__THREADS_MAX)\r |
459 | numThreads = MTCODER__THREADS_MAX;\r |
460 | numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);\r |
461 | \r |
462 | if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;\r |
463 | if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;\r |
464 | if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;\r |
465 | \r |
466 | if (numBlocksMax > MTCODER__BLOCKS_MAX)\r |
467 | numBlocksMax = MTCODER__BLOCKS_MAX;\r |
468 | \r |
469 | if (p->blockSize != p->allocatedBufsSize)\r |
470 | {\r |
471 | for (i = 0; i < MTCODER__THREADS_MAX; i++)\r |
472 | {\r |
473 | CMtCoderThread *t = &p->threads[i];\r |
474 | if (t->inBuf)\r |
475 | {\r |
476 | ISzAlloc_Free(p->allocBig, t->inBuf);\r |
477 | t->inBuf = NULL;\r |
478 | }\r |
479 | }\r |
480 | p->allocatedBufsSize = p->blockSize;\r |
481 | }\r |
482 | \r |
483 | p->readRes = SZ_OK;\r |
484 | \r |
485 | MtProgress_Init(&p->mtProgress, p->progress);\r |
486 | \r |
487 | #ifdef MTCODER__USE_WRITE_THREAD\r |
488 | for (i = 0; i < numBlocksMax; i++)\r |
489 | {\r |
490 | RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));\r |
491 | }\r |
492 | #else\r |
493 | RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));\r |
494 | #endif\r |
495 | \r |
496 | {\r |
497 | RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));\r |
498 | RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax));\r |
499 | }\r |
500 | \r |
501 | for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)\r |
502 | p->freeBlockList[i] = i + 1;\r |
503 | p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;\r |
504 | p->freeBlockHead = 0;\r |
505 | \r |
506 | p->readProcessed = 0;\r |
507 | p->blockIndex = 0;\r |
508 | p->numBlocksMax = numBlocksMax;\r |
509 | p->stopReading = False;\r |
510 | \r |
511 | #ifndef MTCODER__USE_WRITE_THREAD\r |
512 | p->writeIndex = 0;\r |
513 | p->writeRes = SZ_OK;\r |
514 | for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r |
515 | p->ReadyBlocks[i] = False;\r |
516 | p->numFinishedThreads = 0;\r |
517 | #endif\r |
518 | \r |
519 | p->numStartedThreadsLimit = numThreads;\r |
520 | p->numStartedThreads = 0;\r |
521 | \r |
522 | // for (i = 0; i < numThreads; i++)\r |
523 | {\r |
524 | CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];\r |
525 | RINOK(MtCoderThread_CreateAndStart(nextThread));\r |
526 | }\r |
527 | \r |
528 | RINOK_THREAD(Event_Set(&p->readEvent))\r |
529 | \r |
530 | #ifdef MTCODER__USE_WRITE_THREAD\r |
531 | {\r |
532 | unsigned bi = 0;\r |
533 | \r |
534 | for (;; bi++)\r |
535 | {\r |
536 | if (bi >= numBlocksMax)\r |
537 | bi = 0;\r |
538 | \r |
539 | RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))\r |
540 | \r |
541 | {\r |
542 | const CMtCoderBlock *block = &p->blocks[bi];\r |
543 | unsigned bufIndex = block->bufIndex;\r |
544 | BoolInt finished = block->finished;\r |
545 | if (res == SZ_OK && block->res != SZ_OK)\r |
546 | res = block->res;\r |
547 | \r |
548 | if (bufIndex != (unsigned)(int)-1)\r |
549 | {\r |
550 | if (res == SZ_OK)\r |
551 | {\r |
552 | res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);\r |
553 | if (res != SZ_OK)\r |
554 | MtProgress_SetError(&p->mtProgress, res);\r |
555 | }\r |
556 | \r |
557 | CriticalSection_Enter(&p->cs);\r |
558 | {\r |
559 | p->freeBlockList[bufIndex] = p->freeBlockHead;\r |
560 | p->freeBlockHead = bufIndex;\r |
561 | }\r |
562 | CriticalSection_Leave(&p->cs);\r |
563 | }\r |
564 | \r |
565 | RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))\r |
566 | \r |
567 | if (finished)\r |
568 | break;\r |
569 | }\r |
570 | }\r |
571 | }\r |
572 | #else\r |
573 | {\r |
574 | WRes wres = Event_Wait(&p->finishedEvent);\r |
575 | res = MY_SRes_HRESULT_FROM_WRes(wres);\r |
576 | }\r |
577 | #endif\r |
578 | \r |
579 | if (res == SZ_OK)\r |
580 | res = p->readRes;\r |
581 | \r |
582 | if (res == SZ_OK)\r |
583 | res = p->mtProgress.res;\r |
584 | \r |
585 | #ifndef MTCODER__USE_WRITE_THREAD\r |
586 | if (res == SZ_OK)\r |
587 | res = p->writeRes;\r |
588 | #endif\r |
589 | \r |
590 | if (res != SZ_OK)\r |
591 | MtCoder_Free(p);\r |
592 | return res;\r |
593 | }\r |
594 | \r |
595 | #endif\r |