1 /* MtCoder.c -- Multi-thread Coder
\r
2 2015-10-13 : Igor Pavlov : Public domain */
\r
8 void LoopThread_Construct(CLoopThread *p)
\r
10 Thread_Construct(&p->thread);
\r
11 Event_Construct(&p->startEvent);
\r
12 Event_Construct(&p->finishedEvent);
\r
15 void LoopThread_Close(CLoopThread *p)
\r
17 Thread_Close(&p->thread);
\r
18 Event_Close(&p->startEvent);
\r
19 Event_Close(&p->finishedEvent);
\r
/* Thread entry point for a CLoopThread; `pp` is the CLoopThread pointer that
   LoopThread_Create hands to Thread_Create.
   NOTE(review): this listing skips original lines 25-26, 29-30 and 34-35 (see
   the embedded line numbers), so the enclosing control flow and the exit
   condition of the thread are not visible here. */
22 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
\r
24 CLoopThread *p = (CLoopThread *)pp;
\r
/* Block until the start event is signalled; a failed wait ends the thread. */
27 if (Event_Wait(&p->startEvent) != 0)
\r
28 return SZ_ERROR_THREAD;
\r
/* Run the installed callback and store its result before signalling
   the finished event. */
31 p->res = p->func(p->param);
\r
32 if (Event_Set(&p->finishedEvent) != 0)
\r
33 return SZ_ERROR_THREAD;
\r
/* Creates both auto-reset events in the non-signalled state, then starts the
   worker thread running LoopThreadFunc with `p` as its argument and returns
   the result of Thread_Create.
   RINOK is defined outside this listing; presumably it returns early on a
   non-zero result (TODO confirm).
   NOTE(review): original line 39 is missing from this listing. */
37 WRes LoopThread_Create(CLoopThread *p)
\r
40 RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
\r
41 RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
\r
42 return Thread_Create(&p->thread, LoopThreadFunc, p);
\r
/* Signals the start event and then waits for the worker thread.
   Returns SZ_ERROR_THREAD if the event cannot be set, otherwise the result
   of Thread_Wait.
   NOTE(review): original line 47 is missing from this listing; presumably it
   raises the stop request that makes the woken thread exit (confirm). */
45 WRes LoopThread_StopAndWait(CLoopThread *p)
\r
48 if (Event_Set(&p->startEvent) != 0)
\r
49 return SZ_ERROR_THREAD;
\r
50 return Thread_Wait(&p->thread);
\r
53 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
\r
54 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
\r
56 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
\r
58 return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
\r
/* Zeroes every per-thread in/out counter and both running totals, then
   stores the progress callback.
   NOTE(review): the declaration of `i` (original line 63) and original
   line 68 are missing from this listing. */
61 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
\r
64 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
\r
65 p->inSizes[i] = p->outSizes[i] = 0;
\r
66 p->totalInSize = p->totalOutSize = 0;
\r
67 p->progress = progress;
\r
71 static void MtProgress_Reinit(CMtProgress *p, unsigned index)
\r
73 p->inSizes[index] = 0;
\r
74 p->outSizes[index] = 0;
\r
/* Folds a new absolute counter value into a running total.
     size  - new absolute value; (UInt64)(Int64)-1 means "no update"
     prev  - lvalue holding the previously reported value for this slot
     total - lvalue holding the sum over all slots
   Every argument is parenthesized so that compound expressions (for example
   a conditional expression passed as `size`) are evaluated as a unit.
   `size` is still evaluated more than once, so it must have no side effects.
   The expansion is a bare `if` block: call sites use it without a trailing
   semicolon. */
#define UPDATE_PROGRESS(size, prev, total) \
  if ((size) != (UInt64)(Int64)-1) { (total) += (size) - (prev); (prev) = (size); }
\r
/* Updates the in/out counters of slot `index` while holding p->cs and, as
   long as p->res is still SZ_OK, forwards the new totals to the progress
   callback and stores its result in p->res.
   An inSize/outSize of (UInt64)(Int64)-1 leaves that counter unchanged
   (see UPDATE_PROGRESS).
   NOTE(review): original lines 82, 88 and 90 are missing from this listing;
   presumably they declare a local result, copy p->res into it under the lock
   and return it (confirm). */
