FFmpeg
executor.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2023 Nuo Mi
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with FFmpeg; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "config.h"
22 
23 #include <stdbool.h>
24 
25 #include "mem.h"
26 #include "thread.h"
27 
28 #include "executor.h"
29 
/*
 * Thread primitives used by the executor.
 *
 * With thread support they map directly onto pthreads. Without it the
 * "thread" is a dummy char, creation always evaluates to 0 (success) without
 * starting anything, and joining is a no-op.
 */
#if HAVE_THREADS

#define ExecutorThread pthread_t

#define executor_thread_create(t, a, s, ar) pthread_create(t, a, s, ar)
#define executor_thread_join(t, r) pthread_join(t, r)

#else

#define ExecutorThread char

#define executor_thread_create(t, a, s, ar) 0
#define executor_thread_join(t, r) do {} while(0)

#endif // HAVE_THREADS
45 
46 typedef struct ThreadInfo {
49 } ThreadInfo;
50 
51 struct AVExecutor {
54  bool recursive;
55 
57  uint8_t *local_contexts;
58 
61  int die;
62 
64 };
65 
66 static AVTask* remove_task(AVTask **prev, AVTask *t)
67 {
68  *prev = t->next;
69  t->next = NULL;
70  return t;
71 }
72 
73 static void add_task(AVTask **prev, AVTask *t)
74 {
75  t->next = *prev;
76  *prev = t;
77 }
78 
79 static int run_one_task(AVExecutor *e, void *lc)
80 {
81  AVTaskCallbacks *cb = &e->cb;
82  AVTask **prev;
83 
84  for (prev = &e->tasks; *prev && !cb->ready(*prev, cb->user_data); prev = &(*prev)->next)
85  /* nothing */;
86  if (*prev) {
87  AVTask *t = remove_task(prev, *prev);
88  if (e->thread_count > 0)
89  ff_mutex_unlock(&e->lock);
90  cb->run(t, lc, cb->user_data);
91  if (e->thread_count > 0)
92  ff_mutex_lock(&e->lock);
93  return 1;
94  }
95  return 0;
96 }
97 
#if HAVE_THREADS
/**
 * Worker thread entry point.
 *
 * Runs ready tasks until the executor's die flag is set; sleeps on the
 * condition variable whenever a pass over the list finds nothing to run.
 *
 * @param data the ThreadInfo describing this worker
 * @return always NULL
 */
