FFmpeg
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
pthread_slice.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 /**
20  * @file
21  * Slice multithreading support functions
22  * @see doc/multithreading.txt
23  */
24 
25 #include "config.h"
26 
27 #include "avcodec.h"
28 #include "internal.h"
29 #include "pthread_internal.h"
30 #include "thread.h"
31 
32 #include "libavutil/avassert.h"
33 #include "libavutil/common.h"
34 #include "libavutil/cpu.h"
35 #include "libavutil/mem.h"
36 #include "libavutil/thread.h"
37 
/* Job callback invoked by the worker as func(avctx, args + job_index * job_size). */
38 typedef int (action_func)(AVCodecContext *c, void *arg);
/* Job callback invoked by the worker as func2(avctx, args, job_index, worker_id). */
39 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
40 
/*
 * State shared between the thread that submits jobs and the worker threads.
 *
 * NOTE(review): this listing of the struct is truncated. Members that the rest
 * of the file uses (workers, func, func2, current_job, thread_count,
 * entries_count, progress_mutex, progress_cond and the job lock / condition
 * variables) as well as the closing brace are not visible here; confirm the
 * full layout against the complete file.
 */
41 typedef struct SliceThreadContext {
  /* Base argument pointer handed to the job callback. */
45  void *args;
  /* Optional array receiving each job's return value; NULL when the caller supplied none. */
46  int *rets;
  /* Number of jobs in the batch currently being executed. */
47  int job_count;
  /* Byte stride applied to args per job index when the action_func callback is used. */
48  int job_size;
49 
  /* Incremented once per submitted batch; workers compare it with the value they last saw. */
53  unsigned current_execute;
  /* Set to 1 during teardown; a worker returns when it observes this. */
55  int done;
56 
  /* Array of int progress counters, allocated zeroed by the entries allocation routine. */
57  int *entries;
63 
/**
 * Worker thread entry point, passed to pthread_create() by the pool setup code.
 *
 * Loops forever: idles while it holds no valid job index, runs the job
 * callback for every index it claims, and returns once the context's done
 * flag is observed.
 *
 * @param v the AVCodecContext the pool belongs to
 * @return always NULL
 *
 * NOTE(review): this listing has gaps (embedded line numbers 67, 73, 79, 82,
 * 87, 91 and 98 are absent). The declaration of `c` and the statements that
 * should follow the bare `if` / `while` headers below are not visible; they
 * are presumably the context lookup and the mutex / condition-variable calls.
 * Confirm against the complete file before relying on these comments.
 */
64 static void* attribute_align_arg worker(void *v)
65 {
66  AVCodecContext *avctx = v;
68  unsigned last_execute = 0;
  /* Start with an out-of-range index so the idle loop below is entered first. */
69  int our_job = c->job_count;
70  int thread_count = avctx->thread_count;
71  int self_id;
72 
  /* Take a worker id from the shared counter (it is set to 0 before the threads are created). */
74  self_id = c->current_job++;
75  for (;;){
76  int ret;
  /* No job left for this worker in the current batch. */
77  while (our_job >= c->job_count) {
  /* Same value the parking helper waits for; the statement guarded by this
   * test is not visible here (presumably a signal to the submitting thread). */
78  if (c->current_job == thread_count + c->job_count)
80 
  /* Idle until a new batch is published or shutdown is requested; the loop
   * body is not visible here (presumably a condition-variable wait). */
81  while (last_execute == c->current_execute && !c->done)
83  last_execute = c->current_execute;
  /* The first job a worker runs in a batch is the one matching its own id. */
84  our_job = self_id;
85 
86  if (c->done) {
88  return NULL;
89  }
90  }
92 
  /* Use the action_func callback when one is set, otherwise the action_func2 form. */
93  ret = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
94  c->func2(avctx, c->args, our_job, self_id);
95  if (c->rets)
96  c->rets[our_job%c->job_count] = ret;
97 
  /* Claim the next unprocessed job index from the shared counter. */
99  our_job = c->current_job++;
100  }
101 }
102 
/*
 * Teardown of the slice-thread pool: flags shutdown, joins the workers and
 * releases the context's allocations.
 *
 * NOTE(review): the signature line is missing from this listing; this is
 * presumably the body of ff_slice_thread_free(AVCodecContext *avctx). The
 * declaration of `c`, the bodies of the two per-thread loops and several
 * other statements are also not visible. Confirm against the complete file.
 */
104 {
106  int i;
107 
  /* Tell the workers to exit; they return NULL once they observe this flag. */
109  c->done = 1;
  /* Loop body not visible in this listing. */
111  for (i = 0; i < c->thread_count; i++)
114 
  /* Wait for every worker thread to terminate. */
115  for (i=0; i<avctx->thread_count; i++)
116  pthread_join(c->workers[i], NULL);
117 
  /* Loop body not visible in this listing. */
118  for (i = 0; i < c->thread_count; i++) {
121  }
122 
126 
127  av_freep(&c->entries);
129  av_freep(&c->progress_cond);
130 
131  av_freep(&c->workers);
  /* Release the context itself last; `c` must not be used after this point. */
132  av_freep(&avctx->internal->thread_ctx);
133 }
134 
/*
 * Loops until the shared job counter equals thread_count + job_count, the
 * same value the worker tests for when it runs out of jobs.
 *
 * NOTE(review): the signature line and the loop body are missing from this
 * listing; this is presumably thread_park_workers(SliceThreadContext *c,
 * int thread_count) and the body presumably waits on a condition variable.
 * Confirm against the complete file.
 */
136 {
137  while (c->current_job != thread_count + c->job_count)
140 }
141 
/**
 * Publish a batch of jobs to the worker pool.
 *
 * @param avctx     codec context owning the pool
 * @param func      per-job callback (NULL when called from thread_execute2)
 * @param arg       base argument pointer; each job receives arg + job_index * job_size
 * @param ret       optional array for per-job return values, may be NULL
 * @param job_count number of jobs; values <= 0 return immediately
 * @param job_size  byte stride between per-job arguments
 * @return the result of avcodec_default_execute() on the fallback path,
 *         otherwise 0; individual job results are only reported through ret
 *
 * NOTE(review): embedded line numbers 144, 152, 165 and 167 are absent from
 * this listing; the declaration of `c` and, presumably, the locking, wake-up
 * and wait-for-completion calls are not visible. Confirm against the complete
 * file.
 */
142 static int thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
143 {
145 
  /* Without slice threading or with a single thread, defer to the default executor.
   * NOTE(review): thread_execute2 passes func == NULL; confirm this branch cannot be
   * reached in that case. */
146  if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
147  return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
148 
149  if (job_count <= 0)
150  return 0;
151 
153 
  /* Workers take their own id as first job, so shared claiming starts at thread_count. */
154  c->current_job = avctx->thread_count;
155  c->job_count = job_count;
156  c->job_size = job_size;
157  c->args = arg;
158  c->func = func;
  /* Equivalent to c->rets = ret. */
159  if (ret) {
160  c->rets = ret;
161  } else {
162  c->rets = NULL;
163  }
  /* Bump the batch counter that idle workers compare against their last seen value. */
164  c->current_execute++;
166 
168 
169  return 0;
170 }
171 
/**
 * Variant of thread_execute() for callbacks that take the job and thread number.
 *
 * Stores func2 in the context and forwards to thread_execute() with a NULL
 * func and a job_size of 0, which makes the worker choose the func2 path.
 *
 * @return whatever thread_execute() returns
 *
 * NOTE(review): embedded line 174 (presumably the declaration of `c`) is
 * absent from this listing.
 */
172 static int thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
173 {
175  c->func2 = func2;
176  return thread_execute(avctx, NULL, arg, ret, job_count, 0);
177 }
178 
/*
 * Set up the slice-thread pool: choose a thread count, allocate the context,
 * start the workers and install the threaded execute callbacks.
 *
 * Returns 0 on success, including the case where the thread count ends up
 * <= 1 and threading is simply disabled; returns -1 when an allocation or
 * pthread_create() fails.
 *
 * NOTE(review): the signature line is missing from this listing; this is
 * presumably the body of ff_slice_thread_init(AVCodecContext *avctx). Embedded
 * line numbers 182, 226-229 and 233 are also absent (presumably the
 * declaration of `c` and the lock / condition-variable setup). The failure
 * paths return a bare -1 rather than an AVERROR code. Confirm against the
 * complete file.
 */
180 {
181  int i;
183  int thread_count = avctx->thread_count;
184 
185 #if HAVE_W32THREADS
186  w32thread_init();
187 #endif
188 
189  // This check cannot live in the encoder's init because the threads are created before it runs
  // Encoding MPEG-1 video taller than 2800 lines is forced to a single thread.
190  if (av_codec_is_encoder(avctx->codec) &&
191  avctx->codec_id == AV_CODEC_ID_MPEG1VIDEO &&
192  avctx->height > 2800)
193  thread_count = avctx->thread_count = 1;
194 
  /* A thread count of 0 requests automatic selection. */
195  if (!thread_count) {
196  int nb_cpus = av_cpu_count();
  /* Cap by the number of 16-line rows in the picture, rounded up. */
197  if (avctx->height)
198  nb_cpus = FFMIN(nb_cpus, (avctx->height+15)/16);
199  // use number of cores + 1 as thread count if there is more than one
200  if (nb_cpus > 1)
201  thread_count = avctx->thread_count = FFMIN(nb_cpus + 1, MAX_AUTO_THREADS);
202  else
203  thread_count = avctx->thread_count = 1;
204  }
205 
  /* One thread or fewer: disable threading and leave the execute callbacks untouched. */
206  if (thread_count <= 1) {
207  avctx->active_thread_type = 0;
208  return 0;
209  }
210 
211  c = av_mallocz(sizeof(SliceThreadContext));
212  if (!c)
213  return -1;
214 
215  c->workers = av_mallocz_array(thread_count, sizeof(pthread_t));
216  if (!c->workers) {
217  av_free(c);
218  return -1;
219  }
220 
221  avctx->internal->thread_ctx = c;
  /* Workers draw their ids from current_job, so it must start at 0. */
222  c->current_job = 0;
223  c->job_count = 0;
224  c->job_size = 0;
225  c->done = 0;
230  for (i=0; i<thread_count; i++) {
231  if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
  /* Record how many threads were actually started before tearing down. */
232  avctx->thread_count = i;
234  ff_thread_free(avctx);
235  return -1;
236  }
237  }
238 
239  thread_park_workers(c, thread_count);
240 
241  avctx->execute = thread_execute;
242  avctx->execute2 = thread_execute2;
243  return 0;
244 }
245 
/**
 * Add n to the progress counter entries[field] and signal the condition
 * variable belonging to the given thread index.
 *
 * @param avctx  codec context owning the slice-thread state
 * @param field  index into the entries array
 * @param thread index selecting the progress mutex / condition pair
 * @param n      amount added to the counter
 *
 * NOTE(review): embedded lines 248 and 254 are absent from this listing
 * (presumably the declaration of `p` and the matching mutex unlock). Unlike
 * ff_thread_await_progress2(), entries is not checked for NULL here; confirm
 * callers only report after the entries array has been allocated.
 */
