23 #include <amqp_tcp_socket.h>
35 amqp_connection_state_t
conn;
46 #define DEFAULT_CHANNEL 1
48 #define OFFSET(x) offsetof(AMQPContext, x)
49 #define D AV_OPT_FLAG_DECODING_PARAM
50 #define E AV_OPT_FLAG_ENCODING_PARAM
52 {
"pkt_size",
"Maximum send/read packet size",
OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags =
D |
E },
55 {
"connection_timeout",
"Initial connection timeout",
OFFSET(connection_timeout),
AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags =
D |
E},
56 {
"delivery_mode",
"Delivery mode",
OFFSET(delivery_mode),
AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags =
E, .unit =
"delivery_mode"},
57 {
"persistent",
"Persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0,
E, .unit =
"delivery_mode" },
58 {
"non-persistent",
"Non-persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0,
E, .unit =
"delivery_mode" },
67 const char *user, *password =
NULL, *vhost;
68 const char *user_decoded, *password_decoded, *vhost_decoded;
70 amqp_rpc_reply_t broker_reply;
71 struct timeval tval = { 0 };
76 h->max_packet_size =
s->pkt_size;
79 hostname,
sizeof(hostname), &port, path,
sizeof(path), uri);
84 if (hostname[0] ==
'\0' || port <= 0 || port > 65535 ) {
89 p = strchr(credentials,
':');
95 if (!password || *password ==
'\0')
99 if (!password_decoded)
113 p = strchr(path,
'?');
124 if (!vhost_decoded) {
130 s->conn = amqp_new_connection();
139 s->socket = amqp_tcp_socket_new(
s->conn);
142 goto destroy_connection;
145 if (
s->connection_timeout < 0)
146 s->connection_timeout = (
h->rw_timeout > 0 ?
h->rw_timeout : 5000000);
148 tval.tv_sec =
s->connection_timeout / 1000000;
149 tval.tv_usec =
s->connection_timeout % 1000000;
150 ret = amqp_socket_open_noblock(
s->socket, hostname, port, &tval);
154 amqp_error_string2(
ret));
155 goto destroy_connection;
158 broker_reply = amqp_login(
s->conn, vhost_decoded, 0,
s->pkt_size, 0,
159 AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
161 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
163 server_msg = AMQP_ACCESS_REFUSED;
164 goto close_connection;
168 broker_reply = amqp_get_rpc_reply(
s->conn);
170 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
172 server_msg = AMQP_CHANNEL_ERROR;
173 goto close_connection;
177 amqp_bytes_t queuename;
179 amqp_queue_declare_ok_t *
r;
182 0, 0, 0, 1, amqp_empty_table);
183 broker_reply = amqp_get_rpc_reply(
s->conn);
184 if (!
r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
186 server_msg = AMQP_RESOURCE_ERROR;
191 queuename.bytes = queuename_buff;
193 memcpy(queuename.bytes,
r->queue.bytes, queuename.len);
196 amqp_cstring_bytes(
s->exchange),
197 amqp_cstring_bytes(
s->routing_key), amqp_empty_table);
199 broker_reply = amqp_get_rpc_reply(
s->conn);
200 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
202 server_msg = AMQP_INTERNAL_ERROR;
207 0, 1, 0, amqp_empty_table);
209 broker_reply = amqp_get_rpc_reply(
s->conn);
210 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
212 server_msg = AMQP_INTERNAL_ERROR;
225 amqp_connection_close(
s->conn, server_msg);
227 amqp_destroy_connection(
s->conn);
239 int fd = amqp_socket_get_sockfd(
s->socket);
242 amqp_basic_properties_t props;
248 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
249 props.content_type = amqp_cstring_bytes(
"octet/stream");
250 props.delivery_mode =
s->delivery_mode;
253 amqp_cstring_bytes(
s->routing_key), 0, 0,
267 int fd = amqp_socket_get_sockfd(
s->socket);
270 amqp_rpc_reply_t broker_reply;
277 amqp_maybe_release_buffers(
s->conn);
278 broker_reply = amqp_consume_message(
s->conn, &
envelope,
NULL, 0);
280 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
284 s->pkt_size_overflow =
FFMAX(
s->pkt_size_overflow,
envelope.message.body.len);
286 "Message will be truncated. Setting -pkt_size %d "
287 "may resolve this issue.\n",
s->pkt_size_overflow);
301 amqp_connection_close(
s->conn, AMQP_REPLY_SUCCESS);
302 amqp_destroy_connection(
s->conn);