ce188d4d |
1 | /* MtCoder.c -- Multi-thread Coder\r |
2 | 2015-10-13 : Igor Pavlov : Public domain */\r |
3 | \r |
4 | #include "Precomp.h"\r |
5 | \r |
6 | #include "MtCoder.h"\r |
7 | \r |
8 | void LoopThread_Construct(CLoopThread *p)\r |
9 | {\r |
10 | Thread_Construct(&p->thread);\r |
11 | Event_Construct(&p->startEvent);\r |
12 | Event_Construct(&p->finishedEvent);\r |
13 | }\r |
14 | \r |
15 | void LoopThread_Close(CLoopThread *p)\r |
16 | {\r |
17 | Thread_Close(&p->thread);\r |
18 | Event_Close(&p->startEvent);\r |
19 | Event_Close(&p->finishedEvent);\r |
20 | }\r |
21 | \r |
22 | static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)\r |
23 | {\r |
24 | CLoopThread *p = (CLoopThread *)pp;\r |
25 | for (;;)\r |
26 | {\r |
27 | if (Event_Wait(&p->startEvent) != 0)\r |
28 | return SZ_ERROR_THREAD;\r |
29 | if (p->stop)\r |
30 | return 0;\r |
31 | p->res = p->func(p->param);\r |
32 | if (Event_Set(&p->finishedEvent) != 0)\r |
33 | return SZ_ERROR_THREAD;\r |
34 | }\r |
35 | }\r |
36 | \r |
37 | WRes LoopThread_Create(CLoopThread *p)\r |
38 | {\r |
39 | p->stop = 0;\r |
40 | RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));\r |
41 | RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));\r |
42 | return Thread_Create(&p->thread, LoopThreadFunc, p);\r |
43 | }\r |
44 | \r |
45 | WRes LoopThread_StopAndWait(CLoopThread *p)\r |
46 | {\r |
47 | p->stop = 1;\r |
48 | if (Event_Set(&p->startEvent) != 0)\r |
49 | return SZ_ERROR_THREAD;\r |
50 | return Thread_Wait(&p->thread);\r |
51 | }\r |
52 | \r |
53 | WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }\r |
54 | WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }\r |
55 | \r |
56 | static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)\r |
57 | {\r |
58 | return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;\r |
59 | }\r |
60 | \r |
61 | static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)\r |
62 | {\r |
63 | unsigned i;\r |
64 | for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r |
65 | p->inSizes[i] = p->outSizes[i] = 0;\r |
66 | p->totalInSize = p->totalOutSize = 0;\r |
67 | p->progress = progress;\r |
68 | p->res = SZ_OK;\r |
69 | }\r |
70 | \r |
71 | static void MtProgress_Reinit(CMtProgress *p, unsigned index)\r |
72 | {\r |
73 | p->inSizes[index] = 0;\r |
74 | p->outSizes[index] = 0;\r |
75 | }\r |
76 | \r |
/* Folds a new absolute per-thread counter value into a running total.
     size  - new absolute value; (UInt64)(Int64)-1 means "no update"
     prev  - lvalue holding the previously reported value (updated)
     total - lvalue holding the accumulated total (updated)
   The expansion is a brace-enclosed block, so it can be used with or without
   a trailing semicolon and cannot capture an `else` that follows it.
   Note: `size` and `prev` are evaluated more than once. */
#define UPDATE_PROGRESS(size, prev, total) \
  { if ((size) != (UInt64)(Int64)-1) \
    { (total) += (size) - (prev); (prev) = (size); } }
79 | \r |
80 | SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)\r |
81 | {\r |
82 | SRes res;\r |
83 | CriticalSection_Enter(&p->cs);\r |
84 | UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)\r |
85 | UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)\r |
86 | if (p->res == SZ_OK)\r |
87 | p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);\r |
88 | res = p->res;\r |
89 | CriticalSection_Leave(&p->cs);\r |
90 | return res;\r |
91 | }\r |
92 | \r |
93 | static void MtProgress_SetError(CMtProgress *p, SRes res)\r |
94 | {\r |
95 | CriticalSection_Enter(&p->cs);\r |
96 | if (p->res == SZ_OK)\r |
97 | p->res = res;\r |
98 | CriticalSection_Leave(&p->cs);\r |
99 | }\r |
100 | \r |
101 | static void MtCoder_SetError(CMtCoder* p, SRes res)\r |
102 | {\r |
103 | CriticalSection_Enter(&p->cs);\r |
104 | if (p->res == SZ_OK)\r |
105 | p->res = res;\r |
106 | CriticalSection_Leave(&p->cs);\r |
107 | }\r |
108 | \r |
109 | /* ---------- MtThread ---------- */\r |
110 | \r |
111 | void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)\r |
112 | {\r |
113 | p->mtCoder = mtCoder;\r |
114 | p->outBuf = 0;\r |
115 | p->inBuf = 0;\r |
116 | Event_Construct(&p->canRead);\r |
117 | Event_Construct(&p->canWrite);\r |
118 | LoopThread_Construct(&p->thread);\r |
119 | }\r |
120 | \r |
/* Returns SZ_ERROR_THREAD from the enclosing function when x is non-zero.
   Wrapped in do { } while (0) so that `RINOK_THREAD(x);` is a single
   statement and stays valid inside an unbraced if / else. */