246 void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n)
247 {
249  int *entries = p->entries;
250 
  /* The counter is updated while holding this thread's progress mutex. */
251  pthread_mutex_lock(&p->progress_mutex[thread]);
252  entries[field] +=n;
253  pthread_cond_signal(&p->progress_cond[thread]);
255 }
256 
/**
 * Block until entries[field - 1] is at least `shift` ahead of entries[field].
 *
 * Returns immediately when no entries array exists or when field is 0, which
 * has no preceding entry to wait for.
 *
 * @param avctx  codec context owning the slice-thread state
 * @param field  index into the entries array; the wait is on field - 1
 * @param thread index of the calling thread; the wait uses the pair of the
 *               preceding index
 * @param shift  required lead of entries[field - 1] over entries[field]
 *
 * NOTE(review): embedded lines 259 and 270 are absent from this listing
 * (presumably the declaration of `p` and the matching mutex unlock).
 */
257 void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift)
258 {
260  int *entries = p->entries;
261 
262  if (!entries || !field) return;
263 
  /* Select the preceding thread index, wrapping from 0 to thread_count - 1. */
264  thread = thread ? thread - 1 : p->thread_count - 1;
265 
266  pthread_mutex_lock(&p->progress_mutex[thread]);
  /* Re-check the predicate after every wake-up. */
267  while ((entries[field - 1] - entries[field]) < shift){
268  pthread_cond_wait(&p->progress_cond[thread], &p->progress_mutex[thread]);
269  }
271 }
272 
/*
 * Allocate `count` zeroed progress counters when slice threading is active.
 *
 * Returns 0 on success or when slice threading is not active, and
 * AVERROR(ENOMEM) when an allocation is missing.
 *
 * NOTE(review): the signature line is missing from this listing; this is
 * presumably the body of ff_alloc_entries(AVCodecContext *avctx, int count).
 * Embedded line numbers 278, 289, 290, 295, 302 and 303 are also absent
 * (presumably the declaration of `p`, the mutex / condition array
 * allocations, one more av_freep() and the per-thread initialisation).
 * Confirm against the complete file.
 */