static void *executor_worker_task(void *data)
{
    ThreadInfo *self = (ThreadInfo*)data;
    AVExecutor *e    = self->e;
    // each worker uses the local-context slot matching its index in e->threads
    void *lc = e->local_contexts + (self - e->threads) * e->cb.local_context_size;

    ff_mutex_lock(&e->lock);
    while (!e->die) {
        if (run_one_task(e, lc))
            continue;
        // nothing was runnable in this pass: wait for a wake-up
        ff_cond_wait(&e->cond, &e->lock);
    }
    ff_mutex_unlock(&e->lock);

    return NULL;
}
#endif
118 
119 static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
120 {
121  if (e->thread_count) {
122  //signal die
123  ff_mutex_lock(&e->lock);
124  e->die = 1;
125  ff_cond_broadcast(&e->cond);
126  ff_mutex_unlock(&e->lock);
127 
128  for (int i = 0; i < e->thread_count; i++)
130  }
131  if (has_cond)
132  ff_cond_destroy(&e->cond);
133  if (has_lock)
134  ff_mutex_destroy(&e->lock);
135 
136  av_free(e->threads);
138 
139  av_free(e);
140 }
141 
142 AVExecutor* av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
143 {
144  AVExecutor *e;
145  int has_lock = 0, has_cond = 0;
146  if (!cb || !cb->user_data || !cb->ready || !cb->run || !cb->priority_higher)
147  return NULL;
148 
149  e = av_mallocz(sizeof(*e));
150  if (!e)
151  return NULL;
152  e->cb = *cb;
153 
154  e->local_contexts = av_calloc(FFMAX(thread_count, 1), e->cb.local_context_size);
155  if (!e->local_contexts)
156  goto free_executor;
157 
158  e->threads = av_calloc(FFMAX(thread_count, 1), sizeof(*e->threads));
159  if (!e->threads)
160  goto free_executor;
161 
162  if (!thread_count)
163  return e;
164 
165  has_lock = !ff_mutex_init(&e->lock, NULL);
166  has_cond = !ff_cond_init(&e->cond, NULL);
167 
168  if (!has_lock || !has_cond)
169  goto free_executor;
170 
171  for (/* nothing */; e->thread_count < thread_count; e->thread_count++) {
172  ThreadInfo *ti = e->threads + e->thread_count;
173  ti->e = e;
174  if (executor_thread_create(&ti->thread, NULL, executor_worker_task, ti))
175  goto free_executor;
176  }
177  return e;
178 
179 free_executor:
180  executor_free(e, has_lock, has_cond);
181  return NULL;
182 }
183 
184 void av_executor_free(AVExecutor **executor)
185 {
186  int thread_count;
187 
188  if (!executor || !*executor)
189  return;
190  thread_count = (*executor)->thread_count;
191  executor_free(*executor, thread_count, thread_count);
192  *executor = NULL;
193 }
194 
196 {
197  AVTaskCallbacks *cb = &e->cb;
198  AVTask **prev;
199 
200  if (e->thread_count)
201  ff_mutex_lock(&e->lock);
202  if (t) {
203  for (prev = &e->tasks; *prev && cb->priority_higher(*prev, t); prev = &(*prev)->next)
204  /* nothing */;
205  add_task(prev, t);
206  }
207  if (e->thread_count) {
208  ff_cond_signal(&e->cond);
209  ff_mutex_unlock(&e->lock);
210  }
211 
212  if (!e->thread_count || !HAVE_THREADS) {
213  if (e->recursive)
214  return;
215  e->recursive = true;
216  // We are running in a single-threaded environment, so we must handle all tasks ourselves
217  while (run_one_task(e, e->local_contexts))
218  /* nothing */;
219  e->recursive = false;
220  }
221 }
ThreadInfo::thread
ExecutorThread thread
Definition: executor.c:48
AVExecutor::threads
ThreadInfo * threads
Definition: executor.c:56
ff_mutex_init
static int ff_mutex_init(AVMutex *mutex, const void *attr)
Definition: thread.h:187
cb
static double cb(void *priv, double x, double y)
Definition: vf_geq.c:247
ExecutorThread
#define ExecutorThread
Definition: executor.c:32
thread.h
AVTask::next
AVTask * next
Definition: executor.h:28
data
const char data[16]
Definition: mxf.c:149
FFMAX
#define FFMAX(a, b)
Definition: macros.h:47
ff_cond_broadcast
static int ff_cond_broadcast(AVCond *cond)
Definition: thread.h:197
ff_mutex_unlock
static int ff_mutex_unlock(AVMutex *mutex)
Definition: thread.h:189
add_task
static void add_task(AVTask **prev, AVTask *t)
Definition: executor.c:73
AVTaskCallbacks
Definition: executor.h:31
AVTaskCallbacks::local_context_size
int local_context_size
Definition: executor.h:34
AVExecutor::die
int die
Definition: executor.c:61
AVExecutor::recursive
bool recursive
Definition: executor.c:54
AVMutex
#define AVMutex
Definition: thread.h:184
ff_cond_wait
static int ff_cond_wait(AVCond *cond, AVMutex *mutex)
Definition: thread.h:198
AVCond
#define AVCond
Definition: thread.h:192
av_executor_alloc
AVExecutor * av_executor_alloc(const AVTaskCallbacks *cb, int thread_count)
Alloc executor.
Definition: executor.c:142
AVTask
Definition: executor.h:27
NULL
#define NULL
Definition: coverity.c:32
AVExecutor
Definition: executor.c:51
AVExecutor::cond
AVCond cond
Definition: executor.c:60
ThreadInfo::e
FFExecutor * e
Definition: executor.c:47
ff_mutex_destroy
static int ff_mutex_destroy(AVMutex *mutex)
Definition: thread.h:190
AVExecutor::thread_count
int thread_count
Definition: executor.c:53
AVExecutor::cb
AVTaskCallbacks cb
Definition: executor.c:52
ThreadInfo::e
AVExecutor * e
Definition: executor.c:47
executor_free
static void executor_free(AVExecutor *e, const int has_lock, const int has_cond)
Definition: executor.c:119
ThreadInfo
HAVE_THREADS.
Definition: executor.c:46
av_executor_execute
void av_executor_execute(AVExecutor *e, AVTask *t)
Add task to executor.
Definition: executor.c:195
ff_mutex_lock
static int ff_mutex_lock(AVMutex *mutex)
Definition: thread.h:188
i
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:256
av_mallocz
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:256
AVExecutor::tasks
AVTask * tasks
Definition: executor.c:63
av_calloc
void * av_calloc(size_t nmemb, size_t size)
Definition: mem.c:264
remove_task
static AVTask * remove_task(AVTask **prev, AVTask *t)
Definition: executor.c:66
ff_cond_signal
static int ff_cond_signal(AVCond *cond)
Definition: thread.h:196
executor_thread_join
#define executor_thread_join(t, r)
Definition: executor.c:35
mem.h
av_free
#define av_free(p)
Definition: tableprint_vlc.h:33
ff_cond_destroy
static int ff_cond_destroy(AVCond *cond)
Definition: thread.h:195
run_one_task
static int run_one_task(AVExecutor *e, void *lc)
Definition: executor.c:79
executor_thread_create
#define executor_thread_create(t, a, s, ar)
Definition: executor.c:34
ff_cond_init
static int ff_cond_init(AVCond *cond, const void *attr)
Definition: thread.h:194
AVExecutor::lock
AVMutex lock
Definition: executor.c:59
av_executor_free
void av_executor_free(AVExecutor **executor)
Free executor.
Definition: executor.c:184
executor.h
AVExecutor::local_contexts
uint8_t * local_contexts
Definition: executor.c:57