git subrepo pull (merge) --force deps/libchdr
[pcsx_rearmed.git] / deps / libchdr / deps / lzma-22.01 / src / MtCoder.c
1 /* MtCoder.c -- Multi-thread Coder\r
2 2021-12-21 : Igor Pavlov : Public domain */\r
3 \r
4 #include "Precomp.h"\r
5 \r
6 #include "MtCoder.h"\r
7 \r
8 #ifndef _7ZIP_ST\r
9 \r
10 static SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)\r
11 {\r
12   CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);\r
13   UInt64 inSize2 = 0;\r
14   UInt64 outSize2 = 0;\r
15   if (inSize != (UInt64)(Int64)-1)\r
16   {\r
17     inSize2 = inSize - thunk->inSize;\r
18     thunk->inSize = inSize;\r
19   }\r
20   if (outSize != (UInt64)(Int64)-1)\r
21   {\r
22     outSize2 = outSize - thunk->outSize;\r
23     thunk->outSize = outSize;\r
24   }\r
25   return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);\r
26 }\r
27 \r
28 \r
29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p)\r
30 {\r
31   p->vt.Progress = MtProgressThunk_Progress;\r
32 }\r
33 \r
34 \r
35 \r
36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }\r
37 \r
38 \r
39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)\r
40 {\r
41   if (Event_IsCreated(p))\r
42     return Event_Reset(p);\r
43   return AutoResetEvent_CreateNotSignaled(p);\r
44 }\r
45 \r
46 \r
47 static THREAD_FUNC_DECL ThreadFunc(void *pp);\r
48 \r
49 \r
50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)\r
51 {\r
52   WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);\r
53   if (wres == 0)\r
54   {\r
55     t->stop = False;\r
56     if (!Thread_WasCreated(&t->thread))\r
57       wres = Thread_Create(&t->thread, ThreadFunc, t);\r
58     if (wres == 0)\r
59       wres = Event_Set(&t->startEvent);\r
60   }\r
61   if (wres == 0)\r
62     return SZ_OK;\r
63   return MY_SRes_HRESULT_FROM_WRes(wres);\r
64 }\r
65 \r
66 \r
67 static void MtCoderThread_Destruct(CMtCoderThread *t)\r
68 {\r
69   if (Thread_WasCreated(&t->thread))\r
70   {\r
71     t->stop = 1;\r
72     Event_Set(&t->startEvent);\r
73     Thread_Wait_Close(&t->thread);\r
74   }\r
75 \r
76   Event_Close(&t->startEvent);\r
77 \r
78   if (t->inBuf)\r
79   {\r
80     ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);\r
81     t->inBuf = NULL;\r
82   }\r
83 }\r
84 \r
85 \r
86 \r
87 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r
88 {\r
89   size_t size = *processedSize;\r
90   *processedSize = 0;\r
91   while (size != 0)\r
92   {\r
93     size_t cur = size;\r
94     SRes res = ISeqInStream_Read(stream, data, &cur);\r
95     *processedSize += cur;\r
96     data += cur;\r
97     size -= cur;\r
98     RINOK(res);\r
99     if (cur == 0)\r
100       return SZ_OK;\r
101   }\r
102   return SZ_OK;\r
103 }\r
104 \r
105 \r
106 /*\r
107   ThreadFunc2() returns:\r
108   SZ_OK           - in all normal cases (even for stream error or memory allocation error)\r
109   SZ_ERROR_THREAD - in case of failure in system synch function\r
110 */\r
111 \r
112 static SRes ThreadFunc2(CMtCoderThread *t)\r
113 {\r
114   CMtCoder *mtc = t->mtCoder;\r
115 \r
116   for (;;)\r
117   {\r
118     unsigned bi;\r
119     SRes res;\r
120     SRes res2;\r
121     BoolInt finished;\r
122     unsigned bufIndex;\r
123     size_t size;\r
124     const Byte *inData;\r
125     UInt64 readProcessed = 0;\r
126     \r
127     RINOK_THREAD(Event_Wait(&mtc->readEvent))\r
128 \r
129     /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */\r
130 \r
131     if (mtc->stopReading)\r
132     {\r
133       return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r
134     }\r
135 \r
136     res = MtProgress_GetError(&mtc->mtProgress);\r
137     \r
138     size = 0;\r
139     inData = NULL;\r
140     finished = True;\r
141 \r
142     if (res == SZ_OK)\r
143     {\r
144       size = mtc->blockSize;\r
145       if (mtc->inStream)\r
146       {\r
147         if (!t->inBuf)\r
148         {\r
149           t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);\r
150           if (!t->inBuf)\r
151             res = SZ_ERROR_MEM;\r
152         }\r
153         if (res == SZ_OK)\r
154         {\r
155           res = FullRead(mtc->inStream, t->inBuf, &size);\r
156           readProcessed = mtc->readProcessed + size;\r
157           mtc->readProcessed = readProcessed;\r
158         }\r
159         if (res != SZ_OK)\r
160         {\r
161           mtc->readRes = res;\r
162           /* after reading error - we can stop encoding of previous blocks */\r
163           MtProgress_SetError(&mtc->mtProgress, res);\r
164         }\r
165         else\r
166           finished = (size != mtc->blockSize);\r
167       }\r
168       else\r
169       {\r
170         size_t rem;\r
171         readProcessed = mtc->readProcessed;\r
172         rem = mtc->inDataSize - (size_t)readProcessed;\r
173         if (size > rem)\r
174           size = rem;\r
175         inData = mtc->inData + (size_t)readProcessed;\r
176         readProcessed += size;\r
177         mtc->readProcessed = readProcessed;\r
178         finished = (mtc->inDataSize == (size_t)readProcessed);\r
179       }\r
180     }\r
181 \r
182     /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */\r
183 \r
184     res2 = SZ_OK;\r
185 \r
186     if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)\r
187     {\r
188       res2 = SZ_ERROR_THREAD;\r
189       if (res == SZ_OK)\r
190       {\r
191         res = res2;\r
192         // MtProgress_SetError(&mtc->mtProgress, res);\r
193       }\r
194     }\r
195 \r
196     bi = mtc->blockIndex;\r
197 \r
198     if (++mtc->blockIndex >= mtc->numBlocksMax)\r
199       mtc->blockIndex = 0;\r
200 \r
201     bufIndex = (unsigned)(int)-1;\r
202 \r
203     if (res == SZ_OK)\r
204       res = MtProgress_GetError(&mtc->mtProgress);\r
205 \r
206     if (res != SZ_OK)\r
207       finished = True;\r
208 \r
209     if (!finished)\r
210     {\r
211       if (mtc->numStartedThreads < mtc->numStartedThreadsLimit\r
212           && mtc->expectedDataSize != readProcessed)\r
213       {\r
214         res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);\r
215         if (res == SZ_OK)\r
216           mtc->numStartedThreads++;\r
217         else\r
218         {\r
219           MtProgress_SetError(&mtc->mtProgress, res);\r
220           finished = True;\r
221         }\r
222       }\r
223     }\r
224 \r
225     if (finished)\r
226       mtc->stopReading = True;\r
227 \r
228     RINOK_THREAD(Event_Set(&mtc->readEvent))\r
229 \r
230     if (res2 != SZ_OK)\r
231       return res2;\r
232 \r
233     if (res == SZ_OK)\r
234     {\r
235       CriticalSection_Enter(&mtc->cs);\r
236       bufIndex = mtc->freeBlockHead;\r
237       mtc->freeBlockHead = mtc->freeBlockList[bufIndex];\r
238       CriticalSection_Leave(&mtc->cs);\r
239       \r
240       res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,\r
241           mtc->inStream ? t->inBuf : inData, size, finished);\r
242       \r
243       // MtProgress_Reinit(&mtc->mtProgress, t->index);\r
244 \r
245       if (res != SZ_OK)\r
246         MtProgress_SetError(&mtc->mtProgress, res);\r
247     }\r
248 \r
249     {\r
250       CMtCoderBlock *block = &mtc->blocks[bi];\r
251       block->res = res;\r
252       block->bufIndex = bufIndex;\r
253       block->finished = finished;\r
254     }\r
255     \r
256     #ifdef MTCODER__USE_WRITE_THREAD\r
257       RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))\r
258     #else\r
259     {\r
260       unsigned wi;\r
261       {\r
262         CriticalSection_Enter(&mtc->cs);\r
263         wi = mtc->writeIndex;\r
264         if (wi == bi)\r
265           mtc->writeIndex = (unsigned)(int)-1;\r
266         else\r
267           mtc->ReadyBlocks[bi] = True;\r
268         CriticalSection_Leave(&mtc->cs);\r
269       }\r
270 \r
271       if (wi != bi)\r
272       {\r
273         if (res != SZ_OK || finished)\r
274           return 0;\r
275         continue;\r
276       }\r
277 \r
278       if (mtc->writeRes != SZ_OK)\r
279         res = mtc->writeRes;\r
280 \r
281       for (;;)\r
282       {\r
283         if (res == SZ_OK && bufIndex != (unsigned)(int)-1)\r
284         {\r
285           res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);\r
286           if (res != SZ_OK)\r
287           {\r
288             mtc->writeRes = res;\r
289             MtProgress_SetError(&mtc->mtProgress, res);\r
290           }\r
291         }\r
292 \r
293         if (++wi >= mtc->numBlocksMax)\r
294           wi = 0;\r
295         {\r
296           BoolInt isReady;\r
297 \r
298           CriticalSection_Enter(&mtc->cs);\r
299           \r
300           if (bufIndex != (unsigned)(int)-1)\r
301           {\r
302             mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;\r
303             mtc->freeBlockHead = bufIndex;\r
304           }\r
305           \r
306           isReady = mtc->ReadyBlocks[wi];\r
307           \r
308           if (isReady)\r
309             mtc->ReadyBlocks[wi] = False;\r
310           else\r
311             mtc->writeIndex = wi;\r
312           \r
313           CriticalSection_Leave(&mtc->cs);\r
314 \r
315           RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))\r
316 \r
317           if (!isReady)\r
318             break;\r
319         }\r
320 \r
321         {\r
322           CMtCoderBlock *block = &mtc->blocks[wi];\r
323           if (res == SZ_OK && block->res != SZ_OK)\r
324             res = block->res;\r
325           bufIndex = block->bufIndex;\r
326           finished = block->finished;\r
327         }\r
328       }\r
329     }\r
330     #endif\r
331       \r
332     if (finished || res != SZ_OK)\r
333       return 0;\r
334   }\r
335 }\r
336 \r
337 \r
338 static THREAD_FUNC_DECL ThreadFunc(void *pp)\r
339 {\r
340   CMtCoderThread *t = (CMtCoderThread *)pp;\r
341   for (;;)\r
342   {\r
343     if (Event_Wait(&t->startEvent) != 0)\r
344       return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;\r
345     if (t->stop)\r
346       return 0;\r
347     {\r
348       SRes res = ThreadFunc2(t);\r
349       CMtCoder *mtc = t->mtCoder;\r
350       if (res != SZ_OK)\r
351       {\r
352         MtProgress_SetError(&mtc->mtProgress, res);\r
353       }\r
354       \r
355       #ifndef MTCODER__USE_WRITE_THREAD\r
356       {\r
357         unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);\r
358         if (numFinished == mtc->numStartedThreads)\r
359           if (Event_Set(&mtc->finishedEvent) != 0)\r
360             return (THREAD_FUNC_RET_TYPE)SZ_ERROR_THREAD;\r
361       }\r
362       #endif\r
363     }\r
364   }\r
365 }\r
366 \r
367 \r
368 \r
369 void MtCoder_Construct(CMtCoder *p)\r
370 {\r
371   unsigned i;\r
372   \r
373   p->blockSize = 0;\r
374   p->numThreadsMax = 0;\r
375   p->expectedDataSize = (UInt64)(Int64)-1;\r
376 \r
377   p->inStream = NULL;\r
378   p->inData = NULL;\r
379   p->inDataSize = 0;\r
380 \r
381   p->progress = NULL;\r
382   p->allocBig = NULL;\r
383 \r
384   p->mtCallback = NULL;\r
385   p->mtCallbackObject = NULL;\r
386 \r
387   p->allocatedBufsSize = 0;\r
388 \r
389   Event_Construct(&p->readEvent);\r
390   Semaphore_Construct(&p->blocksSemaphore);\r
391 \r
392   for (i = 0; i < MTCODER__THREADS_MAX; i++)\r
393   {\r
394     CMtCoderThread *t = &p->threads[i];\r
395     t->mtCoder = p;\r
396     t->index = i;\r
397     t->inBuf = NULL;\r
398     t->stop = False;\r
399     Event_Construct(&t->startEvent);\r
400     Thread_Construct(&t->thread);\r
401   }\r
402 \r
403   #ifdef MTCODER__USE_WRITE_THREAD\r
404     for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r
405       Event_Construct(&p->writeEvents[i]);\r
406   #else\r
407     Event_Construct(&p->finishedEvent);\r
408   #endif\r
409 \r
410   CriticalSection_Init(&p->cs);\r
411   CriticalSection_Init(&p->mtProgress.cs);\r
412 }\r
413 \r
414 \r
415 \r
416 \r
417 static void MtCoder_Free(CMtCoder *p)\r
418 {\r
419   unsigned i;\r
420 \r
421   /*\r
422   p->stopReading = True;\r
423   if (Event_IsCreated(&p->readEvent))\r
424     Event_Set(&p->readEvent);\r
425   */\r
426 \r
427   for (i = 0; i < MTCODER__THREADS_MAX; i++)\r
428     MtCoderThread_Destruct(&p->threads[i]);\r
429 \r
430   Event_Close(&p->readEvent);\r
431   Semaphore_Close(&p->blocksSemaphore);\r
432 \r
433   #ifdef MTCODER__USE_WRITE_THREAD\r
434     for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r
435       Event_Close(&p->writeEvents[i]);\r
436   #else\r
437     Event_Close(&p->finishedEvent);\r
438   #endif\r
439 }\r
440 \r
441 \r
442 void MtCoder_Destruct(CMtCoder *p)\r
443 {\r
444   MtCoder_Free(p);\r
445 \r
446   CriticalSection_Delete(&p->cs);\r
447   CriticalSection_Delete(&p->mtProgress.cs);\r
448 }\r
449 \r
450 \r
451 SRes MtCoder_Code(CMtCoder *p)\r
452 {\r
453   unsigned numThreads = p->numThreadsMax;\r
454   unsigned numBlocksMax;\r
455   unsigned i;\r
456   SRes res = SZ_OK;\r
457 \r
458   if (numThreads > MTCODER__THREADS_MAX)\r
459     numThreads = MTCODER__THREADS_MAX;\r
460   numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);\r
461   \r
462   if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;\r
463   if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;\r
464   if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;\r
465 \r
466   if (numBlocksMax > MTCODER__BLOCKS_MAX)\r
467     numBlocksMax = MTCODER__BLOCKS_MAX;\r
468 \r
469   if (p->blockSize != p->allocatedBufsSize)\r
470   {\r
471     for (i = 0; i < MTCODER__THREADS_MAX; i++)\r
472     {\r
473       CMtCoderThread *t = &p->threads[i];\r
474       if (t->inBuf)\r
475       {\r
476         ISzAlloc_Free(p->allocBig, t->inBuf);\r
477         t->inBuf = NULL;\r
478       }\r
479     }\r
480     p->allocatedBufsSize = p->blockSize;\r
481   }\r
482 \r
483   p->readRes = SZ_OK;\r
484 \r
485   MtProgress_Init(&p->mtProgress, p->progress);\r
486 \r
487   #ifdef MTCODER__USE_WRITE_THREAD\r
488     for (i = 0; i < numBlocksMax; i++)\r
489     {\r
490       RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));\r
491     }\r
492   #else\r
493     RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));\r
494   #endif\r
495 \r
496   {\r
497     RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));\r
498     RINOK_THREAD(Semaphore_OptCreateInit(&p->blocksSemaphore, numBlocksMax, numBlocksMax));\r
499   }\r
500 \r
501   for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)\r
502     p->freeBlockList[i] = i + 1;\r
503   p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;\r
504   p->freeBlockHead = 0;\r
505 \r
506   p->readProcessed = 0;\r
507   p->blockIndex = 0;\r
508   p->numBlocksMax = numBlocksMax;\r
509   p->stopReading = False;\r
510 \r
511   #ifndef MTCODER__USE_WRITE_THREAD\r
512     p->writeIndex = 0;\r
513     p->writeRes = SZ_OK;\r
514     for (i = 0; i < MTCODER__BLOCKS_MAX; i++)\r
515       p->ReadyBlocks[i] = False;\r
516     p->numFinishedThreads = 0;\r
517   #endif\r
518 \r
519   p->numStartedThreadsLimit = numThreads;\r
520   p->numStartedThreads = 0;\r
521 \r
522   // for (i = 0; i < numThreads; i++)\r
523   {\r
524     CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];\r
525     RINOK(MtCoderThread_CreateAndStart(nextThread));\r
526   }\r
527 \r
528   RINOK_THREAD(Event_Set(&p->readEvent))\r
529 \r
530   #ifdef MTCODER__USE_WRITE_THREAD\r
531   {\r
532     unsigned bi = 0;\r
533 \r
534     for (;; bi++)\r
535     {\r
536       if (bi >= numBlocksMax)\r
537         bi = 0;\r
538 \r
539       RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))\r
540 \r
541       {\r
542         const CMtCoderBlock *block = &p->blocks[bi];\r
543         unsigned bufIndex = block->bufIndex;\r
544         BoolInt finished = block->finished;\r
545         if (res == SZ_OK && block->res != SZ_OK)\r
546           res = block->res;\r
547 \r
548         if (bufIndex != (unsigned)(int)-1)\r
549         {\r
550           if (res == SZ_OK)\r
551           {\r
552             res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);\r
553             if (res != SZ_OK)\r
554               MtProgress_SetError(&p->mtProgress, res);\r
555           }\r
556           \r
557           CriticalSection_Enter(&p->cs);\r
558           {\r
559             p->freeBlockList[bufIndex] = p->freeBlockHead;\r
560             p->freeBlockHead = bufIndex;\r
561           }\r
562           CriticalSection_Leave(&p->cs);\r
563         }\r
564         \r
565         RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))\r
566 \r
567         if (finished)\r
568           break;\r
569       }\r
570     }\r
571   }\r
572   #else\r
573   {\r
574     WRes wres = Event_Wait(&p->finishedEvent);\r
575     res = MY_SRes_HRESULT_FROM_WRes(wres);\r
576   }\r
577   #endif\r
578 \r
579   if (res == SZ_OK)\r
580     res = p->readRes;\r
581 \r
582   if (res == SZ_OK)\r
583     res = p->mtProgress.res;\r
584 \r
585   #ifndef MTCODER__USE_WRITE_THREAD\r
586     if (res == SZ_OK)\r
587       res = p->writeRes;\r
588   #endif\r
589 \r
590   if (res != SZ_OK)\r
591     MtCoder_Free(p);\r
592   return res;\r
593 }\r
594 \r
595 #endif\r