23 #include <amqp_tcp_socket.h>
34 amqp_connection_state_t
conn;
/* Channel number constant (value 1). Its uses are outside this excerpt --
 * presumably the single AMQP channel this handler operates on; TODO confirm. */
45 #define DEFAULT_CHANNEL 1
/* Byte offset of member x inside AMQPContext; fills the offset field of the
 * option entries that follow. */
47 #define OFFSET(x) offsetof(AMQPContext, x)
/* Short aliases for the AVOption flag bits: D marks a decoding parameter,
 * E marks an encoding parameter. */
48 #define D AV_OPT_FLAG_DECODING_PARAM
49 #define E AV_OPT_FLAG_ENCODING_PARAM
/* pkt_size: integer option stored in AMQPContext.pkt_size.
 * Default 131072, accepted range 4096..INT_MAX, flagged for both decoding
 * and encoding. Elsewhere in this file the value is copied to
 * h->max_packet_size and passed as an argument to amqp_login(). */
51 {
"pkt_size",
"Maximum send/read packet size",
OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags =
D |
E },
/* connection_timeout: duration option stored in
 * AMQPContext.connection_timeout. Default -1, accepted range -1..INT64_MAX,
 * flagged for both decoding and encoding. A negative value is replaced at
 * open time by h->rw_timeout when that is positive, otherwise by 5000000;
 * the result is split by 1000000 into tv_sec / tv_usec, i.e. it is handled
 * as microseconds. */
54 {
"connection_timeout",
"Initial connection timeout",
OFFSET(connection_timeout),
AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags =
D |
E},
/* delivery_mode: integer option stored in AMQPContext.delivery_mode.
 * Default AMQP_DELIVERY_PERSISTENT, accepted range 1..2, encoding only.
 * Shares the "delivery_mode" unit with the two named constants below.
 * The value is assigned to props.delivery_mode when a message is published. */
55 {
"delivery_mode",
"Delivery mode",
OFFSET(delivery_mode),
AV_OPT_TYPE_INT, { .i64 = AMQP_DELIVERY_PERSISTENT }, 1, 2, .flags =
E, .unit =
"delivery_mode"},
/* Named constant "persistent" for the "delivery_mode" unit; evaluates to
 * AMQP_DELIVERY_PERSISTENT. Offset and range are 0 because the entry is a
 * constant (AV_OPT_TYPE_CONST), not a stored field. */
56 {
"persistent",
"Persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_PERSISTENT }, 0, 0,
E, .unit =
"delivery_mode" },
/* Named constant "non-persistent" for the "delivery_mode" unit; evaluates
 * to AMQP_DELIVERY_NONPERSISTENT. */
57 {
"non-persistent",
"Non-persistent delivery mode", 0,
AV_OPT_TYPE_CONST, { .i64 = AMQP_DELIVERY_NONPERSISTENT }, 0, 0,
E, .unit =
"delivery_mode" },
66 const char *user, *password =
NULL, *vhost;
67 const char *user_decoded, *password_decoded, *vhost_decoded;
69 amqp_rpc_reply_t broker_reply;
70 struct timeval tval = { 0 };
75 h->max_packet_size =
s->pkt_size;
78 hostname,
sizeof(hostname), &port, path,
sizeof(path), uri);
83 if (hostname[0] ==
'\0' || port <= 0 || port > 65535 ) {
88 p = strchr(credentials,
':');
94 if (!password || *password ==
'\0')
98 if (!password_decoded)
112 p = strchr(path,
'?');
123 if (!vhost_decoded) {
129 s->conn = amqp_new_connection();
138 s->socket = amqp_tcp_socket_new(
s->conn);
141 goto destroy_connection;
144 if (
s->connection_timeout < 0)
145 s->connection_timeout = (
h->rw_timeout > 0 ?
h->rw_timeout : 5000000);
147 tval.tv_sec =
s->connection_timeout / 1000000;
148 tval.tv_usec =
s->connection_timeout % 1000000;
149 ret = amqp_socket_open_noblock(
s->socket, hostname, port, &tval);
153 amqp_error_string2(
ret));
154 goto destroy_connection;
157 broker_reply = amqp_login(
s->conn, vhost_decoded, 0,
s->pkt_size, 0,
158 AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
160 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
162 server_msg = AMQP_ACCESS_REFUSED;
163 goto close_connection;
167 broker_reply = amqp_get_rpc_reply(
s->conn);
169 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
171 server_msg = AMQP_CHANNEL_ERROR;
172 goto close_connection;
176 amqp_bytes_t queuename;
178 amqp_queue_declare_ok_t *
r;
181 0, 0, 0, 1, amqp_empty_table);
182 broker_reply = amqp_get_rpc_reply(
s->conn);
183 if (!
r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
185 server_msg = AMQP_RESOURCE_ERROR;
190 queuename.bytes = queuename_buff;
192 memcpy(queuename.bytes,
r->queue.bytes, queuename.len);
195 amqp_cstring_bytes(
s->exchange),
196 amqp_cstring_bytes(
s->routing_key), amqp_empty_table);
198 broker_reply = amqp_get_rpc_reply(
s->conn);
199 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
201 server_msg = AMQP_INTERNAL_ERROR;
206 0, 1, 0, amqp_empty_table);
208 broker_reply = amqp_get_rpc_reply(
s->conn);
209 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
211 server_msg = AMQP_INTERNAL_ERROR;
224 amqp_connection_close(
s->conn, server_msg);
226 amqp_destroy_connection(
s->conn);
238 int fd = amqp_socket_get_sockfd(
s->socket);
241 amqp_basic_properties_t props;
247 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
248 props.content_type = amqp_cstring_bytes(
"octet/stream");
249 props.delivery_mode =
s->delivery_mode;
252 amqp_cstring_bytes(
s->routing_key), 0, 0,
266 int fd = amqp_socket_get_sockfd(
s->socket);
269 amqp_rpc_reply_t broker_reply;
276 amqp_maybe_release_buffers(
s->conn);
277 broker_reply = amqp_consume_message(
s->conn, &
envelope,
NULL, 0);
279 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
283 s->pkt_size_overflow =
FFMAX(
s->pkt_size_overflow,
envelope.message.body.len);
285 "Message will be truncated. Setting -pkt_size %d "
286 "may resolve this issue.\n",
s->pkt_size_overflow);
300 amqp_connection_close(
s->conn, AMQP_REPLY_SUCCESS);
301 amqp_destroy_connection(
s->conn);