80 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
\r
83 CriticalSection_Enter(&p->cs);
\r
84 UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
\r
85 UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
\r
/* Once an error has been recorded the callback is no longer invoked. */
86 if (p->res == SZ_OK)
\r
87 p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
\r
89 CriticalSection_Leave(&p->cs);
\r
93 static void MtProgress_SetError(CMtProgress *p, SRes res)
\r
95 CriticalSection_Enter(&p->cs);
\r
96 if (p->res == SZ_OK)
\r
98 CriticalSection_Leave(&p->cs);
\r
101 static void MtCoder_SetError(CMtCoder* p, SRes res)
\r
103 CriticalSection_Enter(&p->cs);
\r
104 if (p->res == SZ_OK)
\r
106 CriticalSection_Leave(&p->cs);
\r
109 /* ---------- MtThread ---------- */
\r
/* Binds a thread slot to its owning coder and runs the Construct step on its
   canRead/canWrite events and on its loop thread.
   NOTE(review): original lines 114-115 are missing from this listing. */
111 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
\r
113 p->mtCoder = mtCoder;
\r
116 Event_Construct(&p->canRead);
\r
117 Event_Construct(&p->canWrite);
\r
118 LoopThread_Construct(&p->thread);
\r
/* Evaluates `x` once; any non-zero result makes the enclosing function
   return SZ_ERROR_THREAD.  The expansion is a braced block, so it works
   with or without a trailing semicolon at the call site. */
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
\r
123 static void CMtThread_CloseEvents(CMtThread *p)
\r
125 Event_Close(&p->canRead);
\r
126 Event_Close(&p->canWrite);
\r
/* Tears down one thread slot: closes its events, stops and closes its loop
   thread, and releases the output and input buffers through the coder's
   allocator when an allocator is set.
   NOTE(review): the braces around the Thread_WasCreated branch are missing
   from this listing; the numbering gap (134, 137) suggests that both
   LoopThread_StopAndWait and LoopThread_Close belong to it (confirm).
   Original lines 141 and 145, which follow the two IAlloc_Free calls, are
   missing as well. */
129 static void CMtThread_Destruct(CMtThread *p)
\r
131 CMtThread_CloseEvents(p);
\r
133 if (Thread_WasCreated(&p->thread.thread))
\r
135 LoopThread_StopAndWait(&p->thread);
\r
136 LoopThread_Close(&p->thread);
\r
139 if (p->mtCoder->alloc)
\r
140 IAlloc_Free(p->mtCoder->alloc, p->outBuf);
\r
143 if (p->mtCoder->alloc)
\r
144 IAlloc_Free(p->mtCoder->alloc, p->inBuf);
\r
/* (Re)allocates a buffer of thread slot `p` (taken from the caller's scope).
     buf     - lvalue holding the buffer pointer
     size    - lvalue holding the size the buffer was allocated with
     newSize - required size
   When `buf` is null or `size` differs from `newSize`, the old buffer is
   freed and a new one of `newSize` bytes is allocated with the coder's
   allocator; on allocation failure the enclosing function returns
   SZ_ERROR_MEM.  A buffer that already has the right size is kept.
   The expansion is a bare `if` block: call sites use it without a trailing
   semicolon. */
#define MY_BUF_ALLOC(buf, size, newSize) \
  if ((buf) == 0 || (size) != (newSize)) \
  { IAlloc_Free(p->mtCoder->alloc, buf); \
    (size) = (newSize); (buf) = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
    if ((buf) == 0) return SZ_ERROR_MEM; }
\r
/* Prepares one thread slot for a coding run: (re)allocates the input buffer
   to the coder's blockSize and the output buffer to its destBlockSize,
   clears both stop flags and creates the canRead/canWrite auto-reset events
   in the non-signalled state.
   Returns SZ_ERROR_MEM when an allocation fails (through MY_BUF_ALLOC) and
   SZ_ERROR_THREAD when an event cannot be created.
   NOTE(review): original lines 158 and 163-164 are missing from this
   listing; presumably the last of them is the success return (confirm). */
