Prueba de los ejemplos de publicación de RabbitMQ Pika
Con RabbitMQ Pika publicación asíncrona no se olvide de incluir todas las condiciones para evitar la pérdida de datos.
Se ha escrito mucho sobre la publicación síncrona vs asíncrona con RabbitMQ, ver enlaces más abajo, no voy a repetirlo aquí. Como es la primera vez que utilizo RabbitMQ, he querido probar tanto la versión de publicación síncrona como la asíncrona,
utilizando los ejemplos incluidos con Pika, la librería cliente RabbitMQ (AMQP 0-9-1) para Python.
Para mi sorpresa, el ejemplo asíncrono de Pika no detectaba cuando se eliminaba la cola, sino que seguía enviando mensajes. ¿Qué está pasando aquí?
Cómo probarlo
Usamos el RabbitMQ con gestión Docker image, la interfaz de gestión está en:
http://127.0.0.1:15672
Aquí he cambiado las credenciales a 'user'/'password'.
He creado dos scripts Python basados en los ejemplos Pika :
- publicación sincrónica, utilizando BlockingConnection
- publicación asíncrona, utilizando SelectConnection
Los scripts siguen publicando mensajes cada 2 segundos.
Cuando un script se está ejecutando (publicando mensajes en una cola) creo las siguientes condiciones:
- Terminate RabbitMQ
- Borrar el intercambio
- Borrar la cola
Terminar RabbitMQ se hace matando el contenedor Docker . La eliminación del intercambio y de la cola se realiza desde la interfaz de gestión de RabbitMQ . Veamos qué ocurre.
Publicación síncrona utilizando BlockingConnection
El código:
# blockingconnection.py
# based on:
# Using Delivery Confirmations with the BlockingConnection
# https://pika.readthedocs.io/en/stable/examples/blocking_delivery_confirmations.html?highlight=blockingconnection
# changes made:
# - username/password = user/password
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
import pika
from pika.exchange_type import ExchangeType
import time
username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(
pika.ConnectionParameters(
credentials=pika.PlainCredentials(
username,
password
),
),
)
# get channel
channel = connection.channel()
# declare our exchange
channel.exchange_declare(
exchange=my_exchange,
exchange_type=my_exchange_type,
durable=True,
auto_delete=False,
)
# declare our queue
channel.queue_declare(
queue=my_queue,
durable=True,
exclusive=False,
auto_delete=False,
)
# bind queue to exchange
channel.queue_bind(
my_queue,
my_exchange,
routing_key=my_routing_key,
)
# Turn on delivery confirmations
channel.confirm_delivery()
# Send messages
for i in range(20):
try:
channel.basic_publish(
exchange=my_exchange,
routing_key=my_routing_key,
body='Hello World!',
properties=pika.BasicProperties(
content_type='text/plain',
delivery_mode=2,
),
mandatory=True,
),
print('Message {} publish was confirmed'.format(i))
except Exception as e:
e_name = type(e).__name__
print('Message {} could not be confirmed, exception = {}, e = {}'.format(i, e_name, e))
time.sleep(2)
1. Sincrónico: Terminar RabbitMQ
Cuando se termina RabbitMQ , el mensaje no puede ser confirmado. Bien.
...
Message 1 publish was confirmed
Message 2 publish was confirmed
Message 3 publish was confirmed
Message 4 could not be confirmed, exception = ConnectionClosedByBroker, e = (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
Message 5 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 6 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...
2. Sincrónico: Borrar intercambio
Cuando se elimina el intercambio, el mensaje no puede confirmarse. Bien.
...
Message 3 publish was confirmed
Message 4 publish was confirmed
Message 5 publish was confirmed
Message 6 could not be confirmed, exception = ChannelClosedByBroker, e = (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 8 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 9 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...
3. Sincrónico: Borrar cola
Cuando se borra la cola, el mensaje no se puede confirmar. Bien.
...
Message 6 publish was confirmed
Message 7 publish was confirmed
Message 8 publish was confirmed
Message 9 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 10 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 11 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
...
Publicación asíncrona utilizando SelectConnection
El código:
# selectconnection.py
# based on:
# https://github.com/pika/pika/blob/master/examples/asynchronous_publisher_example.py
# changes made:
# - username/password = user/password
# - added username, password to connect string
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
# - added to basic_publish: mandatory=True
# - added to basic_publish properties: delivery_mode=2
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import functools
import logging
import json
import pika
from pika.exchange_type import ExchangeType
LOG_FORMAT = ('%(asctime)s %(levelname) -8s [%(funcName) -35s %(lineno) 5d] %(message)s')
LOGGER = logging.getLogger(__name__)
logging_level = logging.INFO
username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue
class ExamplePublisher(object):
"""This is an example publisher that will handle unexpected interactions
with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, it will reopen it. You should
look at the output, as there are limited reasons why the connection may
be closed, which usually are tied to permission related issues or
socket timeouts.
It uses delivery confirmations and illustrates one way to keep track of
messages that have been sent and if they've been confirmed by RabbitMQ.
"""
EXCHANGE = my_exchange
EXCHANGE_TYPE = my_exchange_type
QUEUE = my_queue
ROUTING_KEY = my_routing_key
PUBLISH_INTERVAL = 2
def __init__(self, amqp_url):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param str amqp_url: The URL for connecting to RabbitMQ
"""
self._connection = None
self._channel = None
self._deliveries = None
self._acked = None
self._nacked = None
self._message_number = None
self._stopping = False
self._url = amqp_url
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
will be invoked by pika.
:rtype: pika.SelectConnection
"""
LOGGER.info('Connecting to %s', self._url)
return pika.SelectConnection(
pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed)
def on_connection_open(self, _unused_connection):
"""This method is called by pika once the connection to RabbitMQ has
been established. It passes the handle to the connection object in
case we need it, but in this case, we'll just mark it unused.
:param pika.SelectConnection _unused_connection: The connection
"""
LOGGER.info('Connection opened')
self.open_channel()
def on_connection_open_error(self, _unused_connection, err):
"""This method is called by pika if the connection to RabbitMQ
can't be established.
:param pika.SelectConnection _unused_connection: The connection
:param Exception err: The error
"""
LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
def on_connection_closed(self, _unused_connection, reason):
"""This method is invoked by pika when the connection to RabbitMQ is
closed unexpectedly. Since it is unexpected, we will reconnect to
RabbitMQ if it disconnects.
:param pika.connection.Connection connection: The closed connection obj
:param Exception reason: exception representing reason for loss of
connection.
"""
self._channel = None
if self._stopping:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
def open_channel(self):
"""This method will open a new channel with RabbitMQ by issuing the
Channel.Open RPC command. When RabbitMQ confirms the channel is open
by sending the Channel.OpenOK RPC reply, the on_channel_open method
will be invoked.
"""
LOGGER.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
"""This method is invoked by pika when the channel has been opened.
The channel object is passed in so we can make use of it.
Since the channel is now open, we'll declare the exchange to use.
:param pika.channel.Channel channel: The channel object
"""
LOGGER.info('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
# # (1 of 3) add to detect unroutable messages
# # self.add_on_channel_return_callback()
self.setup_exchange(self.EXCHANGE)
def add_on_channel_close_callback(self):
"""This method tells pika to call the on_channel_closed method if
RabbitMQ unexpectedly closes the channel.
"""
LOGGER.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reason):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Channels are usually closed if you attempt to do something that
violates the protocol, such as re-declare an exchange or queue with
different parameters. In this case, we'll close the connection
to shutdown the object.
:param pika.channel.Channel channel: The closed channel
:param Exception reason: why the channel was closed
"""
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self._channel = None
if not self._stopping:
self._connection.close()
# # (2 of 3) add to detect unroutable messages
def add_on_channel_return_callback(self):
LOGGER.info('Adding channel return callback')
self._channel.add_on_return_callback(self.on_channel_returned)
# # (3 of 3) add to detect unroutable messages
def on_channel_returned(self, channel, method, properties, body):
"""Invoked by pika when RabbitMQ unexpectedly returns from the channel.
"""
LOGGER.warning('Channel = {} returned, method = {}, properties = {}, body = {}'.format(channel, method, properties, body))
self._channel = None
if not self._stopping:
self._connection.close()
def setup_exchange(self, exchange_name):
"""Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
command. When it is complete, the on_exchange_declareok method will
be invoked by pika.
:param str|unicode exchange_name: The name of the exchange to declare
"""
LOGGER.info('Declaring exchange %s', exchange_name)
# Note: using functools.partial is not required, it is demonstrating
# how arbitrary data can be passed to the callback when it is called
cb = functools.partial(
self.on_exchange_declareok, userdata=exchange_name)
self._channel.exchange_declare(
exchange=exchange_name,
exchange_type=self.EXCHANGE_TYPE,
durable=True,
auto_delete=False,
callback=cb)
def on_exchange_declareok(self, _unused_frame, userdata):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
command.
:param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
:param str|unicode userdata: Extra user data (exchange name)
"""
LOGGER.info('Exchange declared: %s', userdata)
self.setup_queue(self.QUEUE)
def setup_queue(self, queue_name):
"""Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
command. When it is complete, the on_queue_declareok method will
be invoked by pika.
:param str|unicode queue_name: The name of the queue to declare.
"""
LOGGER.info('Declaring queue %s', queue_name)
self._channel.queue_declare(
queue=queue_name,
durable=True,
exclusive=False,
auto_delete=False,
callback=self.on_queue_declareok,
)
def on_queue_declareok(self, _unused_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
setup_queue has completed. In this method we will bind the queue
and exchange together with the routing key by issuing the Queue.Bind
RPC command. When this command is complete, the on_bindok method will
be invoked by pika.
:param pika.frame.Method method_frame: The Queue.DeclareOk frame
"""
LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE,
self.ROUTING_KEY)
self._channel.queue_bind(
self.QUEUE,
self.EXCHANGE,
routing_key=self.ROUTING_KEY,
callback=self.on_bindok)
def on_bindok(self, _unused_frame):
"""This method is invoked by pika when it receives the Queue.BindOk
response from RabbitMQ. Since we know we're now setup and bound, it's
time to start publishing."""
LOGGER.info('Queue bound')
self.start_publishing()
def start_publishing(self):
"""This method will enable delivery confirmations and schedule the
first message to be sent to RabbitMQ
"""
LOGGER.info('Issuing consumer related RPC commands')
self.enable_delivery_confirmations()
self.schedule_next_message()
def enable_delivery_confirmations(self):
"""Send the Confirm.Select RPC method to RabbitMQ to enable delivery
confirmations on the channel. The only way to turn this off is to close
the channel and create a new one.
When the message is confirmed from RabbitMQ, the
on_delivery_confirmation method will be invoked passing in a Basic.Ack
or Basic.Nack method from RabbitMQ that will indicate which messages it
is confirming or rejecting.
"""
LOGGER.info('Issuing Confirm.Select RPC command')
self._channel.confirm_delivery(self.on_delivery_confirmation)
def on_delivery_confirmation(self, method_frame):
"""Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
command, passing in either a Basic.Ack or Basic.Nack frame with
the delivery tag of the message that was published. The delivery tag
is an integer counter indicating the message number that was sent
on the channel via Basic.Publish. Here we're just doing house keeping
to keep track of stats and remove message numbers that we expect
a delivery confirmation of from the list used to keep track of messages
that are pending confirmation.
:param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
"""
LOGGER.info('Received method_frame: {}'.format(method_frame))
confirmation_type = method_frame.method.NAME.split('.')[1].lower()
LOGGER.info('Received %s for delivery tag: %i', confirmation_type,
method_frame.method.delivery_tag)
if confirmation_type == 'ack':
self._acked += 1
elif confirmation_type == 'nack':
self._nacked += 1
self._deliveries.remove(method_frame.method.delivery_tag)
LOGGER.info(
'Published %i messages, %i have yet to be confirmed, '
'%i were acked and %i were nacked', self._message_number,
len(self._deliveries), self._acked, self._nacked)
def schedule_next_message(self):
"""If we are not closing our connection to RabbitMQ, schedule another
message to be delivered in PUBLISH_INTERVAL seconds.
"""
LOGGER.info('Scheduling next message for %0.1f seconds',
self.PUBLISH_INTERVAL)
self._connection.ioloop.call_later(self.PUBLISH_INTERVAL,
self.publish_message)
def publish_message(self):
"""If the class is not stopping, publish a message to RabbitMQ,
appending a list of deliveries with the message number that was sent.
This list will be used to check for delivery confirmations in the
on_delivery_confirmations method.
Once the message has been sent, schedule another message to be sent.
The main reason I put scheduling in was just so you can get a good idea
of how the process is flowing by slowing down and speeding up the
delivery intervals by changing the PUBLISH_INTERVAL constant in the
class.
"""
if self._channel is None or not self._channel.is_open:
return
hdrs = {u'مفتاح': u' قيمة', u'键': u'值', u'キー': u'値'}
properties = pika.BasicProperties(
app_id='example-publisher',
content_type='application/json',
headers=hdrs,
delivery_mode=2,
)
message = u'مفتاح قيمة 键 值 キー 値'
self._channel.basic_publish(
self.EXCHANGE,
self.ROUTING_KEY,
json.dumps(message, ensure_ascii=False),
properties,
mandatory=True,
)
self._message_number += 1
self._deliveries.append(self._message_number)
LOGGER.info('Published message # %i', self._message_number)
self.schedule_next_message()
def run(self):
"""Run the example code by connecting and then starting the IOLoop.
"""
while not self._stopping:
self._connection = None
self._deliveries = []
self._acked = 0
self._nacked = 0
self._message_number = 0
try:
self._connection = self.connect()
self._connection.ioloop.start()
except KeyboardInterrupt:
self.stop()
if (self._connection is not None and
not self._connection.is_closed):
# Finish closing
self._connection.ioloop.start()
LOGGER.info('Stopped')
def stop(self):
"""Stop the example by closing the channel and connection. We
set a flag here so that we stop scheduling new messages to be
published. The IOLoop is started because this method is
invoked by the Try/Catch below when KeyboardInterrupt is caught.
Starting the IOLoop again will allow the publisher to cleanly
disconnect from RabbitMQ.
"""
LOGGER.info('Stopping')
self._stopping = True
self.close_channel()
self.close_connection()
def close_channel(self):
"""Invoke this command to close the channel with RabbitMQ by sending
the Channel.Close RPC command.
"""
if self._channel is not None:
LOGGER.info('Closing the channel')
self._channel.close()
def close_connection(self):
"""This method closes the connection to RabbitMQ."""
if self._connection is not None:
LOGGER.info('Closing connection')
self._connection.close()
def main():
#logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
logging.basicConfig(level=logging_level, format=LOG_FORMAT)
# Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F)
example = ExamplePublisher(
'amqp://' + username + ':' + password + '@localhost:5672/%2F?connection_attempts=3&heartbeat=3600'
)
example.run()
if __name__ == '__main__':
main()
1. Asíncrono: Terminar rabbitmq
Cuando esto sucede, el programa termina (he eliminado las trazas). Bien.
...
2022-03-18 13:44:49,693 INFO [on_delivery_confirmation 253] Published 6 messages, 0 have yet to be confirmed, 6 were acked and 0 were nacked
2022-03-18 13:44:50,680 INFO [publish_message 299] Published message # 7
2022-03-18 13:44:50,681 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:44:50,689 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 7
2022-03-18 13:44:50,689 INFO [on_delivery_confirmation 253] Published 7 messages, 0 have yet to be confirmed, 7 were acked and 0 were nacked
2022-03-18 13:44:51,203 INFO [abort 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_initiate_abort 904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_deactivate 869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_on_stream_terminated 1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByBroker: (320) "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
2022-03-18 13:44:51,204 WARNING [on_channel_closed 146] Channel 1 was closed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
2022-03-18 13:44:51,204 ERROR [close 1282] Illegal close(200, 'Normal shutdown') request on <SelectConnection CLOSED transport=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>> because it was called while connection state=CLOSED.
2022-03-18 13:44:51,204 ERROR [process 235] Calling <bound method ExamplePublisher.on_channel_closed of <__main__.ExamplePublisher object at 0x7efed2c60a90>> for "1:_on_channel_close" failed
2022-03-18 13:44:51,205 INFO [_close_and_finalize 882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,205 ERROR [log_exception_func_wrap 55] Wrapped func exited with exception. Caller's stack:
...
2. Asíncrono: Borrar intercambio
Lo que vemos aquí es que la conexión se cierra y se reinicia después de 5 segundos. Bien.
...
2022-03-18 13:43:29,755 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 31
2022-03-18 13:43:29,755 INFO [on_delivery_confirmation 253] Published 31 messages, 0 have yet to be confirmed, 31 were acked and 0 were nacked
2022-03-18 13:43:30,753 INFO [publish_message 299] Published message # 32
2022-03-18 13:43:30,753 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:30,755 WARNING [_on_close_from_broker 1070] Received remote Channel.Close (404): "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
2022-03-18 13:43:30,755 WARNING [on_channel_closed 146] Channel 1 was closed: (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
2022-03-18 13:43:30,755 INFO [close 1295] Closing connection (200): 'Normal shutdown'
2022-03-18 13:43:30,755 INFO [close 1322] Connection.close is waiting for 1 channels to close: <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:30,757 INFO [abort 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO [_initiate_abort 904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO [_deactivate 869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,758 INFO [_on_stream_terminated 1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 INFO [_on_stream_terminated 2065] Stack terminated due to ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 WARNING [on_connection_closed 106] Connection closed, reopening in 5 seconds: (200, 'Normal shutdown')
2022-03-18 13:43:30,758 INFO [_close_and_finalize 882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,763 INFO [connect 69] Connecting to amqp://user:password@localhost:5672/%2F?connection_attempts=3&heartbeat=3600
2022-03-18 13:43:35,764 INFO [start 179] Pika version 1.2.0 connecting to ('127.0.0.1', 5672)
2022-03-18 13:43:35,764 INFO [_on_writable 345] Socket connected: <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44168), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,764 INFO [_on_transport_establishment_done 428] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
2022-03-18 13:43:35,766 INFO [on_connection_open 82] Connection opened
2022-03-18 13:43:35,766 INFO [open_channel 116] Creating a new channel
2022-03-18 13:43:35,767 INFO [_report_completion_and_cleanup 293] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO [_report_completion_and_cleanup 725] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO [on_channel_open 125] Channel opened
2022-03-18 13:43:35,767 INFO [add_on_channel_close_callback 134] Adding channel close callback
2022-03-18 13:43:35,767 INFO [setup_exchange 157] Declaring exchange my_test_exchange
2022-03-18 13:43:35,768 INFO [on_exchange_declareok 173] Exchange declared: my_test_exchange
2022-03-18 13:43:35,768 INFO [setup_queue 182] Declaring queue my_test_queue
2022-03-18 13:43:35,769 INFO [on_queue_declareok 199] Binding my_test_exchange to my_test_queue with my_test_queue
2022-03-18 13:43:35,769 INFO [on_bindok 211] Queue bound
2022-03-18 13:43:35,769 INFO [start_publishing 218] Issuing consumer related RPC commands
2022-03-18 13:43:35,769 INFO [enable_delivery_confirmations 231] Issuing Confirm.Select RPC command
2022-03-18 13:43:35,769 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,771 INFO [publish_message 299] Published message # 1
2022-03-18 13:43:36,771 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,785 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 1
2022-03-18 13:43:36,785 INFO [on_delivery_confirmation 253] Published 1 messages, 0 have yet to be confirmed, 1 were acked and 0 were nacked
2022-03-18 13:43:37,773 INFO [publish_message 299] Published message # 2---
...
3. Asíncrono: Borrar cola
No pasa nada cuando borramos la cola. No es bueno. ¿Por qué no se detecta?
...
2022-03-18 13:46:42,361 INFO [publish_message 299] Published message # 16
2022-03-18 13:46:42,361 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:42,363 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 16
2022-03-18 13:46:42,363 INFO [on_delivery_confirmation 253] Published 16 messages, 0 have yet to be confirmed, 16 were acked and 0 were nacked
2022-03-18 13:46:43,363 INFO [publish_message 299] Published message # 17
2022-03-18 13:46:43,363 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:43,364 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 17
2022-03-18 13:46:43,364 INFO [on_delivery_confirmation 253] Published 17 messages, 0 have yet to be confirmed, 17 were acked and 0 were nacked
2022-03-18 13:46:44,365 INFO [publish_message 299] Published message # 18
2022-03-18 13:46:44,366 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:44,367 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 18
2022-03-18 13:46:44,367 INFO [on_delivery_confirmation 253] Published 18 messages, 0 have yet to be confirmed, 18 were acked and 0 were nacked
2022-03-18 13:46:45,367 INFO [publish_message 299] Published message # 19
2022-03-18 13:46:45,367 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:45,368 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 19
2022-03-18 13:46:45,368 INFO [on_delivery_confirmation 253] Published 19 messages, 0 have yet to be confirmed, 19 were acked and 0 were nacked
2022-03-18 13:46:46,368 INFO [publish_message 299] Published message # 20
...
Publicación asíncrona: basic.ack y basic.nack no son suficientes
De la documentación:
'¿Cuándo serán confirmados los mensajes publicados por el broker?
Para los mensajes no enrutables, el broker emitirá una confirmación una vez que el intercambio verifique que un mensaje no se enruta a ninguna cola (devuelve una lista vacía de colas). Si el mensaje también se publica como obligatorio, el basic.return se envía al cliente antes del basic.ack. Lo mismo ocurre con los acuses de recibo negativos (basic.nack)".
Esto significa que debemos capturar el basic.return en nuestro script. Ya he añadido las líneas al código anterior que se encargan de esto. Para habilitarlas cambia esta línea:
# # self.add_on_channel_return_callback()
a:
self.add_on_channel_return_callback()
Si vuelves a ejecutar el script, notarás que después de eliminar la cola, el script intenta reiniciar la conexión. El método on_channel_returned(), devuelve el mensaje, antes de que basic.ack sea recibido.
Debido a que el basic.return es enviado al cliente antes (!) del basic.ack, el mensaje será reintentado.
Resumen
Siempre pruebe todas las condiciones antes de usar el código de ejemplo. Yo sólo hice tres pruebas, pero puedes hacer muchas más, por ejemplo, comprobar si el intercambio y la cola siguen presentes después de un reinicio de RabbitMQ. Y comprobar si los mensajes siguen en la cola después de un reinicio de RabbitMQ.
Además, la publicación asíncrona, sin pérdida de datos, es decir, con confirmaciones, introduce mucha complejidad. El rendimiento puede aumentar drásticamente, pero esto también depende en gran medida de la arquitectura de su aplicación. ¿Realmente necesita la publicación asíncrona?
Enlaces / créditos
Notify consumer when a queue is deleted on rabbitmq
https://stackoverflow.com/questions/15527226/notify-consumer-when-a-queue-is-deleted-on-rabbitmq
Part 1: RabbitMQ Best Practices
https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html
Publishing Throughput - Asynchronous vs Synchronous
https://www.cloudamqp.com/blog/publishing-throughput-asynchronous-vs-synchronous.html
RabbitMQ - AMQP 0-9-1 Complete Reference Guide
https://www.rabbitmq.com/amqp-0-9-1-reference.html
RabbitMQ - Publisher Confirms
https://www.rabbitmq.com/confirms.html
RabbitMQ - Reliability Guide
https://www.rabbitmq.com/reliability.html
Leer más
Message broker Pika RabbitMQ
Deje un comentario
Comente de forma anónima o inicie sesión para comentar.
Comentarios (1)
Deje una respuesta.
Responda de forma anónima o inicie sesión para responder.
Exceptional blog - TYVM. I'm new to the pika library and set up a work queue and multiple producers and consumers. It worked just as expected right until the connection to my RabbitMQ server dropped out. All the consumers threw exceptions, but the producers? They happily posted their messages into the ether, never to be seen again.
Which is how I ended up here, in your comments.
The async test code - the default messages and headers are intriguing
Recientes
- Cómo ocultar las claves primarias de la base de datos UUID de su aplicación web
- Don't Repeat Yourself (DRY) con Jinja2
- SQLAlchemy, PostgreSQL, número máximo de filas por user
- Mostrar los valores en filtros dinámicos SQLAlchemy
- Transferencia de datos segura con cifrado de Public Key y pyNaCl
- rqlite: una alternativa de alta disponibilidad y dist distribuida SQLite
Más vistos
- Usando Python's pyOpenSSL para verificar los certificados SSL descargados de un host
- Usando UUIDs en lugar de Integer Autoincrement Primary Keys con SQLAlchemy y MariaDb
- Conectarse a un servicio en un host Docker desde un contenedor Docker
- Usando PyInstaller y Cython para crear un ejecutable de Python
- SQLAlchemy: Uso de Cascade Deletes para eliminar objetos relacionados
- Flask RESTful API validación de parámetros de solicitud con esquemas Marshmallow