23 #include <amqp_tcp_socket.h>
35 amqp_connection_state_t
conn;
/* AMQP channel number constant (value 1); its uses fall outside this excerpt. */
45 #define DEFAULT_CHANNEL 1
/* Byte offset of field x inside AMQPContext; used by the option table entries. */
47 #define OFFSET(x) offsetof(AMQPContext, x)
/* Short aliases for the decoding / encoding option flags; the option entries
 * in this file OR both together. */
48 #define D AV_OPT_FLAG_DECODING_PARAM
49 #define E AV_OPT_FLAG_ENCODING_PARAM
/* "pkt_size": integer option backed by AMQPContext.pkt_size. The literal values
 * are 131072, 4096 and INT_MAX -- presumably default, minimum and maximum in
 * the usual AVOption positional order (confirm against the AVOption layout).
 * The open code copies the value into h->max_packet_size and also passes it
 * to amqp_login(). */
51 {
"pkt_size",
"Maximum send/read packet size",
OFFSET(pkt_size),
AV_OPT_TYPE_INT, { .i64 = 131072 }, 4096, INT_MAX, .flags =
D |
E },
/* "connection_timeout": duration option backed by AMQPContext.connection_timeout,
 * with literal values -1, -1 and INT64_MAX (presumably default, min, max).
 * A negative value is replaced at open time by h->rw_timeout when that is
 * positive, otherwise by 5000000. The value is then split by 1000000 into
 * tv_sec / tv_usec, i.e. it is treated as microseconds. */
54 {
"connection_timeout",
"Initial connection timeout",
OFFSET(connection_timeout),
AV_OPT_TYPE_DURATION, { .i64 = -1 }, -1, INT64_MAX, .flags =
D |
E},
63 const char *user, *password =
NULL;
64 const char *user_decoded, *password_decoded;
66 amqp_rpc_reply_t broker_reply;
67 struct timeval tval = { 0 };
72 h->max_packet_size =
s->pkt_size;
75 hostname,
sizeof(hostname), &port,
NULL, 0, uri);
80 if (hostname[0] ==
'\0' || port <= 0 || port > 65535 ) {
85 p = strchr(credentials,
':');
91 if (!password || *password ==
'\0')
95 if (!password_decoded)
108 s->conn = amqp_new_connection();
116 s->socket = amqp_tcp_socket_new(
s->conn);
119 goto destroy_connection;
122 if (
s->connection_timeout < 0)
123 s->connection_timeout = (
h->rw_timeout > 0 ?
h->rw_timeout : 5000000);
125 tval.tv_sec =
s->connection_timeout / 1000000;
126 tval.tv_usec =
s->connection_timeout % 1000000;
127 ret = amqp_socket_open_noblock(
s->socket, hostname, port, &tval);
131 amqp_error_string2(
ret));
132 goto destroy_connection;
135 broker_reply = amqp_login(
s->conn,
"/", 0,
s->pkt_size, 0,
136 AMQP_SASL_METHOD_PLAIN, user_decoded, password_decoded);
138 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
140 server_msg = AMQP_ACCESS_REFUSED;
141 goto close_connection;
145 broker_reply = amqp_get_rpc_reply(
s->conn);
147 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
149 server_msg = AMQP_CHANNEL_ERROR;
150 goto close_connection;
154 amqp_bytes_t queuename;
156 amqp_queue_declare_ok_t *
r;
159 0, 0, 0, 1, amqp_empty_table);
160 broker_reply = amqp_get_rpc_reply(
s->conn);
161 if (!
r || broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
163 server_msg = AMQP_RESOURCE_ERROR;
168 queuename.bytes = queuename_buff;
170 memcpy(queuename.bytes,
r->queue.bytes, queuename.len);
173 amqp_cstring_bytes(
s->exchange),
174 amqp_cstring_bytes(
s->routing_key), amqp_empty_table);
176 broker_reply = amqp_get_rpc_reply(
s->conn);
177 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
179 server_msg = AMQP_INTERNAL_ERROR;
184 0, 1, 0, amqp_empty_table);
186 broker_reply = amqp_get_rpc_reply(
s->conn);
187 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL) {
189 server_msg = AMQP_INTERNAL_ERROR;
201 amqp_connection_close(
s->conn, server_msg);
203 amqp_destroy_connection(
s->conn);
214 int fd = amqp_socket_get_sockfd(
s->socket);
217 amqp_basic_properties_t props;
223 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
224 props.content_type = amqp_cstring_bytes(
"octet/stream");
225 props.delivery_mode = 2;
228 amqp_cstring_bytes(
s->routing_key), 0, 0,
242 int fd = amqp_socket_get_sockfd(
s->socket);
245 amqp_rpc_reply_t broker_reply;
252 amqp_maybe_release_buffers(
s->conn);
253 broker_reply = amqp_consume_message(
s->conn, &
envelope,
NULL, 0);
255 if (broker_reply.reply_type != AMQP_RESPONSE_NORMAL)
259 s->pkt_size_overflow =
FFMAX(
s->pkt_size_overflow,
envelope.message.body.len);
261 "Message will be truncated. Setting -pkt_size %d "
262 "may resolve this issue.\n",
s->pkt_size_overflow);
276 amqp_connection_close(
s->conn, AMQP_REPLY_SUCCESS);
277 amqp_destroy_connection(
s->conn);