00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <pthread.h>
00025
00026 #include "avcodec.h"
00027
00028 typedef int (action_func)(AVCodecContext *c, void *arg);
00029
00030 typedef struct ThreadContext {
00031 pthread_t *workers;
00032 action_func *func;
00033 void *args;
00034 int *rets;
00035 int rets_count;
00036 int job_count;
00037 int job_size;
00038
00039 pthread_cond_t last_job_cond;
00040 pthread_cond_t current_job_cond;
00041 pthread_mutex_t current_job_lock;
00042 int current_job;
00043 int done;
00044 } ThreadContext;
00045
00046 static void* attribute_align_arg worker(void *v)
00047 {
00048 AVCodecContext *avctx = v;
00049 ThreadContext *c = avctx->thread_opaque;
00050 int our_job = c->job_count;
00051 int thread_count = avctx->thread_count;
00052 int self_id;
00053
00054 pthread_mutex_lock(&c->current_job_lock);
00055 self_id = c->current_job++;
00056 for (;;){
00057 while (our_job >= c->job_count) {
00058 if (c->current_job == thread_count + c->job_count)
00059 pthread_cond_signal(&c->last_job_cond);
00060
00061 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
00062 our_job = self_id;
00063
00064 if (c->done) {
00065 pthread_mutex_unlock(&c->current_job_lock);
00066 return NULL;
00067 }
00068 }
00069 pthread_mutex_unlock(&c->current_job_lock);
00070
00071 c->rets[our_job%c->rets_count] = c->func(avctx, (char*)c->args + our_job*c->job_size);
00072
00073 pthread_mutex_lock(&c->current_job_lock);
00074 our_job = c->current_job++;
00075 }
00076 }
00077
00078 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
00079 {
00080 pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
00081 pthread_mutex_unlock(&c->current_job_lock);
00082 }
00083
00084 void avcodec_thread_free(AVCodecContext *avctx)
00085 {
00086 ThreadContext *c = avctx->thread_opaque;
00087 int i;
00088
00089 pthread_mutex_lock(&c->current_job_lock);
00090 c->done = 1;
00091 pthread_cond_broadcast(&c->current_job_cond);
00092 pthread_mutex_unlock(&c->current_job_lock);
00093
00094 for (i=0; i<avctx->thread_count; i++)
00095 pthread_join(c->workers[i], NULL);
00096
00097 pthread_mutex_destroy(&c->current_job_lock);
00098 pthread_cond_destroy(&c->current_job_cond);
00099 pthread_cond_destroy(&c->last_job_cond);
00100 av_free(c->workers);
00101 av_freep(&avctx->thread_opaque);
00102 }
00103
00104 int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
00105 {
00106 ThreadContext *c= avctx->thread_opaque;
00107 int dummy_ret;
00108
00109 if (job_count <= 0)
00110 return 0;
00111
00112 pthread_mutex_lock(&c->current_job_lock);
00113
00114 c->current_job = avctx->thread_count;
00115 c->job_count = job_count;
00116 c->job_size = job_size;
00117 c->args = arg;
00118 c->func = func;
00119 if (ret) {
00120 c->rets = ret;
00121 c->rets_count = job_count;
00122 } else {
00123 c->rets = &dummy_ret;
00124 c->rets_count = 1;
00125 }
00126 pthread_cond_broadcast(&c->current_job_cond);
00127
00128 avcodec_thread_park_workers(c, avctx->thread_count);
00129
00130 return 0;
00131 }
00132
00133 int avcodec_thread_init(AVCodecContext *avctx, int thread_count)
00134 {
00135 int i;
00136 ThreadContext *c;
00137
00138 c = av_mallocz(sizeof(ThreadContext));
00139 if (!c)
00140 return -1;
00141
00142 c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
00143 if (!c->workers) {
00144 av_free(c);
00145 return -1;
00146 }
00147
00148 avctx->thread_opaque = c;
00149 avctx->thread_count = thread_count;
00150 c->current_job = 0;
00151 c->job_count = 0;
00152 c->job_size = 0;
00153 c->done = 0;
00154 pthread_cond_init(&c->current_job_cond, NULL);
00155 pthread_cond_init(&c->last_job_cond, NULL);
00156 pthread_mutex_init(&c->current_job_lock, NULL);
00157 pthread_mutex_lock(&c->current_job_lock);
00158 for (i=0; i<thread_count; i++) {
00159 if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
00160 avctx->thread_count = i;
00161 pthread_mutex_unlock(&c->current_job_lock);
00162 avcodec_thread_free(avctx);
00163 return -1;
00164 }
00165 }
00166
00167 avcodec_thread_park_workers(c, thread_count);
00168
00169 avctx->execute = avcodec_thread_execute;
00170 return 0;
00171 }