#define RINOK_THREAD(x) do { if ((x) != 0) return SZ_ERROR_THREAD; } while (0)
122 | \r |
123 | static void CMtThread_CloseEvents(CMtThread *p)\r |
124 | {\r |
125 | Event_Close(&p->canRead);\r |
126 | Event_Close(&p->canWrite);\r |
127 | }\r |
128 | \r |
129 | static void CMtThread_Destruct(CMtThread *p)\r |
130 | {\r |
131 | CMtThread_CloseEvents(p);\r |
132 | \r |
133 | if (Thread_WasCreated(&p->thread.thread))\r |
134 | {\r |
135 | LoopThread_StopAndWait(&p->thread);\r |
136 | LoopThread_Close(&p->thread);\r |
137 | }\r |
138 | \r |
139 | if (p->mtCoder->alloc)\r |
140 | IAlloc_Free(p->mtCoder->alloc, p->outBuf);\r |
141 | p->outBuf = 0;\r |
142 | \r |
143 | if (p->mtCoder->alloc)\r |
144 | IAlloc_Free(p->mtCoder->alloc, p->inBuf);\r |
145 | p->inBuf = 0;\r |
146 | }\r |
147 | \r |
/* (Re)allocates a per-thread buffer when it is missing or its size differs
   from newSize.
     buf     - lvalue: Byte * buffer pointer (freed and replaced)
     size    - lvalue: current buffer size (set to newSize)
     newSize - required size
   Relies on a variable `p` (CMtThread *) being in scope at the call site and
   returns SZ_ERROR_MEM from the enclosing function if allocation fails; in
   that case buf is left as 0.
   The expansion is a brace-enclosed block, so it can be used with or without
   a trailing semicolon and cannot capture an `else` that follows it.
   Note: the arguments are evaluated more than once. */
#define MY_BUF_ALLOC(buf, size, newSize) \
  { if ((buf) == 0 || (size) != (newSize)) \
    { IAlloc_Free(p->mtCoder->alloc, (buf)); \
      (size) = (newSize); (buf) = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, (size)); \
      if ((buf) == 0) return SZ_ERROR_MEM; } }
153 | \r |
154 | static SRes CMtThread_Prepare(CMtThread *p)\r |
155 | {\r |
156 | MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)\r |
157 | MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)\r |
158 | \r |
159 | p->stopReading = False;\r |
160 | p->stopWriting = False;\r |
161 | RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));\r |
162 | RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));\r |
163 | \r |
164 | return SZ_OK;\r |
165 | }\r |
166 | \r |
167 | static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r |
168 | {\r |
169 | size_t size = *processedSize;\r |
170 | *processedSize = 0;\r |
171 | while (size != 0)\r |
172 | {\r |
173 | size_t curSize = size;\r |
174 | SRes res = stream->Read(stream, data, &curSize);\r |
175 | *processedSize += curSize;\r |
176 | data += curSize;\r |
177 | size -= curSize;\r |
178 | RINOK(res);\r |
179 | if (curSize == 0)\r |
180 | return SZ_OK;\r |
181 | }\r |
182 | return SZ_OK;\r |
183 | }\r |
184 | \r |
/* Yields a pointer to the thread slot that follows p in the coder's ring:
   slot index + 1, wrapping to slot 0 after slot numThreads - 1.
   The argument and the whole expansion are parenthesized so that any pointer
   expression can be passed and the result can be used inside a larger
   expression. Note: p is evaluated more than once. */
#define GET_NEXT_THREAD(p) \
  (&(p)->mtCoder->threads[(p)->index == (p)->mtCoder->numThreads - 1 ? 0 : (p)->index + 1])