154 static SRes CMtThread_Prepare(CMtThread *p)
\r
156 MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
\r
157 MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
\r
159 p->stopReading = False;
\r
160 p->stopWriting = False;
\r
161 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
\r
162 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
\r
/* Reads from `stream` into `data`.  On entry *processedSize holds the number
   of bytes requested; it is reset to 0 and then grows by the byte count each
   Read call reports.
   NOTE(review): original lines 171-172 and 176-182 are missing from this
   listing, so the loop around the Read call, the handling of `res` and the
   return value are not visible here. */
167 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
\r
169 size_t size = *processedSize;
\r
170 *processedSize = 0;
\r
/* curSize is in/out: bytes requested going in, bytes delivered coming out. */
173 size_t curSize = size;
\r
174 SRes res = stream->Read(stream, data, &curSize);
\r
175 *processedSize += curSize;
\r
/* Yields a pointer to the thread slot that follows `p` in the coder's
   `threads` array, wrapping from the last used slot (numThreads - 1) back to
   slot 0.  The argument and the whole expansion are parenthesized so the
   result can be used directly in larger expressions such as
   GET_NEXT_THREAD(t)->field.  `p` is evaluated more than once, so it must
   have no side effects. */
#define GET_NEXT_THREAD(p) \
  (&(p)->mtCoder->threads[(p)->index == (p)->mtCoder->numThreads - 1 ? 0 : (p)->index + 1])
\r
/* Handles one block for thread slot `p`: waits for its read turn, reads up
   to blockSize bytes, hands the read turn to the next slot, encodes the
   block through the coder callback, then waits for its write turn, writes
   the encoded bytes and hands the write turn on.
   *stop is set when fewer than blockSize bytes were read.
   Visible return values: SZ_OK, SZ_ERROR_THREAD (event failure),
   SZ_ERROR_FAIL (writing was stopped), SZ_ERROR_WRITE (short write), plus
   whatever RINOK propagates from FullRead or the Code callback.
   NOTE(review): many lines are missing from this listing (original 188-190,
   193, 195, 197, 200-202, 205, 210, 213, 215, 223-225), including the
   declaration of `next`, all braces and any initial assignment to *stop. */
187 static SRes MtThread_Process(CMtThread *p, Bool *stop)
\r
/* Wait for this slot's turn to read. */
191 if (Event_Wait(&p->canRead) != 0)
\r
192 return SZ_ERROR_THREAD;
\r
194 next = GET_NEXT_THREAD(p);
\r
/* Reading was already stopped: pass the flag and the read turn on. */
196 if (p->stopReading)
\r
198 next->stopReading = True;
\r
199 return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
\r
203 size_t size = p->mtCoder->blockSize;
\r
204 size_t destSize = p->outBufSize;
\r
206 RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
\r
/* A short read sets *stop and tells the next slot to stop reading. */
207 next->stopReading = *stop = (size != p->mtCoder->blockSize);
\r
/* Release the read turn before encoding. */
208 if (Event_Set(&next->canRead) != 0)
\r
209 return SZ_ERROR_THREAD;
\r
/* destSize is passed by address: capacity going in; its value after the
   call is the byte count written below. */
211 RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
\r
212 p->outBuf, &destSize, p->inBuf, size, *stop));
\r
214 MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
\r
/* Wait for this slot's turn to write. */
216 if (Event_Wait(&p->canWrite) != 0)
\r
217 return SZ_ERROR_THREAD;
\r
218 if (p->stopWriting)
\r
219 return SZ_ERROR_FAIL;
\r
220 if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
\r
221 return SZ_ERROR_WRITE;
\r
222 return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
\r
/* Worker callback that MtCoder_Code installs in CLoopThread.func; `pp` is
   cast to the CMtThread slot it serves.  It calls MtThread_Process and, on
   the path visible here, records the result in both the coder and the
   progress object, marks the next slot as stopped for reading and writing
   and signals both of that slot's events.
   NOTE(review): original lines 227, 229-231, 234-235 and 242-247 are missing
   from this listing: the declaration of `stop`, any loop, the condition that
   guards the error path (presumably res != SZ_OK, confirm) and the return
   statements are not visible. */
