Verhinderung des Versands doppelter Nachrichten an ein entferntes System
Die Berücksichtigung aller möglichen Zustände und Bedingungen ist nicht wünschenswert, sondern eine Voraussetzung.
Oftmals müssen Anwendungen Nachrichten an ein entferntes System senden. In einer perfekten Welt hätten wir es nur mit dem Happy Flow zu tun: Es passieren keine schlimmen Dinge wie Ausnahmen oder Fehler.
Leider ist die Welt nicht perfekt. Neben Programmierfehlern können auch Verbindungen ausfallen, Datenbanksysteme können ausfallen, entfernte Systeme können ausfallen. Eine einfache Frage - können Sie etwas Code schreiben, um Nachrichten an ein entferntes System zu senden - kann leicht zu einem sehr komplexen Projekt werden.
In diesem Beitrag zeige ich einen Weg, um zu verhindern, dass doppelte Nachrichten gesendet werden, wenn die Aktualisierung eines Datenbanknachrichtendatensatzes fehlschlägt.
Kompromisse
Wenn Sie ein System entwickeln, das Finanztransaktionen durchführt, dann sind die Anforderungen völlig anders als bei einem System, das Eintrittskarten für ein Konzert per E-Mail verschickt. Ersteres darf niemals ausfallen und muss in der Lage sein, neu zu starten und genau dort fortzufahren, wo es aufgehört hat. Das System, das die Eintrittskarten verschickt, kann ein paar Mal im Monat ein Problem haben, so dass die Verbraucher ihre Eintrittskarten nicht oder zweimal erhalten.
Was ich damit sagen will, ist, dass es immer Abwägungen gibt. Lohnt es sich, viel Zeit in die Entwicklung einer kugelsicheren Anwendung zu investieren, oder nimmt man sporadische Ausfälle in Kauf? Wenn ein solcher sporadischer Fehler auftritt, müssen Sie die Konsequenzen kennen. Ein Fehler, der den Versand einer E-Mail verhindert, ist nicht so schlimm wie ein Fehler, der dazu führt, dass dieselben E-Mails Hunderte oder gar Tausende von Malen verschickt werden.
Unser Zustelldienst
Unser Zustelldienst stellt eine Verbindung zu einer Datenbank mit Nachrichten her, die an ein entferntes System gesendet werden müssen.
+----------+ +----------+ +----------+
| database | | deliver- | | remote |
| message |<-----| service |----->| system |
| records | | | | |
+----------+ +----------+ +----------+
Der Zustelldienst wird bei jedem Tick ausgeführt, zum Beispiel jede Sekunde. Der Nachrichtendatensatz hat ein 'is_delivered'-Flag. Dieses Flag ist zunächst "False" und wird nach dem Versand einer Nachricht auf "True" aktualisiert. Dieses Flag wird auch bei der Auswahl von Nachrichtensätzen aus der Datenbank verwendet.
Schritte:
- Auswahl von Nachrichten aus der Datenbank, die noch nicht zugestellt worden sind.
- Eine Nachricht nach der anderen an das entfernte System zustellen.
- (post-update) Sobald eine Nachricht zugestellt wurde, aktualisieren wir das 'is_delivered'-Flag.
Ich nenne die Aktion in Schritt 3. post-update, weil es sich um eine Aktualisierung nach dem Senden der Nachricht handelt.
Hier ist der Happy Flow Code ohne jegliche Fehlerprüfung und Fehlerbehandlung:
# happy flow code
class RemoteSystem:
def receive_data(self, data):
self.data = data
print('remote system: received data = {}'.format(data))
# database message record
class Message:
def __init__(self, data):
self.data = data
self.is_delivered = False
def __str__(self):
return '<Message: data = {}, is_delivered = {}>'.format(self.data, self.is_delivered)
class DeliverService:
def __init__(self):
# database
self.messages = [Message('data1'), Message('data2')]
# remote system
self.rem_sys = RemoteSystem()
def select_messages_to_deliver(self):
return [message for message in self.messages if not message.is_delivered]
def deliver_message(self, message):
# send to remote system
self.rem_sys.receive_data(message.data)
def deliver_messages(self):
delivered_count = 0
for message in self.select_messages_to_deliver():
# deliver message
self.deliver_message(message)
delivered_count += 1
# update message record
message.is_delivered = True
return delivered_count
def main():
print('\nHAPPY FLOW\n{}'.format('-'*40))
ds = DeliverService()
for tick in range(2):
# debug
print('on tick-{} messages:'.format(tick))
for message in ds.messages:
print('- {}'.format(message))
# deliver_messages
delivered_count = ds.deliver_messages()
# debug
print('on tick-{} messages delivered to remote system: {}'.format(tick, delivered_count))
if __name__ == '__main__':
main()
Und hier ist das Ergebnis nach der Ausführung:
HAPPY FLOW
----------------------------------------
on tick-0 messages:
- <Message: data = data1, is_delivered = False>
- <Message: data = data2, is_delivered = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1 messages:
- <Message: data = data1, is_delivered = True>
- <Message: data = data2, is_delivered = True>
on tick-1 messages delivered to remote system: 0
Die Nachrichten wurden beim ersten Tick zugestellt, beim zweiten Tick wurden keine Nachrichten zugestellt.
Geschafft, fertig! Das war einfach ... in der perfekten Welt ...
Machen Sie eine Liste aller Dinge, die schief gehen können, schauen Sie sich die Auswirkungen an und wie man sie lösen kann
Da unsere Welt nicht perfekt ist, müssen wir alle möglichen Fehler, die auftreten können, in Betracht ziehen. Bei jedem dieser Fehler müssen wir die Folgen betrachten, die dieser Fehler für den Betrieb unseres Dienstes hat.
Es gibt temporäre oder vorübergehende Fehler und permanente Fehler. Ein vorübergehender Fehler ist in der Regel einmalig oder von kurzer Dauer. Wenn ein entferntes System für eine kurze Zeit nicht verfügbar ist, erhalten wir eine Zeitüberschreitung. Ein Beispiel für einen permanenten Fehler ist ein ausgefallener Datenbankserver.
Für unseren Lieferdienst finden Sie hier eine (begrenzte) Liste von Fehlern, die auftreten können, ihre Auswirkungen und wie sie zu beheben sind:
- Datenbank-Selektionsfehler: Nachrichten können nicht aus der Datenbank ausgewählt werden.
Unsere Nachrichten werden nicht zugestellt. Das ist nicht gut, aber das war's. Vielleicht ist unser Datenbankserver vorübergehend ausgefallen, wir können das weiter untersuchen.
Wie man das Problem löst: Wir versuchen es einfach weiter, bis die Datenbank wieder antwortet. - Entferntes System (vorübergehend) nicht verfügbar.
Wieder werden unsere Nachrichten nicht zugestellt. Das ist nicht gut, aber das war's.
Wie man das löst: Wir haben bereits das 'is_delivered'-Flag. Wir aktualisieren dieses Flag nur dann auf 'True', wenn das entfernte System den Empfang unserer Nachricht bestätigt. - Datenbank-Post-Update-Fehler: Ein Nachrichtendatensatz konnte nach (!) der Zustellung nicht aktualisiert werden.
In diesem Fall wird das 'is_delivered'-Flag nicht aktualisiert. Unsere Nachricht wurde an das entfernte System zugestellt oder nicht, aber dieser Zustand wurde im Nachrichtendatensatz nicht vermerkt. Die Folge ist, dass die Nachricht ausgewählt und zugestellt wird, wenn sie angekreuzt wird usw. Das entfernte System wird unsere Nachricht mehrfach erhalten. Schrecklich!
Die Lösung: Siehe nächster Abschnitt.
Verhindern, dass eine Nachricht bei einem Post-Update-Fehler mehrfach gesendet wird
Der einfachste Weg ist, unseren Zustelldienst komplett anzuhalten (abzubrechen). Dann prüfen wir, was falsch ist und starten neu, nachdem wir den Post-Update-Fehler behoben haben. Wir müssen das 'is_delivered'-Flag der letzten Nachricht manuell setzen.
Um mehr Kontrolle zu erhalten, fügen wir ein 'delivery_in_progress'-Flag zum Nachrichtendatensatz hinzu. Wir setzen dieses Kennzeichen auf 'True', bevor (!) wir die Nachricht an das entfernte System senden. Wir setzen dieses Kennzeichen auf 'False' zurück, wenn wir den Nachrichtendatensatz aktualisieren, um das 'is_delivered'-Kennzeichen zu setzen. Bei der Auswahl von Nachrichten für die Zustellung wird nun auch das 'delivery_in_progress'-Flag gesetzt.
Die neuen Schritte:
- Auswahl von Nachrichten aus der Datenbank, die noch nicht zugestellt wurden und deren Zustellung noch nicht begonnen hat.
- (vor dem Update) Setzen Sie den Nachrichtendatensatz 'delivery_in_progress'-Flag auf 'True'.
- Stellen Sie eine Nachricht nach der anderen an das entfernte System zu.
- (post-update) Sobald eine Nachricht zugestellt wurde, aktualisieren wir das 'is_delivered'-Flag auf 'True' und das 'delivery_in_progress'-Flag auf 'False'.
Wenn nun die Post-Update-Aktion fehlschlägt, wird die Nachricht nicht gesendet, aber der Vorgang wird nicht blockiert. Neue Nachrichten werden weiterhin versendet.
Auswählen von Nachrichten, vorher:
def select_messages(self):
return [message for message in self.messages if not message.is_delivered]
Wird:
def select_messages(self):
return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
Diese Methode ist nicht kostenlos, da sie einen zusätzlichen Aktualisierungsbefehl erfordert.
Falls Sie es ausprobieren wollen, hier ist der Code. Um den Code zu testen, fügte ich dem Zustelldienst die folgenden 'Schalter hinzu:
- inject_post_update_error': um einen Post-Update-Fehler zu simulieren
- use_delivery_in_progress': um das 'delivery_in_progress'-Flag im Nachrichtensatz zu verwenden
Wir führen zwei 'Ticks' (Durchläufe) für jede der folgenden Bedingungen durch:
- Glücklicher Fluss
- Fehler bei der Nachverbuchung => falsch
- Nachverbuchungsfehler + 'delivery_in_progress'-Kennzeichen => richtig
# prevent duplicate messages caused by post-update errors
class SendError(Exception):
pass
class PostUpdateError(Exception):
pass
class RemoteSystem:
def send_data(self, data):
print('remote system: received data = {}'.format(data))
# database message record
class Message:
def __init__(self, data):
self.data = data
self.is_delivered = False
self.delivery_in_progress = False
def __str__(self):
return '<Message: data = {}, is_delivered = {}, delivery_in_progress = {}>'.format(self.data, self.is_delivered, self.delivery_in_progress)
class DeliverService:
def __init__(self, inject_post_update_error, use_delivery_in_progress):
# switch to inject update_failure
self.inject_post_update_error = inject_post_update_error
# switch to prevent sending duplicates
self.use_delivery_in_progress = use_delivery_in_progress
# database
self.messages = [Message('data1'), Message('data2')]
# remote system
self.rem_sys = RemoteSystem()
def select_messages(self):
if self.use_delivery_in_progress:
return [message for message in self.messages if not message.is_delivered and not message.delivery_in_progress]
else:
return [message for message in self.messages if not message.is_delivered]
def deliver_message(self, message):
try:
self.rem_sys.send_data(message.data)
return True
except Exception as e:
raise SendError()
def deliver_messages(self):
tick_delivered_count = 0
tick_send_error_count = 0
tick_post_update_error_count = 0
for message in self.select_messages():
# pre-update message record
if self.use_delivery_in_progress:
message.delivery_in_progress = True
# deliver message
try:
self.deliver_message(message)
tick_delivered_count += 1
except SendError:
tick_send_error_count += 1
# post-update message record
try:
if self.inject_post_update_error:
raise PostUpdateError()
message.is_delivered = True
if self.use_delivery_in_progress:
message.delivery_in_progress = False
except PostUpdateError:
tick_post_update_error_count += 1
return (tick_delivered_count, tick_send_error_count, tick_post_update_error_count)
def main():
tests = [
{
'title': 'HAPPY FLOW => CORRECT',
'inject_post_update_error': False,
'use_delivery_in_progress': False,
},
{
'title': 'POST-UPDATE FAILURE => WRONG',
'inject_post_update_error': True,
'use_delivery_in_progress': False,
},
{
'title': 'POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT',
'inject_post_update_error': True,
'use_delivery_in_progress': True,
},
]
for test in tests:
print('\n{}\n{}'.format(test['title'], '-'*40))
ds = DeliverService(
inject_post_update_error=test['inject_post_update_error'],
use_delivery_in_progress=test['use_delivery_in_progress'],
)
for tick in range(2):
# debug
print('on tick-{}, messages:'.format(tick))
for message in ds.messages:
print('- {}'.format(message))
# deliver messages
counts = ds.deliver_messages()
# debug
print('on tick-{} messages delivered to remote system: {}'.format(tick, counts[0]))
if test['inject_post_update_error']:
print('on tick-{} post-update error count: {}'.format(tick, counts[2]))
if __name__ == '__main__':
main()
Und das Ergebnis nach der Ausführung:
HAPPY FLOW => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = True, delivery_in_progress = False>
- <Message: data = data2, is_delivered = True, delivery_in_progress = False>
on tick-1 messages delivered to remote system: 0
POST-UPDATE FAILURE => WRONG
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-1 messages delivered to remote system: 2
on tick-1 post-update error count: 2
POST-UPDATE FAILURE + DELIVERY_IN_PROGRESS => CORRECT
----------------------------------------
on tick-0, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = False>
- <Message: data = data2, is_delivered = False, delivery_in_progress = False>
remote system: received data = data1
remote system: received data = data2
on tick-0 messages delivered to remote system: 2
on tick-0 post-update error count: 2
on tick-1, messages:
- <Message: data = data1, is_delivered = False, delivery_in_progress = True>
- <Message: data = data2, is_delivered = False, delivery_in_progress = True>
on tick-1 messages delivered to remote system: 0
on tick-1 post-update error count: 0
Was genau haben wir gelöst?
Was wir gelöst haben, ist das mehrmalige Senden der gleichen Nachricht im Falle eines Post-Update-Fehlers. Unser Dienst versucht es weiter, das ist gut. Wenn eine Post-Update-Aktion ein- oder zweimal oder für eine kurze Zeit fehlschlägt, werden die Nachrichten nicht mehrfach gesendet, und unser Dienst läuft weiter, er wird nicht abgebrochen. Das ist eine große Erleichterung.
Der Nachteil ist, dass wir immer noch nicht wissen, ob die Nachrichten tatsächlich gesendet wurden, als der Post-Update-Fehler auftrat. Eine Möglichkeit, dies zu umgehen, besteht darin, die Antwort des entfernten Systems zusammen mit der Record-ID der Nachricht in einem anderen Datenspeicher (mit minimalen Abhängigkeiten) zu speichern.
Idempotente Operationen
Wikipedia: Die Eigenschaft bestimmter Operationen in der Mathematik und Informatik, dass sie mehrfach angewendet werden können, ohne dass sich das Ergebnis über die erste Anwendung hinaus ändert.
Eine gute Möglichkeit ist, ein entferntes System (oder einen Dienst) zu haben, das auf doppelte Nachrichten prüfen kann, indem es ein bestimmtes Feld in der Nachricht überprüft. Dann fügen wir einfach eine eindeutige ID zu jeder Nachricht hinzu, die wir erstellen. Das entfernte System verwirft automatisch doppelte Nachrichten, die es erhält.
Zusammenfassung
Die Berücksichtigung aller möglichen Fehler ist eine der wichtigsten Aufgaben beim Entwurf einer Softwareanwendung. Sie steht in völligem Widerspruch zu dem Druck auf Designer und Programmierer, eine funktionierende Anwendung so schnell wie möglich zu liefern. Viele Programmierer wissen, dass ihr Code einige Schwachstellen aufweist, aber aus Zeitgründen ist es oft unmöglich, alle zu beheben, bevor eine Anwendung in Produktion geht.
Im obigen Code wurde nur die schwerwiegendste Folge - das wiederholte Senden derselben Nachricht - berücksichtigt. Dies wurde durch Hinzufügen eines 'delivery_in_progress'-Flags gelöst. Viele andere Probleme müssen ebenfalls gelöst werden, wie z. B.:
- Wie viele Wiederholungsversuche, bevor das Senden von Nachrichten an das entfernte System aufgegeben wird
- Wie können Auswahlfehler, Zustellungsfehler, Aktualisierungsfehler usw. überwacht werden?
- Wie kann man sich von Fehlern erholen?
- Darf sich die Reihenfolge der Nachrichten beim Versenden ändern?
Schließlich werden die meisten Softwareprojekte nie abgeschlossen, sondern nur freigegeben.
Links / Impressum
[rabbitmq-discuss] Exactly Once Delivery
https://groups.google.com/g/rabbitmq-discuss/c/eniPTe1aKvk
Handling Duplicate Messages (Idempotent Consumers)
https://codeopinion.com/handling-duplicate-messages-idempotent-consumers
Mehr erfahren
API Exceptions Internet
Neueste
- Ausblenden der Primärschlüssel der Datenbank UUID Ihrer Webanwendung
- Don't Repeat Yourself (DRY) mit Jinja2
- SQLAlchemy, PostgreSQL, maximale Anzahl von Zeilen pro user
- Anzeige der Werte in den dynamischen Filtern SQLAlchemy
- Sichere Datenübertragung mit Public Key Verschlüsselung und pyNaCl
- rqlite: eine hochverfügbare und distverteilte SQLite -Alternative
Meistgesehen
- Verwendung von Pythons pyOpenSSL zur Überprüfung von SSL-Zertifikaten, die von einem Host heruntergeladen wurden
- Verwendung von UUIDs anstelle von Integer Autoincrement Primary Keys mit SQLAlchemy und MariaDb
- Verbindung zu einem Dienst auf einem Docker -Host von einem Docker -Container aus
- PyInstaller und Cython verwenden, um eine ausführbare Python-Datei zu erstellen
- SQLAlchemy: Verwendung von Cascade Deletes zum Löschen verwandter Objekte
- Flask RESTful API Validierung von Anfrageparametern mit Marshmallow-Schemas