274 {
275  int i;
276 
277  if (avctx->active_thread_type & FF_THREAD_SLICE) {
279 
  /* A previous array is discarded; the thread count must not have changed since then. */
280  if (p->entries) {
281  av_assert0(p->thread_count == avctx->thread_count);
282  av_freep(&p->entries);
283  }
284 
285  p->thread_count = avctx->thread_count;
286  p->entries = av_mallocz_array(count, sizeof(int));
287 
  /* Block body not visible in this listing. */
288  if (!p->progress_mutex) {
291  }
292 
  /* Any missing allocation aborts the call and releases what was obtained. */
293  if (!p->entries || !p->progress_mutex || !p->progress_cond) {
294  av_freep(&p->entries);
296  av_freep(&p->progress_cond);
297  return AVERROR(ENOMEM);
298  }
299  p->entries_count = count;
300 
  /* NOTE(review): this loop runs on every call, also when progress_mutex already
   * existed; if its hidden body initialises the mutexes / conditions, repeated
   * calls would re-initialise live objects. Confirm. */
301  for (i = 0; i < p->thread_count; i++) {
304  }
305  }
306 
307  return 0;
308 }
309 
/*
 * Zero all entries_count progress counters.
 *
 * NOTE(review): the signature line and embedded line 312 are missing from
 * this listing; this is presumably ff_reset_entries(AVCodecContext *avctx)
 * with `p` being the slice-thread context. entries is not checked for NULL
 * before the memset; confirm callers only reset after a successful
 * allocation.
 */