226 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
\r
228 CMtThread *p = (CMtThread *)pp;
\r
232 CMtThread *next = GET_NEXT_THREAD(p);
\r
233 SRes res = MtThread_Process(p, &stop);
\r
236 MtCoder_SetError(p->mtCoder, res);
\r
237 MtProgress_SetError(&p->mtCoder->mtProgress, res);
\r
/* Set the stop flags first, then signal the events the next slot may be
   waiting on. */
238 next->stopReading = True;
\r
239 next->stopWriting = True;
\r
240 Event_Set(&next->canRead);
\r
241 Event_Set(&next->canWrite);
\r
/* Constructs every one of the NUM_MT_CODER_THREADS_MAX thread slots with
   this coder as owner, then initialises the coder's critical section and the
   progress object's critical section.
   NOTE(review): original lines 251-252 and 256 are missing from this
   listing, including the declaration of `i`. */
249 void MtCoder_Construct(CMtCoder* p)
\r
253 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
\r
255 CMtThread *t = &p->threads[i];
\r
257 CMtThread_Construct(t, p);
\r
259 CriticalSection_Init(&p->cs);
\r
260 CriticalSection_Init(&p->mtProgress.cs);
\r
263 void MtCoder_Destruct(CMtCoder* p)
\r
266 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
\r
267 CMtThread_Destruct(&p->threads[i]);
\r
268 CriticalSection_Delete(&p->cs);
\r
269 CriticalSection_Delete(&p->mtProgress.cs);
\r
/* Runs one multi-threaded coding pass over p->numThreads thread slots:
   resets progress tracking, prepares every slot, creates the loop threads
   that do not exist yet, starts them, signals slot 0's canWrite and canRead
   events, waits for the started workers and finally closes the per-slot
   events.  Returns p->res when the local result is SZ_OK, otherwise the
   local result.
   Fix: the thread-existence check read `Thread_WasCreated(<->thread)`, an
   HTML-entity decoding artifact of `&lt->thread` (`lt` is declared on the
   line just above it); the address-of expression is restored.
   NOTE(review): many original lines (273, 275-277, 279, 281, 283-284, 286,
   289, 291, 293-294, 296, 298-305, 307, 310, 313-316, 319, 322-323) are
   missing from this listing, including the declarations of `res` and `j`,
   all braces and any break statements, so the exact nesting of the
   statements below cannot be confirmed from here. */
272 SRes MtCoder_Code(CMtCoder *p)
\r
274 unsigned i, numThreads = p->numThreads;
\r
278 MtProgress_Init(&p->mtProgress, p->progress);
\r
/* Allocate buffers and create events for every slot that will be used. */
280 for (i = 0; i < numThreads; i++)
\r
282 RINOK(CMtThread_Prepare(&p->threads[i]));
\r
/* Create a loop thread for each slot that does not have one yet. */
285 for (i = 0; i < numThreads; i++)
\r
287 CMtThread *t = &p->threads[i];
\r
288 CLoopThread *lt = &t->thread;
\r
290 if (!Thread_WasCreated(&lt->thread))
\r
292 lt->func = ThreadFunc;
\r
295 if (LoopThread_Create(lt) != SZ_OK)
\r
297 res = SZ_ERROR_THREAD;
\r
/* Wake every worker; on failure the visible lines record the error and
   mark slot 0 as stopped for reading. */
306 for (i = 0; i < numThreads; i++)
\r
308 CMtThread *t = &p->threads[i];
\r
309 if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
\r
311 res = SZ_ERROR_THREAD;
\r
312 p->threads[0].stopReading = True;
\r
/* Signal slot 0's write and read events. */
317 Event_Set(&p->threads[0].canWrite);
\r
318 Event_Set(&p->threads[0].canRead);
\r
/* Wait only for the workers that were actually started (j < i). */
320 for (j = 0; j < i; j++)
\r
321 LoopThread_WaitSubThread(&p->threads[j].thread);
\r
324 for (i = 0; i < numThreads; i++)
\r
325 CMtThread_CloseEvents(&p->threads[i]);
\r
326 return (res == SZ_OK) ? p->res : res;
\r