API
The PublishingMixin adds RabbitMQ publishing capabilities to a request handler, with methods that simplify publishing messages to RabbitMQ.
Configured using the following environment variables:
AMQP_URL
- The AMQP URL to connect to.
AMQP_TIMEOUT
- The optional maximum time to wait for a bad state to resolve before treating the failure as persistent.
AMQP_RECONNECT_DELAY
- The optional time in seconds to wait before reconnecting on connection failure.
AMQP_CONNECTION_ATTEMPTS
- The optional number of connection attempts to make before giving up.
The AMQP prefix is interchangeable with RABBITMQ. For example, you can use AMQP_URL or RABBITMQ_URL.
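As a rough illustration of the prefix rule, the sketch below resolves each setting by trying the AMQP_ name and then the RABBITMQ_ name. The helper names (lookup, load_settings), the lookup order, and the numeric conversions are assumptions made for this example and are not taken from the library; the fallback values 5 and 3 mirror the Client constructor defaults documented on this page.

```python
import os

# Checking the AMQP prefix before RABBITMQ is an assumption of this sketch.
PREFIXES = ("AMQP", "RABBITMQ")


def lookup(name, environ=None, default=None):
    """Return the value of AMQP_<name> or RABBITMQ_<name>, else default."""
    environ = os.environ if environ is None else environ
    for prefix in PREFIXES:
        value = environ.get("{}_{}".format(prefix, name))
        if value is not None:
            return value
    return default


def load_settings(environ=None):
    """Collect the four documented settings into a dict (hypothetical helper)."""
    return {
        "url": lookup("URL", environ),
        "timeout": lookup("TIMEOUT", environ),
        "reconnect_delay": float(lookup("RECONNECT_DELAY", environ, 5)),
        "connection_attempts": int(lookup("CONNECTION_ATTEMPTS", environ, 3)),
    }
```

Passing a plain dict instead of os.environ makes the lookup easy to exercise in isolation, for example `load_settings({"RABBITMQ_URL": "amqp://localhost"})`.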
-
exception
sprockets.mixins.amqp.
AMQPException
(*args)¶ Base class for exceptions raised by the AMQP client
-
class
sprockets.mixins.amqp.
Client
(url, enable_confirmations=True, reconnect_delay=5, connection_attempts=3, default_app_id=None, on_ready_callback=None, on_unavailable_callback=None, on_return_callback=None, io_loop=None)¶ This class encompasses all of the AMQP/RabbitMQ specific behaviors.
If RabbitMQ closes the connection, the client reopens it. Check the output when this happens: a connection is closed for only a limited set of reasons, usually permission-related issues or socket timeouts.
If the channel is closed, that indicates a problem with one of the commands that were issued, which should surface in the output as well.
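The reconnect_delay and connection_attempts arguments describe a bounded retry policy. The following is a minimal blocking sketch of such a policy, not the Client's actual implementation, which runs on the Tornado IOLoop. The function name connect_with_retries, the injectable sleep argument, and the choice to retry on OSError are assumptions made for illustration.

```python
import time


def connect_with_retries(connect, attempts=3, delay=5, sleep=time.sleep):
    """Call connect() up to `attempts` times, pausing `delay` seconds between tries."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except OSError as error:  # socket-level failures are OSError subclasses
            last_error = error
            if attempt < attempts:
                sleep(delay)  # wait before the next attempt
    # Every attempt failed: report the failure as persistent.
    raise ConnectionError(
        "gave up after {} attempts".format(attempts)) from last_error
```

Injecting sleep keeps the sketch testable without real delays; an asynchronous client would schedule the next attempt on its event loop instead of blocking.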
-
close
()¶ Cleanly shut down the connection to RabbitMQ
Raises: sprockets.mixins.amqp.ConnectionStateError
-
connect
()¶ This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method will be invoked by pika.
Return type: pika.TornadoConnection
-
connecting
¶ Returns True if the connection to RabbitMQ is open and a channel is in the process of connecting.
Return type: bool
-
static
on_back_pressure_detected
(obj)¶ This method is called by pika if it believes that back pressure is being applied to the TCP socket.
Parameters: obj (unknown) – The connection where back pressure is being applied
-
on_basic_return
(_channel, method, properties, body)¶ Invoke a registered callback or log the returned message.
Parameters: - _channel (pika.channel.Channel) – The channel the message was sent on
- method (pika.spec.Basic.Return) – The method object
- properties (pika.spec.BasicProperties) – The message properties
- body (str, unicode, bytes) – The message body
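The "invoke a registered callback or log" behavior can be pictured with the short sketch below. It is only an illustration: the function name dispatch_return, the callback's argument list, and the log message are assumptions, and the real handler receives pika objects where plain values are used here.

```python
import logging

LOGGER = logging.getLogger(__name__)


def dispatch_return(callback, method, properties, body):
    """Hand a returned message to callback if one is registered, else log it."""
    if callback is not None:
        callback(method, properties, body)
        return True
    LOGGER.warning("Message returned by the broker: %r %r %r",
                   method, properties, body)
    return False
```

Returning a boolean is a convenience of the sketch so callers and tests can tell which branch ran.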
-
on_channel_closed
(channel, reply_code, reply_text)¶ Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that violates the protocol, such as re-declare an exchange or queue with different parameters.
In this case, we just want to log the error and create a new channel after setting the state back to connecting.
Parameters:
-
on_channel_flow
(method)¶ When RabbitMQ indicates the connection is unblocked, set the state appropriately.
Parameters: method (pika.spec.Channel.Flow) – The Channel flow frame
-
on_channel_open
(channel)¶ This method is invoked by pika when the channel has been opened. The channel object is passed in so we can make use of it.
Parameters: channel (pika.channel.Channel) – The channel object
-
on_connection_blocked
(method_frame)¶ This method is called by pika if RabbitMQ sends a connection blocked method, to let us know we need to throttle our publishing.
Parameters: method_frame (pika.amqp_object.Method) – The blocked method frame
-
on_connection_closed
(connection, reply_code, reply_text)¶ This method is invoked by pika when the connection to RabbitMQ is closed unexpectedly. Since it is unexpected, we will reconnect to RabbitMQ if it disconnects.
Parameters:
-
on_connection_open
(connection)¶ This method is called by pika once the connection to RabbitMQ has been established.
-
on_connection_open_error
(connection, error)¶ Invoked if the connection to RabbitMQ cannot be made.
Parameters: error (Exception) – The exception indicating failure
-
on_connection_unblocked
(method_frame)¶ When RabbitMQ indicates the connection is unblocked, set the state appropriately.
Parameters: method_frame (pika.amqp_object.Method) – Unblocked method frame
-
on_delivery_confirmation
(method_frame)¶ Invoked by pika when RabbitMQ responds to a Basic.Publish RPC command, passing in either a Basic.Ack or Basic.Nack frame with the delivery tag of the message that was published. The delivery tag is an integer counter indicating the message number that was sent on the channel via Basic.Publish. Here we are just doing housekeeping: keeping track of stats and removing the confirmed message number from the list of messages that are pending confirmation.
Parameters: method_frame (pika.frame.Method) – Basic.Ack or Basic.Nack frame
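The housekeeping described for delivery confirmations can be sketched as a small tracker. This is an illustrative model rather than the Client's code: the class and method names are hypothetical, a dict stands in for the pending list, and the `multiple` flag that a Basic.Ack frame may carry is deliberately ignored.

```python
class ConfirmationTracker:
    """Track messages awaiting a Basic.Ack or Basic.Nack (illustrative only)."""

    def __init__(self):
        self.message_number = 0  # last delivery tag handed out
        self.pending = {}        # delivery tag -> routing key awaiting confirmation
        self.stats = {"ack": 0, "nack": 0}

    def on_publish(self, routing_key):
        """Record a published message and return its delivery tag."""
        self.message_number += 1
        self.pending[self.message_number] = routing_key
        return self.message_number

    def on_confirmation(self, delivery_tag, acked):
        """Remove a confirmed message from the pending set and update stats."""
        routing_key = self.pending.pop(delivery_tag, None)
        if routing_key is None:
            return None  # unknown tag, or one that was already confirmed
        self.stats["ack" if acked else "nack"] += 1
        return routing_key
```

Anything left in pending after a while has not been confirmed by the broker, which is the condition a publisher would treat as a delivery failure.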
-
publish
(exchange, routing_key, body, properties=None)¶ Publish a message to RabbitMQ. If the RabbitMQ connection is not established or is blocked, attempt to wait until sending is possible.
Parameters: Return type: Raises: Raises: sprockets.mixins.amqp.PublishingError
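To make the "wait until sending is possible" behavior concrete, here is a minimal asyncio sketch of a publisher gated on a readiness flag with a timeout. It is an assumption-laden model and not the library's implementation (which is built on Tornado and pika): GatedPublisher, NotReady, on_blocked, and on_unblocked are hypothetical names, and messages are appended to a list instead of being sent.

```python
import asyncio


class NotReady(Exception):
    """Hypothetical stand-in for sprockets.mixins.amqp.NotReadyError."""


class GatedPublisher:
    """Hold publishes until the connection is marked ready (illustrative only)."""

    def __init__(self, timeout):
        self.timeout = timeout        # seconds to wait for a bad state to resolve
        self.ready = asyncio.Event()  # set while publishing is allowed
        self.sent = []                # stands in for the real channel

    def on_unblocked(self):
        self.ready.set()

    def on_blocked(self):
        self.ready.clear()

    async def publish(self, exchange, routing_key, body):
        try:
            # Wait for readiness, but treat a long wait as a persistent failure.
            await asyncio.wait_for(self.ready.wait(), self.timeout)
        except asyncio.TimeoutError:
            raise NotReady("connection not ready after {}s".format(self.timeout))
        self.sent.append((exchange, routing_key, body))
```

Create the publisher inside a running coroutine so the event is bound to the active loop on older Python versions.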
-
-
exception
sprockets.mixins.amqp.
ConnectionStateError
(*args)¶ Raised when a reconnect is attempted but the state is incorrect
-
exception
sprockets.mixins.amqp.
NotReadyError
(*args)¶ Raised if Client.publish() is invoked and the connection is not ready for publishing.
-
exception
sprockets.mixins.amqp.
PublishingFailure
(*args)¶ Raised if Client.publish() is invoked and an error occurs or the message delivery is not confirmed.
-
class
sprockets.mixins.amqp.
PublishingMixin
¶ This mixin adds the ability to publish messages to RabbitMQ. It uses a persistent connection and channel that are opened when the application starts up and automatically reopened if closed by RabbitMQ.
-
amqp_publish
(exchange, routing_key, body, properties=None)¶ Publish a message to RabbitMQ
Parameters: Return type: Raises: sprockets.mixins.amqp.AMQPError
Raises: Raises: sprockets.mixins.amqp.PublishingError
-
-
sprockets.mixins.amqp.
install
(application, io_loop=None, **kwargs)¶ Call this to install AMQP for the Tornado application. Additional keyword arguments are passed through to the constructor of the AMQP object.
Parameters: - application (tornado.web.Application) – The tornado application
- io_loop (tornado.ioloop.IOLoop) – The current IOLoop.
Return type: