FFmpeg
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
threadmessage.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Nicolas George
3  *
4  * This file is part of FFmpeg.
5  *
6  * FFmpeg is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public License
8  * as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * FFmpeg is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include "fifo.h"
22 #include "threadmessage.h"
23 #include "thread.h"
24 
26 #if HAVE_THREADS
27  AVFifoBuffer *fifo;
29  pthread_cond_t cond_recv;
30  pthread_cond_t cond_send;
31  int err_send;
32  int err_recv;
33  unsigned elsize;
34  void (*free_func)(void *msg);
35 #else
36  int dummy;
37 #endif
38 };
39 
41  unsigned nelem,
42  unsigned elsize)
43 {
44 #if HAVE_THREADS
46  int ret = 0;
47 
48  if (nelem > INT_MAX / elsize)
49  return AVERROR(EINVAL);
50  if (!(rmq = av_mallocz(sizeof(*rmq))))
51  return AVERROR(ENOMEM);
52  if ((ret = pthread_mutex_init(&rmq->lock, NULL))) {
53  av_free(rmq);
54  return AVERROR(ret);
55  }
56  if ((ret = pthread_cond_init(&rmq->cond_recv, NULL))) {
57  pthread_mutex_destroy(&rmq->lock);
58  av_free(rmq);
59  return AVERROR(ret);
60  }
61  if ((ret = pthread_cond_init(&rmq->cond_send, NULL))) {
62  pthread_cond_destroy(&rmq->cond_recv);
63  pthread_mutex_destroy(&rmq->lock);
64  av_free(rmq);
65  return AVERROR(ret);
66  }
67  if (!(rmq->fifo = av_fifo_alloc(elsize * nelem))) {
68  pthread_cond_destroy(&rmq->cond_send);
69  pthread_cond_destroy(&rmq->cond_recv);
70  pthread_mutex_destroy(&rmq->lock);
71  av_free(rmq);
72  return AVERROR(ENOMEM);
73  }
74  rmq->elsize = elsize;
75  *mq = rmq;
76  return 0;
77 #else
78  *mq = NULL;
79  return AVERROR(ENOSYS);
80 #endif /* HAVE_THREADS */
81 }
82 
84  void (*free_func)(void *msg))
85 {
86 #if HAVE_THREADS
87  mq->free_func = free_func;
88 #endif
89 }
90 
92 {
93 #if HAVE_THREADS
94  if (*mq) {
96  av_fifo_freep(&(*mq)->fifo);
97  pthread_cond_destroy(&(*mq)->cond_send);
98  pthread_cond_destroy(&(*mq)->cond_recv);
99  pthread_mutex_destroy(&(*mq)->lock);
100  av_freep(mq);
101  }
102 #endif
103 }
104 
105 #if HAVE_THREADS
106 
107 static int av_thread_message_queue_send_locked(AVThreadMessageQueue *mq,
108  void *msg,
109  unsigned flags)
110 {
111  while (!mq->err_send && av_fifo_space(mq->fifo) < mq->elsize) {
112  if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
113  return AVERROR(EAGAIN);
114  pthread_cond_wait(&mq->cond_send, &mq->lock);
115  }
116  if (mq->err_send)
117  return mq->err_send;
118  av_fifo_generic_write(mq->fifo, msg, mq->elsize, NULL);
119  /* one message is sent, signal one receiver */
120  pthread_cond_signal(&mq->cond_recv);
121  return 0;
122 }
123 
124 static int av_thread_message_queue_recv_locked(AVThreadMessageQueue *mq,
125  void *msg,
126  unsigned flags)
127 {
128  while (!mq->err_recv && av_fifo_size(mq->fifo) < mq->elsize) {
129  if ((flags & AV_THREAD_MESSAGE_NONBLOCK))
130  return AVERROR(EAGAIN);
131  pthread_cond_wait(&mq->cond_recv, &mq->lock);
132  }
133  if (av_fifo_size(mq->fifo) < mq->elsize)
134  return mq->err_recv;
135  av_fifo_generic_read(mq->fifo, msg, mq->elsize, NULL);
136  /* one message space appeared, signal one sender */
137  pthread_cond_signal(&mq->cond_send);
138  return 0;
139 }
140 
141 #endif /* HAVE_THREADS */
142 
144  void *msg,
145  unsigned flags)
146 {
147 #if HAVE_THREADS
148  int ret;
149 
150  pthread_mutex_lock(&mq->lock);
151  ret = av_thread_message_queue_send_locked(mq, msg, flags);
152  pthread_mutex_unlock(&mq->lock);
153  return ret;
154 #else
155  return AVERROR(ENOSYS);
156 #endif /* HAVE_THREADS */
157 }
158 
160  void *msg,
161  unsigned flags)
162 {
163 #if HAVE_THREADS
164  int ret;
165 
166  pthread_mutex_lock(&mq->lock);
167  ret = av_thread_message_queue_recv_locked(mq, msg, flags);
168  pthread_mutex_unlock(&mq->lock);
169  return ret;
170 #else
171  return AVERROR(ENOSYS);
172 #endif /* HAVE_THREADS */
173 }
174 
176  int err)
177 {
178 #if HAVE_THREADS
179  pthread_mutex_lock(&mq->lock);
180  mq->err_send = err;
181  pthread_cond_broadcast(&mq->cond_send);
182  pthread_mutex_unlock(&mq->lock);
183 #endif /* HAVE_THREADS */
184 }
185 
187  int err)
188 {
189 #if HAVE_THREADS
190  pthread_mutex_lock(&mq->lock);
191  mq->err_recv = err;
192  pthread_cond_broadcast(&mq->cond_recv);
193  pthread_mutex_unlock(&mq->lock);
194 #endif /* HAVE_THREADS */
195 }
196 
197 #if HAVE_THREADS
198 static void free_func_wrap(void *arg, void *msg, int size)
199 {
201  mq->free_func(msg);
202 }
203 #endif
204 
206 {
207 #if HAVE_THREADS
208  int used, off;
209  void *free_func = mq->free_func;
210 
211  pthread_mutex_lock(&mq->lock);
212  used = av_fifo_size(mq->fifo);
213  if (free_func)
214  for (off = 0; off < used; off += mq->elsize)
215  av_fifo_generic_peek_at(mq->fifo, mq, off, mq->elsize, free_func_wrap);
216  av_fifo_drain(mq->fifo, used);
217  /* only the senders need to be notified since the queue is empty and there
218  * is nothing to read */
219  pthread_cond_broadcast(&mq->cond_send);
220  pthread_mutex_unlock(&mq->lock);
221 #endif /* HAVE_THREADS */
222 }
#define NULL
Definition: coverity.c:32
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:106
#define pthread_mutex_lock(a)
Definition: ffprobe.c:61
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:164
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
void av_thread_message_queue_set_free_func(AVThreadMessageQueue *mq, void(*free_func)(void *msg))
Set the optional free message callback function which will be called if an operation is removing mess...
Definition: threadmessage.c:83
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:222
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:138
int av_fifo_generic_write(AVFifoBuffer *f, void *src, int size, int(*func)(void *, void *, int))
Feed data from a user-supplied callback to an AVFifoBuffer.
Definition: fifo.c:122
HMTX pthread_mutex_t
Definition: os2threads.h:49
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
int av_fifo_space(const AVFifoBuffer *f)
Return the amount of space in bytes in the AVFifoBuffer, that is the amount of data you can write int...
Definition: fifo.c:82
static int flags
Definition: log.c:57
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:146
ptrdiff_t size
Definition: opengl_enc.c:101
void av_thread_message_flush(AVThreadMessageQueue *mq)
Flush the message queue.
#define AVERROR(e)
Definition: error.h:43
int av_fifo_generic_read(AVFifoBuffer *f, void *dest, int buf_size, void(*func)(void *, void *, int))
Feed data from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:213
const char * arg
Definition: jacosubdec.c:66
typedef void(APIENTRY *FF_PFNGLACTIVETEXTUREPROC)(GLenum texture)
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:98
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:65
void av_thread_message_queue_set_err_send(AVThreadMessageQueue *mq, int err)
Set the sending error code.
int av_fifo_size(const AVFifoBuffer *f)
Return the amount of data in bytes in the AVFifoBuffer, that is the amount of data you can read from ...
Definition: fifo.c:77
int av_fifo_generic_peek_at(AVFifoBuffer *f, void *dest, int offset, int buf_size, void(*func)(void *, void *, int))
Feed data at specific position from an AVFifoBuffer to a user-supplied callback.
Definition: fifo.c:151
a very simple circular buffer FIFO implementation
Perform non-blocking operation.
Definition: threadmessage.h:31
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
Definition: threadmessage.c:40
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
Definition: threadmessage.c:91
static pthread_mutex_t lock
Definition: ffjni.c:37
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:127
#define av_free(p)
AVFifoBuffer * av_fifo_alloc(unsigned int size)
Initialize an AVFifoBuffer.
Definition: fifo.c:43
static av_always_inline int pthread_cond_broadcast(pthread_cond_t *cond)
Definition: os2threads.h:156
#define av_freep(p)
void av_fifo_freep(AVFifoBuffer **f)
Free an AVFifoBuffer and reset pointer to NULL.
Definition: fifo.c:63
void av_fifo_drain(AVFifoBuffer *f, int size)
Discard data from the FIFO.
Definition: fifo.c:233