186 | \r |
187 | static SRes MtThread_Process(CMtThread *p, Bool *stop)\r |
188 | {\r |
189 | CMtThread *next;\r |
190 | *stop = True;\r |
191 | if (Event_Wait(&p->canRead) != 0)\r |
192 | return SZ_ERROR_THREAD;\r |
193 | \r |
194 | next = GET_NEXT_THREAD(p);\r |
195 | \r |
196 | if (p->stopReading)\r |
197 | {\r |
198 | next->stopReading = True;\r |
199 | return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r |
200 | }\r |
201 | \r |
202 | {\r |
203 | size_t size = p->mtCoder->blockSize;\r |
204 | size_t destSize = p->outBufSize;\r |
205 | \r |
206 | RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));\r |
207 | next->stopReading = *stop = (size != p->mtCoder->blockSize);\r |
208 | if (Event_Set(&next->canRead) != 0)\r |
209 | return SZ_ERROR_THREAD;\r |
210 | \r |
211 | RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,\r |
212 | p->outBuf, &destSize, p->inBuf, size, *stop));\r |
213 | \r |
214 | MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);\r |
215 | \r |
216 | if (Event_Wait(&p->canWrite) != 0)\r |
217 | return SZ_ERROR_THREAD;\r |
218 | if (p->stopWriting)\r |
219 | return SZ_ERROR_FAIL;\r |
220 | if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)\r |
221 | return SZ_ERROR_WRITE;\r |
222 | return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r |
223 | }\r |
224 | }\r |
225 | \r |
226 | static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)\r |
227 | {\r |
228 | CMtThread *p = (CMtThread *)pp;\r |
229 | for (;;)\r |
230 | {\r |
231 | Bool stop;\r |
232 | CMtThread *next = GET_NEXT_THREAD(p);\r |
233 | SRes res = MtThread_Process(p, &stop);\r |
234 | if (res != SZ_OK)\r |
235 | {\r |
236 | MtCoder_SetError(p->mtCoder, res);\r |
237 | MtProgress_SetError(&p->mtCoder->mtProgress, res);\r |
238 | next->stopReading = True;\r |
239 | next->stopWriting = True;\r |
240 | Event_Set(&next->canRead);\r |
241 | Event_Set(&next->canWrite);\r |
242 | return res;\r |
243 | }\r |
244 | if (stop)\r |
245 | return 0;\r |
246 | }\r |
247 | }\r |
248 | \r |
249 | void MtCoder_Construct(CMtCoder* p)\r |
250 | {\r |
251 | unsigned i;\r |
252 | p->alloc = 0;\r |
253 | for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r |
254 | {\r |
255 | CMtThread *t = &p->threads[i];\r |
256 | t->index = i;\r |
257 | CMtThread_Construct(t, p);\r |
258 | }\r |
259 | CriticalSection_Init(&p->cs);\r |
260 | CriticalSection_Init(&p->mtProgress.cs);\r |
261 | }\r |
262 | \r |
263 | void MtCoder_Destruct(CMtCoder* p)\r |
264 | {\r |
265 | unsigned i;\r |
266 | for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r |
267 | CMtThread_Destruct(&p->threads[i]);\r |
268 | CriticalSection_Delete(&p->cs);\r |
269 | CriticalSection_Delete(&p->mtProgress.cs);\r |
270 | }\r |
271 | \r |
272 | SRes MtCoder_Code(CMtCoder *p)\r |
273 | {\r |
274 | unsigned i, numThreads = p->numThreads;\r |
275 | SRes res = SZ_OK;\r |
276 | p->res = SZ_OK;\r |
277 | \r |
278 | MtProgress_Init(&p->mtProgress, p->progress);\r |
279 | \r |
280 | for (i = 0; i < numThreads; i++)\r |
281 | {\r |
282 | RINOK(CMtThread_Prepare(&p->threads[i]));\r |
283 | }\r |
284 | \r |
285 | for (i = 0; i < numThreads; i++)\r |
286 | {\r |
287 | CMtThread *t = &p->threads[i];\r |
288 | CLoopThread *lt = &t->thread;\r |
289 | \r |
290 | if (!Thread_WasCreated(<->thread))\r |
291 | {\r |
292 | lt->func = ThreadFunc;\r |
293 | lt->param = t;\r |
294 | \r |
295 | if (LoopThread_Create(lt) != SZ_OK)\r |
296 | {\r |
297 | res = SZ_ERROR_THREAD;\r |
298 | break;\r |
299 | }\r |
300 | }\r |
301 | }\r |
302 | \r |
303 | if (res == SZ_OK)\r |
304 | {\r |
305 | unsigned j;\r |
306 | for (i = 0; i < numThreads; i++)\r |
307 | {\r |
308 | CMtThread *t = &p->threads[i];\r |
309 | if (LoopThread_StartSubThread(&t->thread) != SZ_OK)\r |
310 | {\r |
311 | res = SZ_ERROR_THREAD;\r |
312 | p->threads[0].stopReading = True;\r |
313 | break;\r |
314 | }\r |
315 | }\r |
316 | \r |
317 | Event_Set(&p->threads[0].canWrite);\r |
318 | Event_Set(&p->threads[0].canRead);\r |
319 | \r |
320 | for (j = 0; j < i; j++)\r |
321 | LoopThread_WaitSubThread(&p->threads[j].thread);\r |
322 | }\r |
323 | \r |
324 | for (i = 0; i < numThreads; i++)\r |
325 | CMtThread_CloseEvents(&p->threads[i]);\r |
326 | return (res == SZ_OK) ? p->res : res;\r |
327 | }\r |