FFmpeg
libzmq.c
Go to the documentation of this file.
1 /*
2  * ZeroMQ Protocol
3  * Copyright (c) 2019 Andriy Gelman
4  *
5  * This file is part of FFmpeg.
6  *
7  * FFmpeg is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * FFmpeg is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with FFmpeg; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
22 #include <zmq.h>
23 #include "url.h"
24 #include "network.h"
25 #include "libavutil/avstring.h"
26 #include "libavutil/opt.h"
27 #include "libavutil/time.h"
28 
/* Human-readable text for the most recent libzmq error. */
#define ZMQ_STRERROR (zmq_strerror(zmq_errno()))
30 
31 typedef struct ZMQContext {
32  const AVClass *class;
33  void *context;
34  void *socket;
35  int pkt_size;
36  int pkt_size_overflow; /*keep track of the largest packet during overflow*/
37 } ZMQContext;
38 
39 #define OFFSET(x) offsetof(ZMQContext, x)
40 #define D AV_OPT_FLAG_DECODING_PARAM
41 #define E AV_OPT_FLAG_ENCODING_PARAM
42 static const AVOption options[] = {
43  { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size), AV_OPT_TYPE_INT, { .i64 = 131072 }, -1, INT_MAX, .flags = D | E },
44  { NULL }
45 };
46 
47 static int zmq_proto_wait(URLContext *h, void *socket, int write)
48 {
49  int ret;
50  int ev = write ? ZMQ_POLLOUT : ZMQ_POLLIN;
51  zmq_pollitem_t items = { .socket = socket, .fd = 0, .events = ev, .revents = 0 };
52  ret = zmq_poll(&items, 1, POLLING_TIME);
53  if (ret == -1) {
54  av_log(h, AV_LOG_ERROR, "Error occured during zmq_poll(): %s\n", ZMQ_STRERROR);
55  return AVERROR_EXTERNAL;
56  }
57  return items.revents & ev ? 0 : AVERROR(EAGAIN);
58 }
59 
60 static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
61 {
62  int ret;
63  int64_t wait_start = 0;
64 
65  while (1) {
67  return AVERROR_EXIT;
68  ret = zmq_proto_wait(h, socket, write);
69  if (ret != AVERROR(EAGAIN))
70  return ret;
71  if (timeout > 0) {
72  if (!wait_start)
73  wait_start = av_gettime_relative();
74  else if (av_gettime_relative() - wait_start > timeout)
75  return AVERROR(ETIMEDOUT);
76  }
77  }
78 }
79 
80 static int zmq_proto_open(URLContext *h, const char *uri, int flags)
81 {
82  int ret;
83  ZMQContext *s = h->priv_data;
84  s->pkt_size_overflow = 0;
85  h->is_streamed = 1;
86 
87  if (s->pkt_size > 0)
88  h->max_packet_size = s->pkt_size;
89 
90  s->context = zmq_ctx_new();
91  if (!s->context) {
92  /*errno not set on failure during zmq_ctx_new()*/
93  av_log(h, AV_LOG_ERROR, "Error occured during zmq_ctx_new()\n");
94  return AVERROR_EXTERNAL;
95  }
96 
97  av_strstart(uri, "zmq:", &uri);
98 
99  /*publish during write*/
100  if (h->flags & AVIO_FLAG_WRITE) {
101  s->socket = zmq_socket(s->context, ZMQ_PUB);
102  if (!s->socket) {
103  av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
104  goto fail_term;
105  }
106 
107  ret = zmq_bind(s->socket, uri);
108  if (ret == -1) {
109  av_log(h, AV_LOG_ERROR, "Error occured during zmq_bind(): %s\n", ZMQ_STRERROR);
110  goto fail_close;
111  }
112  }
113 
114  /*subscribe for read*/
115  if (h->flags & AVIO_FLAG_READ) {
116  s->socket = zmq_socket(s->context, ZMQ_SUB);
117  if (!s->socket) {
118  av_log(h, AV_LOG_ERROR, "Error occured during zmq_socket(): %s\n", ZMQ_STRERROR);
119  goto fail_term;
120  }
121 
122  ret = zmq_setsockopt(s->socket, ZMQ_SUBSCRIBE, "", 0);
123  if (ret == -1) {
124  av_log(h, AV_LOG_ERROR, "Error occured during zmq_setsockopt(): %s\n", ZMQ_STRERROR);
125  goto fail_close;
126  }
127 
128  ret = zmq_connect(s->socket, uri);
129  if (ret == -1) {
130  av_log(h, AV_LOG_ERROR, "Error occured during zmq_connect(): %s\n", ZMQ_STRERROR);
131  goto fail_close;
132  }
133  }
134  return 0;
135 
136 fail_close:
137  zmq_close(s->socket);
138 fail_term:
139  zmq_ctx_term(s->context);
140  return AVERROR_EXTERNAL;
141 }
142 
143 static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
144 {
145  int ret;
146  ZMQContext *s = h->priv_data;
147 
148  ret = zmq_proto_wait_timeout(h, s->socket, 1, h->rw_timeout, &h->interrupt_callback);
149  if (ret)
150  return ret;
151  ret = zmq_send(s->socket, buf, size, 0);
152  if (ret == -1) {
153  av_log(h, AV_LOG_ERROR, "Error occured during zmq_send(): %s\n", ZMQ_STRERROR);
154  return AVERROR_EXTERNAL;
155  }
156  return ret; /*number of bytes sent*/
157 }
158 
159 static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
160 {
161  int ret;
162  ZMQContext *s = h->priv_data;
163 
164  ret = zmq_proto_wait_timeout(h, s->socket, 0, h->rw_timeout, &h->interrupt_callback);
165  if (ret)
166  return ret;
167  ret = zmq_recv(s->socket, buf, size, 0);
168  if (ret == -1) {
169  av_log(h, AV_LOG_ERROR, "Error occured during zmq_recv(): %s\n", ZMQ_STRERROR);
170  return AVERROR_EXTERNAL;
171  }
172  if (ret > size) {
173  s->pkt_size_overflow = FFMAX(s->pkt_size_overflow, ret);
174  av_log(h, AV_LOG_WARNING, "Message exceeds available space in the buffer. Message will be truncated. Setting -pkt_size %d may resolve the issue.\n", s->pkt_size_overflow);
175  ret = size;
176  }
177  return ret; /*number of bytes read*/
178 }
179 
181 {
182  ZMQContext *s = h->priv_data;
183  zmq_close(s->socket);
184  zmq_ctx_term(s->context);
185  return 0;
186 }
187 
188 static const AVClass zmq_context_class = {
189  .class_name = "zmq",
190  .item_name = av_default_item_name,
191  .option = options,
192  .version = LIBAVUTIL_VERSION_INT,
193 };
194 
196  .name = "zmq",
197  .url_close = zmq_proto_close,
198  .url_open = zmq_proto_open,
199  .url_read = zmq_proto_read,
200  .url_write = zmq_proto_write,
201  .priv_data_size = sizeof(ZMQContext),
202  .priv_data_class = &zmq_context_class,
204 };
av_gettime_relative
int64_t av_gettime_relative(void)
Get the current time in microseconds since some unspecified starting point.
Definition: time.c:56
AV_LOG_WARNING
#define AV_LOG_WARNING
Something somehow does not look correct.
Definition: log.h:182
AVERROR
Filter the word “frame” indicates either a video frame or a group of audio as stored in an AVFrame structure Format for each input and each output the list of supported formats For video that means pixel format For audio that means channel sample they are references to shared objects When the negotiation mechanism computes the intersection of the formats supported at each end of a all references to both lists are replaced with a reference to the intersection And when a single format is eventually chosen for a link amongst the remaining all references to the list are updated That means that if a filter requires that its input and output have the same format amongst a supported all it has to do is use a reference to the same list of formats query_formats can leave some formats unset and return AVERROR(EAGAIN) to cause the negotiation mechanism to try again later. That can be used by filters with complex requirements to use the format negotiated on one link to set the formats supported on another. Frame references ownership and permissions
opt.h
URL_PROTOCOL_FLAG_NETWORK
#define URL_PROTOCOL_FLAG_NETWORK
Definition: url.h:34
AVOption
AVOption.
Definition: opt.h:246
E
#define E
Definition: libzmq.c:41
ZMQContext::pkt_size
int pkt_size
Definition: libzmq.c:35
ZMQContext
Definition: f_zmq.c:35
URLProtocol
Definition: url.h:54
AVIOInterruptCB
Callback for checking whether to abort blocking functions.
Definition: avio.h:58
ZMQContext::context
void * context
Definition: libzmq.c:33
ff_check_interrupt
int ff_check_interrupt(AVIOInterruptCB *cb)
Check if the user has requested to interrupt a blocking function associated with cb.
Definition: avio.c:666
ZMQ_STRERROR
#define ZMQ_STRERROR
Definition: libzmq.c:29
AV_LOG_ERROR
#define AV_LOG_ERROR
Something went wrong and cannot losslessly be recovered.
Definition: log.h:176
zmq_proto_close
static int zmq_proto_close(URLContext *h)
Definition: libzmq.c:180
s
#define s(width, name)
Definition: cbs_vp9.c:257
AVIO_FLAG_WRITE
#define AVIO_FLAG_WRITE
write-only
Definition: avio.h:675
LIBAVUTIL_VERSION_INT
#define LIBAVUTIL_VERSION_INT
Definition: version.h:85
ff_libzmq_protocol
const URLProtocol ff_libzmq_protocol
Definition: libzmq.c:195
AVClass
Describe the class of an AVClass context structure.
Definition: log.h:67
NULL
#define NULL
Definition: coverity.c:32
zmq_proto_write
static int zmq_proto_write(URLContext *h, const unsigned char *buf, int size)
Definition: libzmq.c:143
av_default_item_name
const char * av_default_item_name(void *ptr)
Return the context name.
Definition: log.c:235
ZMQContext::pkt_size_overflow
int pkt_size_overflow
Definition: libzmq.c:36
time.h
zmq_proto_wait_timeout
static int zmq_proto_wait_timeout(URLContext *h, void *socket, int write, int64_t timeout, AVIOInterruptCB *int_cb)
Definition: libzmq.c:60
FFMAX
#define FFMAX(a, b)
Definition: common.h:94
size
int size
Definition: twinvq_data.h:11134
URLProtocol::name
const char * name
Definition: url.h:55
AVERROR_EXTERNAL
#define AVERROR_EXTERNAL
Generic error in an external library.
Definition: error.h:57
av_strstart
int av_strstart(const char *str, const char *pfx, const char **ptr)
Return non-zero if pfx is a prefix of str.
Definition: avstring.c:34
URLContext
Definition: url.h:38
url.h
int_cb
const AVIOInterruptCB int_cb
Definition: ffmpeg.c:489
zmq_proto_open
static int zmq_proto_open(URLContext *h, const char *uri, int flags)
Definition: libzmq.c:80
ret
ret
Definition: filter_design.txt:187
AVClass::class_name
const char * class_name
The name of the class; usually it is the same name as the context structure type to which the AVClass...
Definition: log.h:72
network.h
options
static const AVOption options[]
Definition: libzmq.c:42
zmq_context_class
static const AVClass zmq_context_class
Definition: libzmq.c:188
AV_OPT_TYPE_INT
@ AV_OPT_TYPE_INT
Definition: opt.h:223
D
#define D
Definition: libzmq.c:40
ZMQContext::socket
void * socket
Definition: libzmq.c:34
AVIO_FLAG_READ
#define AVIO_FLAG_READ
read-only
Definition: avio.h:674
POLLING_TIME
#define POLLING_TIME
Definition: network.h:249
flags
#define flags(name, subs,...)
Definition: cbs_av1.c:565
av_log
#define av_log(a,...)
Definition: tableprint_vlc.h:28
zmq_proto_wait
static int zmq_proto_wait(URLContext *h, void *socket, int write)
Definition: libzmq.c:47
h
h
Definition: vp9dsp_template.c:2038
AVERROR_EXIT
#define AVERROR_EXIT
Immediate exit was requested; the called function should not be restarted.
Definition: error.h:56
avstring.h
zmq_proto_read
static int zmq_proto_read(URLContext *h, unsigned char *buf, int size)
Definition: libzmq.c:159
OFFSET
#define OFFSET(x)
Definition: libzmq.c:39