Go to the documentation of this file.
22 #include <stdatomic.h>
45 #define SCHEDULE_TOLERANCE (100 * 1000)
374 if (queue_size <= 0) {
443 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
446 for (
unsigned j = 0; j < mux->
nb_streams; j++) {
473 for (
unsigned j = 0; j <
d->nb_streams; j++) {
486 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
489 for (
unsigned j = 0; j < mux->
nb_streams; j++) {
509 for (
unsigned i = 0;
i < sch->
nb_dec;
i++) {
523 for (
unsigned i = 0;
i < sch->
nb_enc;
i++) {
617 .parent_log_context_offset = offsetof(
SchMux, task.func_arg),
621 void *
arg,
int sdp_auto,
unsigned thread_queue_size)
623 const unsigned idx = sch->
nb_mux;
632 mux = &sch->
mux[idx];
652 mux = &sch->
mux[mux_idx];
659 ms = &mux->
streams[stream_idx];
673 .parent_log_context_offset = offsetof(
SchDemux, task.func_arg),
709 d = &sch->
demux[demux_idx];
712 return ret < 0 ?
ret :
d->nb_streams - 1;
718 .parent_log_context_offset = offsetof(
SchDec, task.func_arg),
724 const unsigned idx = sch->
nb_dec;
733 dec = &sch->
dec[idx];
758 .parent_log_context_offset = offsetof(
SchEnc, task.func_arg),
764 const unsigned idx = sch->
nb_enc;
773 enc = &sch->
enc[idx];
796 .parent_log_context_offset = offsetof(
SchFilterGraph, task.func_arg),
867 int limiting, uint64_t max_frames)
874 sq = &sch->
sq_enc[sq_idx];
877 enc = &sch->
enc[enc_idx];
891 if (max_frames != INT64_MAX)
1007 enc = &sch->
enc[
src.idx];
1033 dec = &sch->
dec[dst.
idx];
1107 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
1122 size_t data_threshold,
int max_packets)
1128 mux = &sch->
mux[mux_idx];
1131 ms = &mux->
streams[stream_idx];
1143 mux = &sch->
mux[mux_idx];
1170 mux = &sch->
mux[mux_idx];
1173 ms = &mux->
streams[stream_idx];
1219 int have_unchoked = 0;
1239 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
1242 for (
unsigned j = 0; j < mux->
nb_streams; j++) {
1261 for (
unsigned type = 0; !have_unchoked &&
type < 2;
type++)
1276 if (
w->choked_prev !=
w->choked_next)
1292 unsigned nb_filters_stack = 0;
1294 memset(filters_visited, 0, sch->
nb_filters *
sizeof(*filters_visited));
1317 av_assert0(nb_filters_stack < sch->nb_filters);
1318 filters_stack[nb_filters_stack++] =
src;
1326 if (nb_filters_stack) {
1327 src = filters_stack[--nb_filters_stack];
1336 uint8_t *filters_visited =
NULL;
1345 if (!filters_visited)
1349 if (!filters_stack) {
1356 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
1359 for (
unsigned j = 0; j < mux->
nb_streams; j++) {
1388 for (
unsigned j = 0; j <
d->nb_streams; j++) {
1393 "Demuxer stream %u not connected to any sink\n", j);
1403 for (
unsigned i = 0;
i < sch->
nb_dec;
i++) {
1408 "Decoder not connected to a source\n");
1413 "Decoder not connected to any sink\n");
1422 for (
unsigned i = 0;
i < sch->
nb_enc;
i++) {
1427 "Encoder not connected to a source\n");
1432 "Encoder not connected to any sink\n");
1441 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
1444 for (
unsigned j = 0; j < mux->
nb_streams; j++) {
1464 "Muxer stream #%u not connected to a source\n", j);
1478 for (
unsigned j = 0; j < fg->
nb_inputs; j++) {
1484 "Filtergraph input %u not connected to a source\n", j);
1497 for (
unsigned j = 0; j < fg->
nb_outputs; j++) {
1502 "Filtergraph %u output %u not connected to a sink\n",
i, j);
1527 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
1537 for (
unsigned i = 0;
i < sch->
nb_enc;
i++) {
1553 for (
unsigned i = 0;
i < sch->
nb_dec;
i++) {
1592 struct timespec tv = { .tv_sec = timeout_us / 1000000,
1593 .tv_nsec = (timeout_us % 1000000) * 1000 };
1665 for (
unsigned i = 0;
i < enc->
nb_dst;
i++) {
1745 return (enc->
sq_idx[0] >= 0) ?
1760 size_t max_packets = thresh_reached ? q->
max_packets : SIZE_MAX;
1761 size_t new_size =
FFMIN(2 * packets, max_packets);
1763 if (new_size <= packets) {
1765 "Too many packets buffered for output stream.\n");
1804 queued =
ret < 0 ?
ret : 1;
1812 goto update_schedule;
1883 unsigned nb_done = 0;
1885 for (
unsigned i = 0;
i < ds->
nb_dst;
i++) {
1892 if (
pkt && i < ds->nb_dst - 1) {
1893 to_send =
d->send_pkt;
1918 for (
unsigned i = 0;
i <
d->nb_streams;
i++) {
1921 for (
unsigned j = 0; j < ds->
nb_dst; j++) {
1929 dec = &sch->
dec[dst->
idx];
1963 d = &sch->
demux[demux_idx];
1983 for (
unsigned i = 0;
i <
d->nb_streams;
i++) {
2003 int ret, stream_idx;
2006 mux = &sch->
mux[mux_idx];
2018 mux = &sch->
mux[mux_idx];
2038 mux = &sch->
mux[mux_idx];
2041 ms = &mux->
streams[stream_idx];
2090 dec = &sch->
dec[dec_idx];
2164 unsigned nb_done = 0;
2167 dec = &sch->
dec[dec_idx];
2169 for (
unsigned i = 0;
i < dec->
nb_dst;
i++) {
2174 if (i < dec->nb_dst - 1) {
2212 for (
unsigned i = 0;
i < dec->
nb_dst;
i++) {
2227 enc = &sch->
enc[enc_idx];
2271 enc = &sch->
enc[enc_idx];
2273 for (
unsigned i = 0;
i < enc->
nb_dst;
i++) {
2278 if (i < enc->nb_dst - 1) {
2307 for (
unsigned i = 0;
i < enc->
nb_dst;
i++) {
2350 else if (
ret >= 0) {
2369 fi = &fg->
inputs[in_idx];
2430 switch (node.
type) {
2462 "Terminating thread with return code %d (%s)\n",
ret,
2465 return (
void*)(intptr_t)
ret;
2481 return (intptr_t)thread_ret;
2506 for (
unsigned i = 0;
i < sch->
nb_dec;
i++) {
2520 for (
unsigned i = 0;
i < sch->
nb_enc;
i++) {
2527 for (
unsigned i = 0;
i < sch->
nb_mux;
i++) {
int(* func)(AVBPrint *dst, const char *in, const char *arg)
void av_packet_unref(AVPacket *pkt)
Wipe the packet.
static int mux_task_start(SchMux *mux)
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
static int waiter_init(SchWaiter *w)
size_t av_fifo_can_write(const AVFifo *f)
#define atomic_store(object, desired)
int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx, AVFrame *frame)
Called by filtergraph tasks to send a filtered frame or EOF to consumers.
static int err_merge(int err0, int err1)
Merge two return codes - return one of the error codes if at least one of them was negative,...
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
int av_compare_ts(int64_t ts_a, AVRational tb_a, int64_t ts_b, AVRational tb_b)
Compare two timestamps each in its own time base.
void sch_mux_receive_finish(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Called by muxer tasks to signal that a stream will no longer accept input.
static int demux_flush(Scheduler *sch, SchDemux *d, AVPacket *pkt)
#define AVERROR_EOF
End of file.
AVPacket * sub_heartbeat_pkt
void sq_limit_frames(SyncQueue *sq, unsigned int stream_idx, uint64_t frames)
Limit the number of output frames for stream with index stream_idx to max_frames.
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
#define SCHEDULE_TOLERANCE
int sch_add_demux(Scheduler *sch, SchThreadFunc func, void *ctx)
Add a demuxer to the scheduler.
#define AV_TIME_BASE_Q
Internal time base represented as fractional value.
static int mux_done(Scheduler *sch, unsigned mux_idx)
void av_frame_free(AVFrame **frame)
Free the frame and any dynamically allocated objects in it, e.g.
static int check_acyclic_for_output(const Scheduler *sch, SchedulerNode src, uint8_t *filters_visited, SchedulerNode *filters_stack)
This structure describes decoded (raw) audio or video data.
static int task_cleanup(Scheduler *sch, SchedulerNode node)
static void frame_move(void *dst, void *src)
unsigned nb_streams_ready
static int send_to_mux(Scheduler *sch, SchMux *mux, unsigned stream_idx, AVPacket *pkt)
void objpool_free(ObjPool **pop)
static int enc_done(Scheduler *sch, unsigned enc_idx)
#define AV_LOG_VERBOSE
Detailed information.
int64_t duration
Duration of this packet in AVStream->time_base units, 0 if unknown.
ObjPool * objpool_alloc_packets(void)
AVBufferRef * buf[AV_NUM_DATA_POINTERS]
AVBuffer references backing the data for this frame.
AVFifo * fifo
Queue for buffering the packets before the muxer task can be started.
void av_packet_free(AVPacket **pkt)
Free the packet, if the packet is reference counted, it will be unreferenced first.
#define DEFAULT_FRAME_THREAD_QUEUE_SIZE
Default size of a frame thread queue.
atomic_int_least64_t last_dts
int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
Signal to the scheduler that the specified muxed stream is initialized and ready.
static int send_to_enc_thread(Scheduler *sch, SchEnc *enc, AVFrame *frame)
static int task_stop(Scheduler *sch, SchTask *task)
enum SchedulerNodeType type
int sch_stop(Scheduler *sch, int64_t *finish_ts)
int av_fifo_write(AVFifo *f, const void *buf, size_t nb_elems)
Write data into a FIFO.
int sch_dec_send(Scheduler *sch, unsigned dec_idx, AVFrame *frame)
Called by decoder tasks to send a decoded frame downstream.
int(* SchThreadFunc)(void *arg)
int av_fifo_grow2(AVFifo *f, size_t inc)
Enlarge an AVFifo.
int sch_add_mux_stream(Scheduler *sch, unsigned mux_idx)
Add a muxed stream for a previously added muxer.
atomic_uint nb_inputs_finished_send
int sq_receive(SyncQueue *sq, int stream_idx, SyncQueueFrame frame)
Read a frame from the queue.
it's the only field you need to keep assuming you have a context. There is some magic you don't need to care about around this just let it vf type
int av_thread_message_queue_recv(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Receive a message from the queue.
int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs, SchThreadFunc func, void *ctx)
Add a filtergraph to the scheduler.
AVFrame * av_frame_alloc(void)
Allocate an AVFrame and set its fields to default values.
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
void sch_free(Scheduler **psch)
enum SchedulerState state
int av_thread_message_queue_send(AVThreadMessageQueue *mq, void *msg, unsigned flags)
Send a message on the queue.
pthread_cond_t mux_done_cond
int av_fifo_read(AVFifo *f, void *buf, size_t nb_elems)
Read data from a FIFO.
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int(*init)(void *), void *arg, int sdp_auto, unsigned thread_queue_size)
Add a muxer to the scheduler.
static void waiter_set(SchWaiter *w, int choked)
static int enc_open(Scheduler *sch, SchEnc *enc, const AVFrame *frame)
Scheduler * sch_alloc(void)
static int dec_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVFrame *frame)
static void task_init(Scheduler *sch, SchTask *task, enum SchedulerNodeType type, unsigned idx, SchThreadFunc func, void *func_arg)
unsigned nb_sub_heartbeat_dst
static int op(uint8_t **dst, const uint8_t *dst_end, GetByteContext *gb, int pixel, int count, int *x, int width, int linesize)
Perform decode operation.
int sch_add_demux_stream(Scheduler *sch, unsigned demux_idx)
Add a demuxed stream for a previously added demuxer.
#define av_assert0(cond)
assert() equivalent, that is always enabled.
int64_t av_rescale_q(int64_t a, AVRational bq, AVRational cq)
Rescale a 64-bit integer by 2 rational numbers.
static int filter_done(Scheduler *sch, unsigned fg_idx)
int sch_enc_receive(Scheduler *sch, unsigned enc_idx, AVFrame *frame)
Called by encoder tasks to obtain frames for encoding.
#define atomic_load(object)
ObjPool * objpool_alloc_frames(void)
int sch_sdp_filename(Scheduler *sch, const char *sdp_filename)
Set the file path for the SDP.
static int demux_send_for_stream(Scheduler *sch, SchDemux *d, SchDemuxStream *ds, AVPacket *pkt, unsigned flags)
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
PreMuxQueue pre_mux_queue
int sq_add_stream(SyncQueue *sq, int limiting)
Add a new stream to the sync queue.
atomic_int mux_started
Set to 1 after starting the muxer task and flushing the pre-muxing queues.
static void pkt_move(void *dst, void *src)
AVBufferRef * buf
A reference to the reference-counted buffer where the packet data is stored.
void tq_free(ThreadQueue **ptq)
#define LIBAVUTIL_VERSION_INT
static void waiter_uninit(SchWaiter *w)
Describe the class of an AVClass context structure.
static int send_to_enc_sq(Scheduler *sch, SchEnc *enc, AVFrame *frame)
int(* open_cb)(void *opaque, const AVFrame *frame)
int av_frame_copy_props(AVFrame *dst, const AVFrame *src)
Copy only "metadata" fields from src to dst.
static void schedule_update_locked(Scheduler *sch)
void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the receiving side.
int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx, int(*open_cb)(void *opaque, const AVFrame *frame))
static int dec_done(Scheduler *sch, unsigned dec_idx)
size_t av_fifo_can_read(const AVFifo *f)
int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts)
Add a decoder to the scheduler.
@ DEMUX_SEND_STREAMCOPY_EOF
Treat the packet as an EOF for SCH_NODE_TYPE_MUX destinations; send normally to other types.
static const AVClass sch_fg_class
int av_packet_ref(AVPacket *dst, const AVPacket *src)
Setup a new reference to the data described by a given packet.
void av_packet_move_ref(AVPacket *dst, AVPacket *src)
Move every field in src to dst and reset src.
int sch_enc_send(Scheduler *sch, unsigned enc_idx, AVPacket *pkt)
Called by encoder tasks to send encoded packets downstream.
#define pthread_mutex_unlock(a)
unsigned * sub_heartbeat_dst
void sq_frame_samples(SyncQueue *sq, unsigned int stream_idx, int frame_samples)
Set a constant output audio frame size, in samples.
unsigned nb_inputs_finished_receive
int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data)
Send an item for the given stream to the queue.
int(* init)(AVBSFContext *ctx)
int av_frame_ref(AVFrame *dst, const AVFrame *src)
Set up a new reference to the data described by the source frame.
int max_packets
Maximum number of packets in fifo.
#define av_err2str(errnum)
Convenience macro, the return value should be used only directly in function arguments but never stan...
int sq_send(SyncQueue *sq, unsigned int stream_idx, SyncQueueFrame frame)
Submit a frame for the stream with index stream_idx.
void sq_free(SyncQueue **psq)
#define AV_NOPTS_VALUE
Undefined timestamp value.
static const AVClass sch_dec_class
pthread_mutex_t schedule_lock
@ SCH_NODE_TYPE_FILTER_OUT
int64_t dts
Decompression timestamp in AVStream->time_base units; the time at which the packet is decompressed.
static int enc_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt)
static const AVClass sch_mux_class
int sch_filter_receive(Scheduler *sch, unsigned fg_idx, unsigned *in_idx, AVFrame *frame)
Called by filtergraph tasks to obtain frames for filtering.
AVPacket * av_packet_alloc(void)
Allocate an AVPacket and set its fields to default values.
ThreadQueue * tq_alloc(unsigned int nb_streams, size_t queue_size, ObjPool *obj_pool, void(*obj_move)(void *dst, void *src))
Allocate a queue for sending data between threads.
static const AVClass scheduler_class
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
int av_thread_message_queue_alloc(AVThreadMessageQueue **mq, unsigned nelem, unsigned elsize)
Allocate a new message queue.
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
int av_packet_copy_props(AVPacket *dst, const AVPacket *src)
Copy only "properties" fields from src to dst.
@ SCH_NODE_TYPE_FILTER_IN
static int task_start(SchTask *task)
#define i(width, name, range_min, range_max)
int64_t pts
Presentation timestamp in AVStream->time_base units; the time at which the decompressed packet will b...
static int demux_done(Scheduler *sch, unsigned demux_idx)
#define av_malloc_array(a, b)
pthread_mutex_t mux_ready_lock
#define DEFAULT_PACKET_THREAD_QUEUE_SIZE
Default size of a packet thread queue.
void av_frame_unref(AVFrame *frame)
Unreference all the buffers referenced by frame and reset the frame fields.
static int64_t trailing_dts(const Scheduler *sch, int count_finished)
pthread_mutex_t mux_done_lock
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
static const AVClass sch_enc_class
static int mux_init(Scheduler *sch, SchMux *mux)
static int send_to_filter(Scheduler *sch, SchFilterGraph *fg, unsigned in_idx, AVFrame *frame)
int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
Read the next item from the queue.
void * av_calloc(size_t nmemb, size_t size)
int sch_connect(Scheduler *sch, SchedulerNode src, SchedulerNode dst)
static int send_to_enc(Scheduler *sch, SchEnc *enc, AVFrame *frame)
int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
intptr_t atomic_int_least64_t
int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt)
Called by decoder tasks to receive a packet for decoding.
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
static int mux_queue_packet(SchMux *mux, SchMuxStream *ms, AVPacket *pkt)
static const AVClass sch_demux_class
AVFifo * av_fifo_alloc2(size_t nb_elems, size_t elem_size, unsigned int flags)
Allocate and initialize an AVFifo with a given element size.
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
static void * task_wrapper(void *arg)
A sync queue provides timestamp synchronization between multiple streams.
int sch_demux_send(Scheduler *sch, unsigned demux_idx, AVPacket *pkt, unsigned flags)
Called by demuxer tasks to communicate with their downstreams.
#define atomic_fetch_add(object, operand)
AVThreadMessageQueue * queue_end_ts
static int demux_stream_send_to_dst(Scheduler *sch, const SchedulerNode dst, uint8_t *dst_finished, AVPacket *pkt, unsigned flags)
#define GROW_ARRAY(array, nb_elems)
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
int64_t av_gettime(void)
Get the current time in microseconds.
static int waiter_wait(Scheduler *sch, SchWaiter *w)
Wait until this task is allowed to proceed.
char * av_strdup(const char *s)
Duplicate a string.
int sch_sq_add_enc(Scheduler *sch, unsigned sq_idx, unsigned enc_idx, int limiting, uint64_t max_frames)
int print_sdp(const char *filename)
static int start_prepare(Scheduler *sch)
int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, unsigned dec_idx)
void sch_filter_receive_finish(Scheduler *sch, unsigned fg_idx, unsigned in_idx)
Called by filter tasks to signal that a filter input will no longer accept input.
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
Wait until transcoding terminates or the specified timeout elapses.
This structure stores compressed data.
void av_thread_message_queue_free(AVThreadMessageQueue **mq)
Free a message queue.
static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_size, enum QueueType type)
int sch_start(Scheduler *sch)
#define flags(name, subs,...)
void av_thread_message_queue_set_err_recv(AVThreadMessageQueue *mq, int err)
Set the receiving error code.
static av_always_inline int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime)
int sch_mux_sub_heartbeat(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, const AVPacket *pkt)
void av_fifo_freep2(AVFifo **f)
Free an AVFifo and reset pointer to NULL.
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
SyncQueue * sq_alloc(enum SyncQueueType type, int64_t buf_size_us, void *logctx)
Allocate a sync queue of the given type.
#define atomic_init(obj, value)
void sch_mux_stream_buffering(Scheduler *sch, unsigned mux_idx, unsigned stream_idx, size_t data_threshold, int max_packets)
Configure limits on packet buffering performed before the muxer task is started.
static int check_acyclic(Scheduler *sch)
AVRational time_base
Time base of the packet's timestamps.
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
int sch_mux_receive(Scheduler *sch, unsigned mux_idx, AVPacket *pkt)
Called by muxer tasks to obtain packets for muxing.
int sch_add_sq_enc(Scheduler *sch, uint64_t buf_size_us, void *logctx)
Add a pre-encoding sync queue to the scheduler.
#define pthread_mutex_lock(a)
void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
Mark the given stream finished from the sending side.