Merge pull request #511 from negativeExponent/updates
[pcsx_rearmed.git] / deps / lzma-16.04 / C / MtCoder.c
CommitLineData
ce188d4d 1/* MtCoder.c -- Multi-thread Coder\r
22015-10-13 : Igor Pavlov : Public domain */\r
3\r
4#include "Precomp.h"\r
5\r
6#include "MtCoder.h"\r
7\r
8void LoopThread_Construct(CLoopThread *p)\r
9{\r
10 Thread_Construct(&p->thread);\r
11 Event_Construct(&p->startEvent);\r
12 Event_Construct(&p->finishedEvent);\r
13}\r
14\r
15void LoopThread_Close(CLoopThread *p)\r
16{\r
17 Thread_Close(&p->thread);\r
18 Event_Close(&p->startEvent);\r
19 Event_Close(&p->finishedEvent);\r
20}\r
21\r
22static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)\r
23{\r
24 CLoopThread *p = (CLoopThread *)pp;\r
25 for (;;)\r
26 {\r
27 if (Event_Wait(&p->startEvent) != 0)\r
28 return SZ_ERROR_THREAD;\r
29 if (p->stop)\r
30 return 0;\r
31 p->res = p->func(p->param);\r
32 if (Event_Set(&p->finishedEvent) != 0)\r
33 return SZ_ERROR_THREAD;\r
34 }\r
35}\r
36\r
37WRes LoopThread_Create(CLoopThread *p)\r
38{\r
39 p->stop = 0;\r
40 RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));\r
41 RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));\r
42 return Thread_Create(&p->thread, LoopThreadFunc, p);\r
43}\r
44\r
45WRes LoopThread_StopAndWait(CLoopThread *p)\r
46{\r
47 p->stop = 1;\r
48 if (Event_Set(&p->startEvent) != 0)\r
49 return SZ_ERROR_THREAD;\r
50 return Thread_Wait(&p->thread);\r
51}\r
52\r
53WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }\r
54WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }\r
55\r
56static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)\r
57{\r
58 return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;\r
59}\r
60\r
61static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)\r
62{\r
63 unsigned i;\r
64 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r
65 p->inSizes[i] = p->outSizes[i] = 0;\r
66 p->totalInSize = p->totalOutSize = 0;\r
67 p->progress = progress;\r
68 p->res = SZ_OK;\r
69}\r
70\r
71static void MtProgress_Reinit(CMtProgress *p, unsigned index)\r
72{\r
73 p->inSizes[index] = 0;\r
74 p->outSizes[index] = 0;\r
75}\r
76\r
77#define UPDATE_PROGRESS(size, prev, total) \\r
78 if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }\r
79\r
80SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)\r
81{\r
82 SRes res;\r
83 CriticalSection_Enter(&p->cs);\r
84 UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)\r
85 UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)\r
86 if (p->res == SZ_OK)\r
87 p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);\r
88 res = p->res;\r
89 CriticalSection_Leave(&p->cs);\r
90 return res;\r
91}\r
92\r
93static void MtProgress_SetError(CMtProgress *p, SRes res)\r
94{\r
95 CriticalSection_Enter(&p->cs);\r
96 if (p->res == SZ_OK)\r
97 p->res = res;\r
98 CriticalSection_Leave(&p->cs);\r
99}\r
100\r
101static void MtCoder_SetError(CMtCoder* p, SRes res)\r
102{\r
103 CriticalSection_Enter(&p->cs);\r
104 if (p->res == SZ_OK)\r
105 p->res = res;\r
106 CriticalSection_Leave(&p->cs);\r
107}\r
108\r
109/* ---------- MtThread ---------- */\r
110\r
111void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)\r
112{\r
113 p->mtCoder = mtCoder;\r
114 p->outBuf = 0;\r
115 p->inBuf = 0;\r
116 Event_Construct(&p->canRead);\r
117 Event_Construct(&p->canWrite);\r
118 LoopThread_Construct(&p->thread);\r
119}\r
120\r
121#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }\r
122\r
123static void CMtThread_CloseEvents(CMtThread *p)\r
124{\r
125 Event_Close(&p->canRead);\r
126 Event_Close(&p->canWrite);\r
127}\r
128\r
129static void CMtThread_Destruct(CMtThread *p)\r
130{\r
131 CMtThread_CloseEvents(p);\r
132\r
133 if (Thread_WasCreated(&p->thread.thread))\r
134 {\r
135 LoopThread_StopAndWait(&p->thread);\r
136 LoopThread_Close(&p->thread);\r
137 }\r
138\r
139 if (p->mtCoder->alloc)\r
140 IAlloc_Free(p->mtCoder->alloc, p->outBuf);\r
141 p->outBuf = 0;\r
142\r
143 if (p->mtCoder->alloc)\r
144 IAlloc_Free(p->mtCoder->alloc, p->inBuf);\r
145 p->inBuf = 0;\r
146}\r
147\r
148#define MY_BUF_ALLOC(buf, size, newSize) \\r
149 if (buf == 0 || size != newSize) \\r
150 { IAlloc_Free(p->mtCoder->alloc, buf); \\r
151 size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \\r
152 if (buf == 0) return SZ_ERROR_MEM; }\r
153\r
154static SRes CMtThread_Prepare(CMtThread *p)\r
155{\r
156 MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)\r
157 MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)\r
158\r
159 p->stopReading = False;\r
160 p->stopWriting = False;\r
161 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));\r
162 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));\r
163\r
164 return SZ_OK;\r
165}\r
166\r
167static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r
168{\r
169 size_t size = *processedSize;\r
170 *processedSize = 0;\r
171 while (size != 0)\r
172 {\r
173 size_t curSize = size;\r
174 SRes res = stream->Read(stream, data, &curSize);\r
175 *processedSize += curSize;\r
176 data += curSize;\r
177 size -= curSize;\r
178 RINOK(res);\r
179 if (curSize == 0)\r
180 return SZ_OK;\r
181 }\r
182 return SZ_OK;\r
183}\r
184\r
185#define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads - 1 ? 0 : p->index + 1]\r
186\r
187static SRes MtThread_Process(CMtThread *p, Bool *stop)\r
188{\r
189 CMtThread *next;\r
190 *stop = True;\r
191 if (Event_Wait(&p->canRead) != 0)\r
192 return SZ_ERROR_THREAD;\r
193 \r
194 next = GET_NEXT_THREAD(p);\r
195 \r
196 if (p->stopReading)\r
197 {\r
198 next->stopReading = True;\r
199 return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r
200 }\r
201\r
202 {\r
203 size_t size = p->mtCoder->blockSize;\r
204 size_t destSize = p->outBufSize;\r
205\r
206 RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));\r
207 next->stopReading = *stop = (size != p->mtCoder->blockSize);\r
208 if (Event_Set(&next->canRead) != 0)\r
209 return SZ_ERROR_THREAD;\r
210\r
211 RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,\r
212 p->outBuf, &destSize, p->inBuf, size, *stop));\r
213\r
214 MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);\r
215\r
216 if (Event_Wait(&p->canWrite) != 0)\r
217 return SZ_ERROR_THREAD;\r
218 if (p->stopWriting)\r
219 return SZ_ERROR_FAIL;\r
220 if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)\r
221 return SZ_ERROR_WRITE;\r
222 return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r
223 }\r
224}\r
225\r
226static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)\r
227{\r
228 CMtThread *p = (CMtThread *)pp;\r
229 for (;;)\r
230 {\r
231 Bool stop;\r
232 CMtThread *next = GET_NEXT_THREAD(p);\r
233 SRes res = MtThread_Process(p, &stop);\r
234 if (res != SZ_OK)\r
235 {\r
236 MtCoder_SetError(p->mtCoder, res);\r
237 MtProgress_SetError(&p->mtCoder->mtProgress, res);\r
238 next->stopReading = True;\r
239 next->stopWriting = True;\r
240 Event_Set(&next->canRead);\r
241 Event_Set(&next->canWrite);\r
242 return res;\r
243 }\r
244 if (stop)\r
245 return 0;\r
246 }\r
247}\r
248\r
249void MtCoder_Construct(CMtCoder* p)\r
250{\r
251 unsigned i;\r
252 p->alloc = 0;\r
253 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r
254 {\r
255 CMtThread *t = &p->threads[i];\r
256 t->index = i;\r
257 CMtThread_Construct(t, p);\r
258 }\r
259 CriticalSection_Init(&p->cs);\r
260 CriticalSection_Init(&p->mtProgress.cs);\r
261}\r
262\r
263void MtCoder_Destruct(CMtCoder* p)\r
264{\r
265 unsigned i;\r
266 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r
267 CMtThread_Destruct(&p->threads[i]);\r
268 CriticalSection_Delete(&p->cs);\r
269 CriticalSection_Delete(&p->mtProgress.cs);\r
270}\r
271\r
272SRes MtCoder_Code(CMtCoder *p)\r
273{\r
274 unsigned i, numThreads = p->numThreads;\r
275 SRes res = SZ_OK;\r
276 p->res = SZ_OK;\r
277\r
278 MtProgress_Init(&p->mtProgress, p->progress);\r
279\r
280 for (i = 0; i < numThreads; i++)\r
281 {\r
282 RINOK(CMtThread_Prepare(&p->threads[i]));\r
283 }\r
284\r
285 for (i = 0; i < numThreads; i++)\r
286 {\r
287 CMtThread *t = &p->threads[i];\r
288 CLoopThread *lt = &t->thread;\r
289\r
290 if (!Thread_WasCreated(&lt->thread))\r
291 {\r
292 lt->func = ThreadFunc;\r
293 lt->param = t;\r
294\r
295 if (LoopThread_Create(lt) != SZ_OK)\r
296 {\r
297 res = SZ_ERROR_THREAD;\r
298 break;\r
299 }\r
300 }\r
301 }\r
302\r
303 if (res == SZ_OK)\r
304 {\r
305 unsigned j;\r
306 for (i = 0; i < numThreads; i++)\r
307 {\r
308 CMtThread *t = &p->threads[i];\r
309 if (LoopThread_StartSubThread(&t->thread) != SZ_OK)\r
310 {\r
311 res = SZ_ERROR_THREAD;\r
312 p->threads[0].stopReading = True;\r
313 break;\r
314 }\r
315 }\r
316\r
317 Event_Set(&p->threads[0].canWrite);\r
318 Event_Set(&p->threads[0].canRead);\r
319\r
320 for (j = 0; j < i; j++)\r
321 LoopThread_WaitSubThread(&p->threads[j].thread);\r
322 }\r
323\r
324 for (i = 0; i < numThreads; i++)\r
325 CMtThread_CloseEvents(&p->threads[i]);\r
326 return (res == SZ_OK) ? p->res : res;\r
327}\r