Testing the RabbitMQ Pika publishing examples
When publishing asynchronously with RabbitMQ Pika, remember to handle every failure condition to avoid losing data.
Much has already been written about synchronous and asynchronous publishing with RabbitMQ (see the links below), so I will not repeat it here. Since I am using RabbitMQ for the first time, I wanted to try both the synchronous and the asynchronous publishing variant, using the examples that ship with Pika, the RabbitMQ (AMQP 0-9-1) client library for Python.
To my surprise, the asynchronous Pika example did not detect that the queue had been removed and simply kept sending messages. What is going on here?
How do we test?
We use the RabbitMQ Docker image with the management plugin; the management interface is at:
http://127.0.0.1:15672
Here I changed the credentials to 'user'/'password'.
I created two Python scripts based on the Pika examples:
- synchronous publishing, using BlockingConnection
- asynchronous publishing, using SelectConnection
The scripts publish a message every 2 seconds.
While a script is running (publishing messages to a queue), I create the following conditions:
- Terminate RabbitMQ
- Delete the exchange
- Delete the queue
RabbitMQ is terminated by killing the Docker container. The exchange and the queue are deleted via the RabbitMQ management interface. Let's see what happens.
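The exchange and queue deletions were done by hand in the management interface. As a rough sketch, the same steps could be scripted against the HTTP API of the management plugin, assuming it listens on the address shown earlier and accepts the 'user'/'password' credentials. The helper names build_delete_request and delete_resource are made up for this illustration.

```python
import base64
import urllib.parse
import urllib.request

# Assumption: the management interface address used in this post
MANAGEMENT_URL = 'http://127.0.0.1:15672'


def build_delete_request(kind, name, vhost='/', username='user', password='password'):
    """Build (but do not send) a DELETE request for an exchange or a queue.

    kind must be 'exchanges' or 'queues'.
    """
    if kind not in ('exchanges', 'queues'):
        raise ValueError('kind must be "exchanges" or "queues"')
    # The default vhost "/" has to be percent-encoded as %2F in the path
    path = '/api/{}/{}/{}'.format(
        kind,
        urllib.parse.quote(vhost, safe=''),
        urllib.parse.quote(name, safe=''),
    )
    request = urllib.request.Request(MANAGEMENT_URL + path, method='DELETE')
    token = base64.b64encode('{}:{}'.format(username, password).encode('utf-8'))
    request.add_header('Authorization', 'Basic ' + token.decode('ascii'))
    return request


def delete_resource(kind, name):
    """Send the DELETE request. Needs a running broker with the management plugin."""
    with urllib.request.urlopen(build_delete_request(kind, name), timeout=5) as response:
        return response.status
```

Calling delete_resource('queues', 'my_test_queue') while a publisher is running should reproduce the third condition. It needs a live broker, so it is not called here.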
Synchronous publishing with BlockingConnection
The code:
# blockingconnection.py
# based on:
# Using Delivery Confirmations with the BlockingConnection
# https://pika.readthedocs.io/en/stable/examples/blocking_delivery_confirmations.html?highlight=blockingconnection
# changes made:
# - username/password = user/password
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
import pika
from pika.exchange_type import ExchangeType
import time
username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue
# Open a connection to RabbitMQ on localhost using default parameters,
# except for the credentials
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        credentials=pika.PlainCredentials(
            username,
            password
        ),
    ),
)
# get channel
channel = connection.channel()
# declare our exchange
channel.exchange_declare(
    exchange=my_exchange,
    exchange_type=my_exchange_type,
    durable=True,
    auto_delete=False,
)
# declare our queue
channel.queue_declare(
    queue=my_queue,
    durable=True,
    exclusive=False,
    auto_delete=False,
)
# bind queue to exchange
channel.queue_bind(
    my_queue,
    my_exchange,
    routing_key=my_routing_key,
)
# Turn on delivery confirmations
channel.confirm_delivery()
# Send messages
for i in range(20):
    try:
        # With confirmations enabled, basic_publish blocks until the broker
        # responds and raises if the message is nacked or returned
        channel.basic_publish(
            exchange=my_exchange,
            routing_key=my_routing_key,
            body='Hello World!',
            properties=pika.BasicProperties(
                content_type='text/plain',
                delivery_mode=2,
            ),
            mandatory=True,
        )
        print('Message {} publish was confirmed'.format(i))
    except Exception as e:
        e_name = type(e).__name__
        print('Message {} could not be confirmed, exception = {}, e = {}'.format(i, e_name, e))
    time.sleep(2)
1. Synchronous: terminating RabbitMQ
When RabbitMQ is terminated, the message cannot be confirmed. Good.
...
Message 1 publish was confirmed
Message 2 publish was confirmed
Message 3 publish was confirmed
Message 4 could not be confirmed, exception = ConnectionClosedByBroker, e = (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
Message 5 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 6 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...
2. Synchronous: deleting the exchange
When the exchange is deleted, the message cannot be confirmed. Good.
...
Message 3 publish was confirmed
Message 4 publish was confirmed
Message 5 publish was confirmed
Message 6 could not be confirmed, exception = ChannelClosedByBroker, e = (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
Message 7 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 8 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
Message 9 could not be confirmed, exception = ChannelWrongStateError, e = Channel is closed.
...
3. Synchronous: deleting the queue
When the queue is deleted, the message cannot be confirmed. Good.
...
Message 6 publish was confirmed
Message 7 publish was confirmed
Message 8 publish was confirmed
Message 9 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 10 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
Message 11 could not be confirmed, exception = UnroutableError, e = 1 unroutable message(s) returned
...
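The exception names in the three runs suggest different recovery steps. The following is a minimal sketch, not part of the Pika examples, that maps the class name (the same value the script prints as e_name) to a suggested action. The function name and the action labels are assumptions made for illustration; matching on the name keeps the sketch free of a Pika import.

```python
def classify_publish_error(exc):
    """Map an exception raised by basic_publish() to a hypothetical action label."""
    name = type(exc).__name__
    if name == 'UnroutableError':
        # The broker is reachable, but no queue is bound for the routing key
        return 'redeclare-queue'
    if name == 'ChannelClosedByBroker':
        # The broker closed the channel, e.g. 404 for a deleted exchange
        return 'reopen-channel'
    if name == 'ConnectionClosedByBroker':
        # The broker shut down or dropped the connection
        return 'reconnect'
    if name == 'ChannelWrongStateError':
        # Seen after both a closed channel and a closed connection,
        # so the caller has to check which of the two is still open
        return 'check-connection'
    return 'unknown'
```

In the except branch of the publishing loop, the returned label could decide whether to redeclare the queue, open a new channel, or reconnect instead of only printing the error.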
Asynchronous publishing with SelectConnection
The code:
# selectconnection.py
# based on:
# https://github.com/pika/pika/blob/master/examples/asynchronous_publisher_example.py
# changes made:
# - username/password = user/password
# - added username, password to connect string
# - added to exchange_declare: durable=True, auto_delete=False
# - added to queue_declare: durable=True, exclusive=False, auto_delete=False
# - added to basic_publish: mandatory=True
# - added to basic_publish properties: delivery_mode=2
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import functools
import logging
import json
import pika
from pika.exchange_type import ExchangeType
LOG_FORMAT = ('%(asctime)s %(levelname) -8s [%(funcName) -35s %(lineno) 5d] %(message)s')
LOGGER = logging.getLogger(__name__)
logging_level = logging.INFO
username = 'user'
password = 'password'
my_exchange = 'my_test_exchange'
my_exchange_type = ExchangeType.direct
my_queue = 'my_test_queue'
my_routing_key = my_queue
class ExamplePublisher(object):
    """This is an example publisher that will handle unexpected interactions
    with RabbitMQ such as channel and connection closures.
    If RabbitMQ closes the connection, it will reopen it. You should
    look at the output, as there are limited reasons why the connection may
    be closed, which usually are tied to permission related issues or
    socket timeouts.
    It uses delivery confirmations and illustrates one way to keep track of
    messages that have been sent and if they've been confirmed by RabbitMQ.
    """
    EXCHANGE = my_exchange
    EXCHANGE_TYPE = my_exchange_type
    QUEUE = my_queue
    ROUTING_KEY = my_routing_key
    PUBLISH_INTERVAL = 2
    def __init__(self, amqp_url):
        """Setup the example publisher object, passing in the URL we will use
        to connect to RabbitMQ.
        :param str amqp_url: The URL for connecting to RabbitMQ
        """
        self._connection = None
        self._channel = None
        self._deliveries = None
        self._acked = None
        self._nacked = None
        self._message_number = None
        self._stopping = False
        self._url = amqp_url
    def connect(self):
        """This method connects to RabbitMQ, returning the connection handle.
        When the connection is established, the on_connection_open method
        will be invoked by pika.
        :rtype: pika.SelectConnection
        """
        LOGGER.info('Connecting to %s', self._url)
        return pika.SelectConnection(
            pika.URLParameters(self._url),
            on_open_callback=self.on_connection_open,
            on_open_error_callback=self.on_connection_open_error,
            on_close_callback=self.on_connection_closed)
    def on_connection_open(self, _unused_connection):
        """This method is called by pika once the connection to RabbitMQ has
        been established. It passes the handle to the connection object in
        case we need it, but in this case, we'll just mark it unused.
        :param pika.SelectConnection _unused_connection: The connection
        """
        LOGGER.info('Connection opened')
        self.open_channel()
    def on_connection_open_error(self, _unused_connection, err):
        """This method is called by pika if the connection to RabbitMQ
        can't be established.
        :param pika.SelectConnection _unused_connection: The connection
        :param Exception err: The error
        """
        LOGGER.error('Connection open failed, reopening in 5 seconds: %s', err)
        self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
    def on_connection_closed(self, _unused_connection, reason):
        """This method is invoked by pika when the connection to RabbitMQ is
        closed unexpectedly. Since it is unexpected, we will reconnect to
        RabbitMQ if it disconnects.
        :param pika.connection.Connection connection: The closed connection obj
        :param Exception reason: exception representing reason for loss of
            connection.
        """
        self._channel = None
        if self._stopping:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
                           reason)
            self._connection.ioloop.call_later(5, self._connection.ioloop.stop)
    def open_channel(self):
        """This method will open a new channel with RabbitMQ by issuing the
        Channel.Open RPC command. When RabbitMQ confirms the channel is open
        by sending the Channel.OpenOK RPC reply, the on_channel_open method
        will be invoked.
        """
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)
    def on_channel_open(self, channel):
        """This method is invoked by pika when the channel has been opened.
        The channel object is passed in so we can make use of it.
        Since the channel is now open, we'll declare the exchange to use.
        :param pika.channel.Channel channel: The channel object
        """
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        # # (1 of 3) add to detect unroutable messages
        # # self.add_on_channel_return_callback()
        self.setup_exchange(self.EXCHANGE)
    def add_on_channel_close_callback(self):
        """This method tells pika to call the on_channel_closed method if
        RabbitMQ unexpectedly closes the channel.
        """
        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)
    def on_channel_closed(self, channel, reason):
        """Invoked by pika when RabbitMQ unexpectedly closes the channel.
        Channels are usually closed if you attempt to do something that
        violates the protocol, such as re-declare an exchange or queue with
        different parameters. In this case, we'll close the connection
        to shutdown the object.
        :param pika.channel.Channel channel: The closed channel
        :param Exception reason: why the channel was closed
        """
        LOGGER.warning('Channel %i was closed: %s', channel, reason)
        self._channel = None
        if not self._stopping:
            self._connection.close()
    # # (2 of 3) add to detect unroutable messages
    def add_on_channel_return_callback(self):
        LOGGER.info('Adding channel return callback')
        self._channel.add_on_return_callback(self.on_channel_returned)
    # # (3 of 3) add to detect unroutable messages
    def on_channel_returned(self, channel, method, properties, body):
        """Invoked by pika when RabbitMQ unexpectedly returns from the channel.
        """
        LOGGER.warning('Channel = {} returned, method = {}, properties = {}, body = {}'.format(channel, method, properties, body))
        self._channel = None
        if not self._stopping:
            self._connection.close()
    def setup_exchange(self, exchange_name):
        """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC
        command. When it is complete, the on_exchange_declareok method will
        be invoked by pika.
        :param str|unicode exchange_name: The name of the exchange to declare
        """
        LOGGER.info('Declaring exchange %s', exchange_name)
        # Note: using functools.partial is not required, it is demonstrating
        # how arbitrary data can be passed to the callback when it is called
        cb = functools.partial(
            self.on_exchange_declareok, userdata=exchange_name)
        self._channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=self.EXCHANGE_TYPE,
            durable=True,
            auto_delete=False,
            callback=cb)
    def on_exchange_declareok(self, _unused_frame, userdata):
        """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
        command.
        :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame
        :param str|unicode userdata: Extra user data (exchange name)
        """
        LOGGER.info('Exchange declared: %s', userdata)
        self.setup_queue(self.QUEUE)
    def setup_queue(self, queue_name):
        """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC
        command. When it is complete, the on_queue_declareok method will
        be invoked by pika.
        :param str|unicode queue_name: The name of the queue to declare.
        """
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(
            queue=queue_name,
            durable=True,
            exclusive=False,
            auto_delete=False,
            callback=self.on_queue_declareok,
        )
    def on_queue_declareok(self, _unused_frame):
        """Method invoked by pika when the Queue.Declare RPC call made in
        setup_queue has completed. In this method we will bind the queue
        and exchange together with the routing key by issuing the Queue.Bind
        RPC command. When this command is complete, the on_bindok method will
        be invoked by pika.
        :param pika.frame.Method method_frame: The Queue.DeclareOk frame
        """
        LOGGER.info('Binding %s to %s with %s', self.EXCHANGE, self.QUEUE,
                    self.ROUTING_KEY)
        self._channel.queue_bind(
            self.QUEUE,
            self.EXCHANGE,
            routing_key=self.ROUTING_KEY,
            callback=self.on_bindok)
    def on_bindok(self, _unused_frame):
        """This method is invoked by pika when it receives the Queue.BindOk
        response from RabbitMQ. Since we know we're now setup and bound, it's
        time to start publishing."""
        LOGGER.info('Queue bound')
        self.start_publishing()
    def start_publishing(self):
        """This method will enable delivery confirmations and schedule the
        first message to be sent to RabbitMQ
        """
        LOGGER.info('Issuing consumer related RPC commands')
        self.enable_delivery_confirmations()
        self.schedule_next_message()
    def enable_delivery_confirmations(self):
        """Send the Confirm.Select RPC method to RabbitMQ to enable delivery
        confirmations on the channel. The only way to turn this off is to close
        the channel and create a new one.
        When the message is confirmed from RabbitMQ, the
        on_delivery_confirmation method will be invoked passing in a Basic.Ack
        or Basic.Nack method from RabbitMQ that will indicate which messages it
        is confirming or rejecting.
        """
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)
    def on_delivery_confirmation(self, method_frame):
        """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC
        command, passing in either a Basic.Ack or Basic.Nack frame with
        the delivery tag of the message that was published. The delivery tag
        is an integer counter indicating the message number that was sent
        on the channel via Basic.Publish. Here we're just doing house keeping
        to keep track of stats and remove message numbers that we expect
        a delivery confirmation of from the list used to keep track of messages
        that are pending confirmation.
        :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame
        """
        LOGGER.info('Received method_frame: {}'.format(method_frame))
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        LOGGER.info('Received %s for delivery tag: %i', confirmation_type,
                    method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        LOGGER.info(
            'Published %i messages, %i have yet to be confirmed, '
            '%i were acked and %i were nacked', self._message_number,
            len(self._deliveries), self._acked, self._nacked)
    def schedule_next_message(self):
        """If we are not closing our connection to RabbitMQ, schedule another
        message to be delivered in PUBLISH_INTERVAL seconds.
        """
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.ioloop.call_later(self.PUBLISH_INTERVAL,
                                           self.publish_message)
    def publish_message(self):
        """If the class is not stopping, publish a message to RabbitMQ,
        appending a list of deliveries with the message number that was sent.
        This list will be used to check for delivery confirmations in the
        on_delivery_confirmation method.
        Once the message has been sent, schedule another message to be sent.
        The main reason I put scheduling in was just so you can get a good idea
        of how the process is flowing by slowing down and speeding up the
        delivery intervals by changing the PUBLISH_INTERVAL constant in the
        class.
        """
        if self._channel is None or not self._channel.is_open:
            return
        hdrs = {u'مفتاح': u' قيمة', u'键': u'值', u'キー': u'値'}
        properties = pika.BasicProperties(
            app_id='example-publisher',
            content_type='application/json',
            headers=hdrs,
            delivery_mode=2,
        )
        message = u'مفتاح قيمة 键 值 キー 値'
        self._channel.basic_publish(
            self.EXCHANGE,
            self.ROUTING_KEY,
            json.dumps(message, ensure_ascii=False),
            properties,
            mandatory=True,
        )
        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        self.schedule_next_message()
    def run(self):
        """Run the example code by connecting and then starting the IOLoop.
        """
        while not self._stopping:
            self._connection = None
            self._deliveries = []
            self._acked = 0
            self._nacked = 0
            self._message_number = 0
            try:
                self._connection = self.connect()
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.stop()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()
        LOGGER.info('Stopped')
    def stop(self):
        """Stop the example by closing the channel and connection. We
        set a flag here so that we stop scheduling new messages to be
        published. The IOLoop is started because this method is
        invoked by the Try/Catch below when KeyboardInterrupt is caught.
        Starting the IOLoop again will allow the publisher to cleanly
        disconnect from RabbitMQ.
        """
        LOGGER.info('Stopping')
        self._stopping = True
        self.close_channel()
        self.close_connection()
    def close_channel(self):
        """Invoke this command to close the channel with RabbitMQ by sending
        the Channel.Close RPC command.
        """
        if self._channel is not None:
            LOGGER.info('Closing the channel')
            self._channel.close()
    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        if self._connection is not None:
            LOGGER.info('Closing connection')
            self._connection.close()
def main():
    #logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
    logging.basicConfig(level=logging_level, format=LOG_FORMAT)
    # Connect to localhost:5672 with the username and password set above
    # and virtual host "/" (%2F)
    example = ExamplePublisher(
        'amqp://' + username + ':' + password + '@localhost:5672/%2F?connection_attempts=3&heartbeat=3600'
    )
    example.run()
if __name__ == '__main__':
    main()
1. Asynchronous: terminating RabbitMQ
When this happens, the program exits (I removed the tracebacks). Good.
...
2022-03-18 13:44:49,693 INFO [on_delivery_confirmation 253] Published 6 messages, 0 have yet to be confirmed, 6 were acked and 0 were nacked
2022-03-18 13:44:50,680 INFO [publish_message 299] Published message # 7
2022-03-18 13:44:50,681 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:44:50,689 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 7
2022-03-18 13:44:50,689 INFO [on_delivery_confirmation 253] Published 7 messages, 0 have yet to be confirmed, 7 were acked and 0 were nacked
2022-03-18 13:44:51,203 INFO [abort 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_initiate_abort 904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_deactivate 869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,204 INFO [_on_stream_terminated 1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByBroker: (320) "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
2022-03-18 13:44:51,204 WARNING [on_channel_closed 146] Channel 1 was closed: (320, "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
2022-03-18 13:44:51,204 ERROR [close 1282] Illegal close(200, 'Normal shutdown') request on <SelectConnection CLOSED transport=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>> because it was called while connection state=CLOSED.
2022-03-18 13:44:51,204 ERROR [process 235] Calling <bound method ExamplePublisher.on_channel_closed of <__main__.ExamplePublisher object at 0x7efed2c60a90>> for "1:_on_channel_close" failed
2022-03-18 13:44:51,205 INFO [_close_and_finalize 882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44170), raddr=('127.0.0.1', 5672)>
2022-03-18 13:44:51,205 ERROR [log_exception_func_wrap 55] Wrapped func exited with exception. Caller's stack:
...
2. Asynchronous: deleting the exchange
Here we see that the connection is closed and reopened after 5 seconds. Good.
...
2022-03-18 13:43:29,755 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 31
2022-03-18 13:43:29,755 INFO [on_delivery_confirmation 253] Published 31 messages, 0 have yet to be confirmed, 31 were acked and 0 were nacked
2022-03-18 13:43:30,753 INFO [publish_message 299] Published message # 32
2022-03-18 13:43:30,753 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:30,755 WARNING [_on_close_from_broker 1070] Received remote Channel.Close (404): "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
2022-03-18 13:43:30,755 WARNING [on_channel_closed 146] Channel 1 was closed: (404, "NOT_FOUND - no exchange 'my_test_exchange' in vhost '/'")
2022-03-18 13:43:30,755 INFO [close 1295] Closing connection (200): 'Normal shutdown'
2022-03-18 13:43:30,755 INFO [close 1322] Connection.close is waiting for 1 channels to close: <SelectConnection CLOSING transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a602bf40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:30,757 INFO [abort 731] Aborting transport connection: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO [_initiate_abort 904] _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,757 INFO [_deactivate 869] Deactivating transport: state=1; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:30,758 INFO [_on_stream_terminated 1996] AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=None; pending-error=ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 INFO [_on_stream_terminated 2065] Stack terminated due to ConnectionClosedByClient: (200) 'Normal shutdown'
2022-03-18 13:43:30,758 WARNING [on_connection_closed 106] Connection closed, reopening in 5 seconds: (200, 'Normal shutdown')
2022-03-18 13:43:30,758 INFO [_close_and_finalize 882] Closing transport socket and unlinking: state=3; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44166), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,763 INFO [connect 69] Connecting to amqp://user:password@localhost:5672/%2F?connection_attempts=3&heartbeat=3600
2022-03-18 13:43:35,764 INFO [start 179] Pika version 1.2.0 connecting to ('127.0.0.1', 5672)
2022-03-18 13:43:35,764 INFO [_on_writable 345] Socket connected: <socket.socket fd=8, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 44168), raddr=('127.0.0.1', 5672)>
2022-03-18 13:43:35,764 INFO [_on_transport_establishment_done 428] Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>).
2022-03-18 13:43:35,766 INFO [on_connection_open 82] Connection opened
2022-03-18 13:43:35,766 INFO [open_channel 116] Creating a new channel
2022-03-18 13:43:35,767 INFO [_report_completion_and_cleanup 293] AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO [_report_completion_and_cleanup 725] AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f07a6036f40> params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>
2022-03-18 13:43:35,767 INFO [on_channel_open 125] Channel opened
2022-03-18 13:43:35,767 INFO [add_on_channel_close_callback 134] Adding channel close callback
2022-03-18 13:43:35,767 INFO [setup_exchange 157] Declaring exchange my_test_exchange
2022-03-18 13:43:35,768 INFO [on_exchange_declareok 173] Exchange declared: my_test_exchange
2022-03-18 13:43:35,768 INFO [setup_queue 182] Declaring queue my_test_queue
2022-03-18 13:43:35,769 INFO [on_queue_declareok 199] Binding my_test_exchange to my_test_queue with my_test_queue
2022-03-18 13:43:35,769 INFO [on_bindok 211] Queue bound
2022-03-18 13:43:35,769 INFO [start_publishing 218] Issuing consumer related RPC commands
2022-03-18 13:43:35,769 INFO [enable_delivery_confirmations 231] Issuing Confirm.Select RPC command
2022-03-18 13:43:35,769 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,771 INFO [publish_message 299] Published message # 1
2022-03-18 13:43:36,771 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:43:36,785 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 1
2022-03-18 13:43:36,785 INFO [on_delivery_confirmation 253] Published 1 messages, 0 have yet to be confirmed, 1 were acked and 0 were nacked
2022-03-18 13:43:37,773 INFO [publish_message 299] Published message # 2
...
3. Asynchronous: deleting the queue
Nothing happens when we delete the queue. That is not good! Why is this not detected?
...
2022-03-18 13:46:42,361 INFO [publish_message 299] Published message # 16
2022-03-18 13:46:42,361 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:42,363 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 16
2022-03-18 13:46:42,363 INFO [on_delivery_confirmation 253] Published 16 messages, 0 have yet to be confirmed, 16 were acked and 0 were nacked
2022-03-18 13:46:43,363 INFO [publish_message 299] Published message # 17
2022-03-18 13:46:43,363 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:43,364 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 17
2022-03-18 13:46:43,364 INFO [on_delivery_confirmation 253] Published 17 messages, 0 have yet to be confirmed, 17 were acked and 0 were nacked
2022-03-18 13:46:44,365 INFO [publish_message 299] Published message # 18
2022-03-18 13:46:44,366 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:44,367 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 18
2022-03-18 13:46:44,367 INFO [on_delivery_confirmation 253] Published 18 messages, 0 have yet to be confirmed, 18 were acked and 0 were nacked
2022-03-18 13:46:45,367 INFO [publish_message 299] Published message # 19
2022-03-18 13:46:45,367 INFO [schedule_next_message 262] Scheduling next message for 1.0 seconds
2022-03-18 13:46:45,368 INFO [on_delivery_confirmation 246] Received ack for delivery tag: 19
2022-03-18 13:46:45,368 INFO [on_delivery_confirmation 253] Published 19 messages, 0 have yet to be confirmed, 19 were acked and 0 were nacked
2022-03-18 13:46:46,368 INFO [publish_message 299] Published message # 20
...
Asynchronous publishing: basic.ack and basic.nack are not enough
From the documentation:
'When Will Published Messages Be Confirmed by the Broker?
For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).'
This means that we must capture basic.return in our script. I already included the lines that take care of this in the code above. To activate them, change this line:
# # self.add_on_channel_return_callback()
to:
self.add_on_channel_return_callback()
If you run the script again, you will see that after the queue is removed, the script tries to restart the connection. The on_channel_returned() method receives the returned message before basic.ack is received.
Because basic.return is sent to the client before (!) basic.ack, the script can react to the unroutable message before it would have been counted as confirmed: it closes the connection, reconnects, redeclares the queue, and resumes publishing.
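An ack alone therefore does not prove that a mandatory message reached a queue. One way to account for this, sketched here without Pika, is to remember returned messages and treat the ack that follows as a failure. basic.return carries no delivery tag, so this sketch assumes every message is published with a unique message_id property that the return callback can read back. ConfirmTracker and its method names are hypothetical, and acks with the multiple flag set are not handled.

```python
class ConfirmTracker:
    """Track published messages so an ack that follows a return is not
    counted as a successful delivery."""

    def __init__(self):
        self._pending = {}      # delivery tag -> message id, awaiting confirm
        self._returned = set()  # message ids sent back via basic.return
        self.delivered = []     # message ids acked without a prior return
        self.failed = []        # message ids nacked or returned

    def on_publish(self, delivery_tag, message_id):
        # Call right after basic_publish with the channel's publish counter
        self._pending[delivery_tag] = message_id

    def on_return(self, message_id):
        # Call from the return callback; the ack has not arrived yet
        self._returned.add(message_id)

    def on_confirm(self, delivery_tag, acked):
        # Call from the delivery confirmation callback (ack or nack)
        message_id = self._pending.pop(delivery_tag)
        if acked and message_id not in self._returned:
            self.delivered.append(message_id)
        else:
            self._returned.discard(message_id)
            self.failed.append(message_id)


tracker = ConfirmTracker()
tracker.on_publish(1, 'msg-a')
tracker.on_publish(2, 'msg-b')
tracker.on_confirm(1, acked=True)   # routed and confirmed
tracker.on_return('msg-b')          # queue was deleted: returned first
tracker.on_confirm(2, acked=True)   # then acked anyway
print(tracker.delivered, tracker.failed)
```

In the publisher above, on_channel_returned() could pass properties.message_id to on_return() instead of closing the connection, provided the messages are published with that property set. Whether to republish the failed messages is left to the application.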
Summary
Always test all conditions before you rely on example code. I ran only three tests, but you can do many more, for example check whether the exchange and the queue still exist after RabbitMQ restarts, and whether the messages are still in the queue after a restart.
In addition, asynchronous publishing without data loss, that is, with confirmations, adds a lot of complexity. Throughput can increase dramatically, but that also depends heavily on the architecture of your application. Do you really need asynchronous publishing?
Links / credits
Notify consumer when a queue is deleted on rabbitmq
https://stackoverflow.com/questions/15527226/notify-consumer-when-a-queue-is-deleted-on-rabbitmq
Part 1: RabbitMQ Best Practices
https://www.cloudamqp.com/blog/part1-rabbitmq-best-practice.html
Publishing Throughput - Asynchronous vs Synchronous
https://www.cloudamqp.com/blog/publishing-throughput-asynchronous-vs-synchronous.html
RabbitMQ - AMQP 0-9-1 Complete Reference Guide
https://www.rabbitmq.com/amqp-0-9-1-reference.html
RabbitMQ - Publisher Confirms
https://www.rabbitmq.com/confirms.html
RabbitMQ - Reliability Guide
https://www.rabbitmq.com/reliability.html