311 {
313  memset(p->entries, 0, p->entries_count * sizeof(int));
314 }
static av_unused void w32thread_init(void)
Definition: w32pthreads.h:397
#define NULL
Definition: coverity.c:32
const struct AVCodec * codec
Definition: avcodec.h:1541
int ff_slice_thread_init(AVCodecContext *avctx)
static int shift(int a, int b)
Definition: sonic.c:82
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:106
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:164
int av_cpu_count(void)
Definition: cpu.c:256
memory handling functions
int( action_func)(AVCodecContext *c, void *arg)
Definition: pthread_slice.c:38
void ff_thread_await_progress2(AVCodecContext *avctx, int field, int thread, int shift)
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:138
pthread_cond_t current_job_cond
Definition: pthread_slice.c:51
int av_codec_is_encoder(const AVCodec *codec)
Definition: utils.c:167
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
HMTX pthread_mutex_t
Definition: os2threads.h:49
void * thread_ctx
Definition: internal.h:138
Multithreading support functions.
pthread_mutex_t * progress_mutex
Definition: pthread_slice.c:61
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:146
static void *attribute_align_arg worker(void *v)
Definition: pthread_slice.c:64
#define AVERROR(e)
Definition: error.h:43
pthread_mutex_t current_job_lock
Definition: pthread_slice.c:52
int active_thread_type
Which multithreading methods are in use by the codec.
Definition: avcodec.h:2973
action_func2 * func2
Definition: pthread_slice.c:44
const char * arg
Definition: jacosubdec.c:66
simple assert() macros that are a bit more flexible than ISO C assert().
GLsizei count
Definition: opengl_enc.c:109
void ff_thread_free(AVCodecContext *avctx)
Definition: pthread.c:82
action_func * func
Definition: pthread_slice.c:43
int ff_alloc_entries(AVCodecContext *avctx, int count)
static int thread_execute2(AVCodecContext *avctx, action_func2 *func2, void *arg, int *ret, int job_count)
#define FFMIN(a, b)
Definition: common.h:96
void ff_slice_thread_free(AVCodecContext *avctx)
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:88
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:98
#define FF_THREAD_SLICE
Decode more than one part of a single frame at once.
Definition: avcodec.h:2966
pthread_cond_t * progress_cond
Definition: pthread_slice.c:60
int n
Definition: avisynth_c.h:547
int thread_count
thread count is used to decide how many independent tasks should be passed to execute() ...
Definition: avcodec.h:2954
void ff_reset_entries(AVCodecContext *avctx)
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:74
unsigned current_execute
Definition: pthread_slice.c:53
Libavcodec external API header.
void ff_thread_report_progress2(AVCodecContext *avctx, int field, int thread, int n)
enum AVCodecID codec_id
Definition: avcodec.h:1549
main external API structure.
Definition: avcodec.h:1532
static av_always_inline void thread_park_workers(SliceThreadContext *c, int thread_count)
int(* func)(AVBPrint *dst, const char *in, const char *arg)
Definition: jacosubdec.c:67
#define MAX_AUTO_THREADS
common internal api header.
common internal and external API header
static double c[64]
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:127
#define av_free(p)
int(* execute)(struct AVCodecContext *c, int(*func)(struct AVCodecContext *c2, void *arg), void *arg2, int *ret, int count, int size)
The codec may call this to execute several independent things.
Definition: avcodec.h:2994
int(* execute2)(struct AVCodecContext *c, int(*func)(struct AVCodecContext *c2, void *arg, int jobnr, int threadnr), void *arg2, int *ret, int count)
The codec may call this to execute several independent things.
Definition: avcodec.h:3014
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:156
static av_always_inline int pthread_mutex_unlock(pthread_mutex_t *mutex)
Definition: os2threads.h:120
struct AVCodecInternal * internal
Private context used for internal data.
Definition: avcodec.h:1582
pthread_t * workers
Definition: pthread_slice.c:42
static void * av_mallocz_array(size_t nmemb, size_t size)
Definition: mem.h:229
#define av_freep(p)
#define av_always_inline
Definition: attributes.h:39
static av_always_inline int pthread_mutex_lock(pthread_mutex_t *mutex)
Definition: os2threads.h:113
#define av_malloc_array(a, b)
int( action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr)
Definition: pthread_slice.c:39
static int thread_execute(AVCodecContext *avctx, action_func *func, void *arg, int *ret, int job_count, int job_size)
pthread_cond_t last_job_cond
Definition: pthread_slice.c:50
void * av_mallocz(size_t size)
Allocate a block of size bytes with alignment suitable for all memory accesses (including vectors if ...
Definition: mem.c:252
int avcodec_default_execute(AVCodecContext *c, int(*func)(AVCodecContext *c2, void *arg2), void *arg, int *ret, int count, int size)
Definition: utils.c:951