00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <pthread.h>
00025
00026 #include "avcodec.h"
00027
00028 typedef int (action_func)(AVCodecContext *c, void *arg);
00029 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
00030
00031 typedef struct ThreadContext {
00032 pthread_t *workers;
00033 action_func *func;
00034 action_func2 *func2;
00035 void *args;
00036 int *rets;
00037 int rets_count;
00038 int job_count;
00039 int job_size;
00040
00041 pthread_cond_t last_job_cond;
00042 pthread_cond_t current_job_cond;
00043 pthread_mutex_t current_job_lock;
00044 int current_job;
00045 int done;
00046 } ThreadContext;
00047
00048 static void* attribute_align_arg worker(void *v)
00049 {
00050 AVCodecContext *avctx = v;
00051 ThreadContext *c = avctx->thread_opaque;
00052 int our_job = c->job_count;
00053 int thread_count = avctx->thread_count;
00054 int self_id;
00055
00056 pthread_mutex_lock(&c->current_job_lock);
00057 self_id = c->current_job++;
00058 for (;;){
00059 while (our_job >= c->job_count) {
00060 if (c->current_job == thread_count + c->job_count)
00061 pthread_cond_signal(&c->last_job_cond);
00062
00063 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
00064 our_job = self_id;
00065
00066 if (c->done) {
00067 pthread_mutex_unlock(&c->current_job_lock);
00068 return NULL;
00069 }
00070 }
00071 pthread_mutex_unlock(&c->current_job_lock);
00072
00073 c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
00074 c->func2(avctx, c->args, our_job, self_id);
00075
00076 pthread_mutex_lock(&c->current_job_lock);
00077 our_job = c->current_job++;
00078 }
00079 }
00080
00081 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
00082 {
00083 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
00084 pthread_mutex_unlock(&c->current_job_lock);
00085 }
00086
00087 void avcodec_thread_free(AVCodecContext *avctx)
00088 {
00089 ThreadContext *c = avctx->thread_opaque;
00090 int i;
00091
00092 pthread_mutex_lock(&c->current_job_lock);
00093 c->done = 1;
00094 pthread_cond_broadcast(&c->current_job_cond);
00095 pthread_mutex_unlock(&c->current_job_lock);
00096
00097 for (i=0; i<avctx->thread_count; i++)
00098 pthread_join(c->workers[i], NULL);
00099
00100 pthread_mutex_destroy(&c->current_job_lock);
00101 pthread_cond_destroy(&c->current_job_cond);
00102 pthread_cond_destroy(&c->last_job_cond);
00103 av_free(c->workers);
00104 av_freep(&avctx->thread_opaque);
00105 }
00106
00107 static int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
00108 {
00109 ThreadContext *c= avctx->thread_opaque;
00110 int dummy_ret;
00111
00112 if (job_count <= 0)
00113 return 0;
00114
00115 pthread_mutex_lock(&c->current_job_lock);
00116
00117 c->current_job = avctx->thread_count;
00118 c->job_count = job_count;
00119 c->job_size = job_size;
00120 c->args = arg;
00121 c->func = func;
00122 if (ret) {
00123 c->rets = ret;
00124 c->rets_count = job_count;
00125 } else {
00126 c->rets = &dummy_ret;
00127 c->rets_count = 1;
00128 }
00129 pthread_cond_broadcast(&c->current_job_cond);
00130
00131 avcodec_thread_park_workers(c, avctx->thread_count);
00132
00133 return 0;
00134 }
00135
00136 static int avcodec_thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
00137 {
00138 ThreadContext *c= avctx->thread_opaque;
00139 c->func2 = func2;
00140 return avcodec_thread_execute(avctx, NULL, arg, ret, job_count, 0);
00141 }
00142
00143 int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
00144 {
00145 int i;
00146 ThreadContext *c;
00147
00148 avctx->thread_count = thread_count;
00149
00150 if (thread_count <= 1)
00151 return 0;
00152
00153 c = av_mallocz(sizeof(ThreadContext));
00154 if (!c)
00155 return -1;
00156
00157 c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
00158 if (!c->workers) {
00159 av_free(c);
00160 return -1;
00161 }
00162
00163 avctx->thread_opaque = c;
00164 c->current_job = 0;
00165 c->job_count = 0;
00166 c->job_size = 0;
00167 c->done = 0;
00168 pthread_cond_init(&c->current_job_cond, NULL);
00169 pthread_cond_init(&c->last_job_cond, NULL);
00170 pthread_mutex_init(&c->current_job_lock, NULL);
00171 pthread_mutex_lock(&c->current_job_lock);
00172 for (i=0; i<thread_count; i++) {
00173 if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
00174 avctx->thread_count = i;
00175 pthread_mutex_unlock(&c->current_job_lock);
00176 avcodec_thread_free(avctx);
00177 return -1;
00178 }
00179 }
00180
00181 avcodec_thread_park_workers(c, thread_count);
00182
00183 avctx->execute = avcodec_thread_execute;
00184 avctx->execute2 = avcodec_thread_execute2;
00185 return 0;
00186 }