angle-uparrow-clockwisearrow-counterclockwisearrow-down-uparrow-leftatcalendarcard-listchatcheckenvelopefolderhouseinfo-circlepencilpeoplepersonperson-fillperson-plusphoneplusquestion-circlesearchtagtrashx

Celery, Redis und das (un)berühmte E-Mail-Aufgabenbeispiel

Mit Celery zu beginnen ist einfach. Aber bevor Sie Celery in die Produktion bringen, müssen Sie viel Zeit aufwenden, um es wirklich zu verstehen.

10 Oktober 2020
In Celery
post main image
https://unsplash.com/@designnnco

Die Möglichkeit, asynchrone Aufgaben von Ihrer Webanwendung aus auszuführen, ist in vielen Fällen ein Muss. Eine Möglichkeit, dies zu erreichen, ist die Verwendung von Celery. Es gibt viele Artikel im Internet und es werden einige Beispiele gegeben. Die Beiträge zu Miguel Grinberg über Celery haben mir sehr gefallen. Sehr klar, vielen Dank Miguel.

Aber der Beitrag von Miguel und die meisten anderen Artikel sind nur eine Einführung. Wenn Sie mit Celery beginnen, stellen Sie plötzlich fest, dass es nicht einfach ist. Tatsächlich sollten Sie sich besser auf eine Menge Lernen vorbereiten, bevor Sie es in der Produktion verwenden.

In diesem Beitrag schreibe ich einige Notizen, die bei der Implementierung von Celery mit Flask und Redis aufgetaucht sind. Der größte Teil davon ist eine Zusammenfassung der Dokumente, die auf der Website Celery und anderen Stellen im Internet zu finden sind.

Viele andere haben über Probleme geschrieben und wie sie diese gelöst haben

Jemand schrieb, dass bei der Ausführung von Celery in der Produktion Ihre erste Priorität darin besteht, einen Plan zu haben, was zu tun ist, wenn Ihre Aufgaben plötzlich nicht mehr funktionieren. Ich möchte hinzufügen, dass dies bedeutet, dass Sie zunächst wissen müssen, wie Sie überprüfen können, ob Ihr Broker, Ihre Mitarbeiter und Aufgaben laufen.

Suchen Sie im Internet nach 'Celery Problem' und Sie erhalten eine überwältigende Menge an Seiten. Viele dieser Probleme sind bereits in der Dokumentation zu Celery , TL;DR. Eine Reihe von Problemen beim E-Mail-Versand und deren Lösung werden in einem netten Beitrag 'Arbeiten mit asynchronen Celery -Aufgaben - Lektionen gelernt' behandelt, siehe die Links unten. Ich schlage auch vor, dass Sie sich die Website distributedpython ansehen, siehe die Links unten. Hier gibt es eine Menge guter Rezepte.

Was soll das ganze Gerede über acks_late und Wiederholungsversuche?

Standardmäßig wird Celery Aufgaben sofort bestätigen. Das bedeutet, dass eine Aufgabe aus der Warteschlange des Brokers entfernt wird, kurz bevor sie gestartet wird. Sie können dieses Verhalten durch die Angabe von acks_late pro Aufgabe oder systemweit ändern. In diesem Fall quittiert Celery eine Aufgabe erst nach erfolgreichem Abschluss.

Warum ist dies wichtig?

Wenn eine Aufgabe abrupt unterbrochen wird, dann wird Celery die Aufgabe standardmäßig nicht erneut versuchen. Wenn acks_late angegeben wurde, befindet sich die Aufgabe noch in der Warteschlange, und Celery wird die Aufgabe erneut versuchen. Im ersten Fall können Sie die Aufgabe trotzdem erneut versuchen, indem Sie Wiederholungsparameter wie retry_kwargs angeben.

Was ist also der Unterschied?

Das bedeutet, dass acks_late die Zustellung von höchstens einmal auf mindestens einmal ändert.

Aber acks_late funktioniert möglicherweise nicht wie erwartet.

Hier müssen wir zwischen mehreren Bedingungen unterscheiden:

  • Anmutig
    Der Verwalter stoppt den Arbeiter mit dem Signal SIGTERM : Töte <pid>.
    Aufgabe not acked
  • Abrupt
    Der Administrator tötet den Arbeiter mit dem Signal SIGKILL : CTRL-C oder unter Verwendung von kill -9 <pid>,
    oder,
    ein Systemabsturz (kein Speicher, Hardwarefehler) beendet den Arbeitsprozess.
    - Wenn der übergeordnete Prozess läuft:
    task acked
    - Wenn der übergeordnete Prozess nicht läuft, z.B. Stromausfall):
    task not acked (wahrscheinlich): task acked (wahrscheinlich)

Im ersten Fall wird der Worker-Prozess anmutig gestoppt und eine laufende Aufgabe, die gestoppt wird, ist not acked. Dies liegt daran, dass der Administrator die Aufgabenausführung (vorübergehend) stoppen wollte. Durch erneutes Starten des Workers wird die Aufgabe erneut gestartet.

Im zweiten Fall, in dem der Elternprozess läuft, sieht der Elternprozess, dass der Mitarbeiter verloren gegangen ist, markiert den Task und setzt den Status auf FAILURE. Für den Fall, dass der Administrator den Arbeiter getötet hat, gab es wahrscheinlich einen guten Grund, dies zu tun. Im Falle eines Systemabsturzes wollen wir wahrscheinlich nicht, dass der Task neu gestartet wird. Wenn der Elternprozess nicht läuft, z.B. weil er plötzlich gestorben ist, dann wird der Task wahrscheinlich nicht get acked.

Idempotenz

Unsere Aufgaben können laufen unerwartetmehrfach müssen wir dafür sorgen, dass sich das Ergebnis nicht verändert. Wenn z.B. eine Aufgabe bereits eine E-Mail gesendet hat, dann sollte die erneute Ausführung derselben Aufgabe diese E-Mail nicht noch einmal senden. Dies wird als Idempotenz bezeichnet. Führen Sie die Aufgabe mehrmals aus, ohne das Ergebnis über die ursprüngliche Anwendung hinaus zu verändern (Wikipedia).

Acks_late versus Wiederholungsversuch

Angenommen, eine Aufgabe besteht darin, eine E-Mail an einen externen Dienst zu senden, der vorübergehend außer Betrieb ist. Die Aufgabe wird versuchen, eine Verbindung herzustellen, aber die Verbindung wird zeitweise unterbrochen. Wenn wir das Standardverhalten verwenden, wird Celery die Aufgabe als fehlgeschlagen markieren. Sie wird nicht erneut ausgeführt.

Aber warten Sie. Ohne acks_late können wir auch Celery anweisen, den Task erneut auszuführen. Was sollten Sie verwenden? Kommt darauf an. Lesen Sie die Dokumentation auf der Website Celery 'Should I use retry or acks_late?'.

Wie kann sichergestellt werden, dass eine E-Mail nur einmal gesendet wird?

Unter der Annahme, dass ein und dieselbe Aufgabe mehrmals ausgeführt werden kann, können wir nur verhindern, dass eine E-Mail zweimal oder öfter gesendet wird, indem wir ein globales Flag für die Aufgaben außerhalb der Aufgabe haben. Ich habe bereits eine Tabelle send_email_table erstellt, die Datensätze mit (Meta-)Daten für jede E-Mail enthält. Wir fügen einfach eine neue (boolesche) Spalte EmailHasBeenSent hinzu. Das bedeutet auch, dass wir vor (!) dem Aufruf der Aufgabe einen neuen Datensatz send_email_table einfügen müssen. Ohne Verwendung von Aufgaben sah meine E-Mail-Sendefunktion so aus:

class EmailSender:

    def send(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

Nach Änderungen zur Verwendung einer Aufgabe zum Senden einer E-Mail:

class EmailSender:

    def sender_init_function(params):
        # basic checks and insert the email parameters in a new  send_email_table  record
        preprocess(...)
        return  send_email_table_record_id

    def sender_send(send_email_table_record_id):
        # perform checks if we can send the email using an allow list, deny list, etc. 
        block_check(...)
        # construct actual email message to send
        prepare(...)
        # send the email to smtp provider
        send_email()

Und der Code zum Senden einer E-Mail:

def send_email(**kwargs):

    email_sender = EmailSender()
     send_email_table_record_id = email_sender.sender_init(**kwargs)

    # pass to task using apply_async
	kwargs  = {}
	kwargs['send_email_table_record_id'] =  send_email_table_record_id
    task_email_sender_sender_send.apply_async(queue=celery_task_queue,  kwargs=kwargs)


@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

    email_sender = EmailSender()
    id = email_sender.sender_send(kwargs['send_email_table_record_id'])

Die Methode sender_send() prüft das Flag EmailHasBeenSent in dem Datensatz, der unter Verwendung der ID abgerufen werden kann. Wenn True, wurde die E-Mail bereits gesendet, und die Aufgabe kehrt zurück, ohne etwas zu tun. Andernfalls sendet die Aufgabe die E-Mail und setzt nach dem Senden der E-Mail dieses Flag auf True.

Wie sieht es mit Wiederholungsversuchen aus?

Nehmen wir an, dass unser Provider SMTP für einige Zeit nicht verfügbar ist. Celery macht es sehr einfach, eine Aufgabe im Falle einer Ausnahme erneut zu versuchen. Wir können die Ausnahme selbst auslösen und abfangen oder den einfacheren Parameter autoretry_for verwenden. Von der Website Celery :

@celery.task(bind=True,  autoretry_for=(Exception,),  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):

    if error_sending_email:
        raise Exception(f'error sending email')
    # done

Für Wiederholungsversuche gibt es ein paar Parameter, die Sie verwenden können. Meine Einstellung (während der Entwicklung):

retry_backoff  = 2
retry_backoff_max = 10 * 60
retry_kwargs  = {'max_retries': 5}

retry_backoff bedeutet, dass die erste Wiederholung um 2 Sekunden verzögert wird, die zweite Wiederholung um 4 Sekunden, die nächsten Wiederholungen um 8, 16, 32 usw. Sekunden. retry_backoff_max gibt die maximale Verzögerung zwischen den Aufgabenwiederholungen an, in diesem Fall eine Stunde.
max_retries ist die maximale Anzahl von Wiederholungsversuchen.

Redis und der visibility_timeout

Unsere E-Mail-Aufgabe läuft nur eine kurze Zeit, vielleicht 10 Sekunden, auch wenn der SMTP -Provider nicht erreichbar ist. Aber was passiert, wenn wir einen Task haben, der sehr lange läuft? Von der Website Celery : Das Sichtbarkeits-Timeout definiert die Anzahl der Sekunden, die gewartet werden muss, bis der Mitarbeiter die Aufgabe bestätigt, bevor die Nachricht erneut an einen anderen Mitarbeiter zugestellt wird. Der Standardwert der Website visibility_timeout beträgt eine Stunde. Dies kann zu einem Problem werden, wenn Ihre lang laufende Aufgabe acks_late verwendet. Wenn Ihre Aufgabe länger läuft als die visibility_timeout, wird sie erneut geliefert und Sie haben plötzlich zwei Aufgaben, die dasselbe tun. Sie können den visibility_timeout erhöhen, aber es ist besser, die Aufgabenausführung atomar zu gestalten, zum Beispiel:

@celery.task(bind=True, name='celery_tasks.task_email_sender_sender_send')
def task_email_sender_sender_send(self, **kwargs):

	if get_lock('task_email_sender_sender_send'):
		# we were able to lock the task
	
		email_sender = EmailSender()
		id = email_sender.sender_send(kwargs['id'])
		...
		release_lock()

	else:
		# we could not lock the task
		# retry

Es sieht so aus, als ob dies nur dann ein Problem sein kann, wenn wir acks_late für eine Aufgabe verwenden. Für die E-Mail-Aufgabe verwende ich nicht acks_late.

Setzen Sie eine Zeitüberschreitung für die Aufgabe

Alles kann schief gehen, und vielleicht kehren eine oder mehrere Aufgaben plötzlich nicht mehr zurück. Dadurch werden alle anderen Aufgaben blockiert oder verlangsamt. Um dies zu vermeiden, können wir Zeitüberschreitungen für eine Aufgabe festlegen. Wenn task_soft_time_limit abläuft, wird eine Ausnahme SoftTimeLimitExceeded ausgelöst. Wenn die Ausnahme task_time_limit abläuft, wird die Aufgabe abgebrochen und durch eine neue ersetzt.

@celery.task(bind=True,  task_time_limit=150,  task_soft_time_limit=120, ...)
def task_email_sender_sender_send(self, **kwargs):

    try:
        a_long_time_to_finish()
    except  SoftTimeLimitExceeded:
        recover()

Ratenbegrenzung, Überlastung des Anbieters SMTP verhindern

Angenommen, der Provider SMTP ist für längere Zeit nicht erreichbar oder Ihr System ist abgestürzt und es dauert ein paar Stunden, um das Problem zu beheben. Nachdem das Problem behoben ist, werden Ihre E-Mails mit maximaler Geschwindigkeit verschickt. Dies kann die Grenzen Ihres SMTP -Providers überschreiten. Glücklicherweise können wir für eine Aufgabe eine rate_limit angeben, z.B. eine Begrenzung auf 50 Aufgaben pro Minute:

@celery.task(bind=True,  autoretry_for=(Exception,), rate_limit='50/m',  retry_kwargs={'max_retries': 5}, ...)
def task_email_sender_sender_send(self, **kwargs):
    ....

Hierbei handelt es sich um einen Höchstsatz pro Mitarbeiter und nicht um einen globalen Höchstsatz. Wenn Sie zwei Mitarbeiter haben, die E-Mails versenden, können Sie das rate_limit auf '25/m' setzen.

Der Zeitstempel, zu dem die E-Mail ursprünglich übermittelt wurde, kann wichtig sein.

In der Funktion block_check(), siehe oben, wird jede Empfänger-Mail-Adresse gegen eine Zulässigkeits- und eine Verweigerungsliste geprüft. Es gibt aber noch weitere Prüfungen. Zum Beispiel ist die Anzahl der an einen Empfänger gesendeten Nachrichten auf X pro Y Minuten begrenzt. Wenn irgendwo ein Problem auftritt, z.B. der Arbeiter abgestürzt ist und dann nach einigen Stunden neu gestartet wird, werden alle E-Mails in der Warteschlange nacheinander gesendet. Würden wir den Zeitstempel des Zeitpunktes des Versands verwenden, besteht die Möglichkeit, dass eine E-Mail blockiert wird.

Was wir tun sollten, ist, den Zeitstempel des Zeitpunkts zu verwenden, an dem die Nachricht in die Aufgabenwarteschlange gestellt wurde. Glücklicherweise haben alle meine Datenbankeinträge einen Zeitstempel created_on , und wir wissen bereits, dass wir einen send_mail_table-Eintrag erstellt haben, bevor (!) die Nachricht an die Aufgabenwarteschlange übergeben wurde. In der Task können wir diesen created_on -Zeitstempel verwenden, um auf Zeitlimits zu prüfen.

Hinzufügen von Anhängen

Wir können unsere E-Mails versenden, aber was ist mit Anhängen? Wenn Sie Ihre Anwendung und Ihre Mitarbeiter auf demselben Server laufen lassen, dann gibt es kein Problem, Sie geben einfach die Links oder Pfade der Dateien weiter. Aber was ist, wenn Sie Ihre Webanwendung auf Server X und Ihre Mitarbeiter auf Server Y laufen lassen? Eine Möglichkeit besteht darin, ein gemeinsam genutztes Anlagenvolumen zu verwenden, das Sie irgendwo erstellt haben. Eine andere Möglichkeit besteht darin, die Anlagendateien in den Datensätzen send_email_table zu speichern. Dies ist in den Zyklen und im Speicher von CPU teuer, hat aber den Vorteil, dass sich alle Daten in der Datenbank befinden und der Arbeiter immer Zugriff auf die Anlagen hat.

Können E-Mails verloren gehen?

Wie in den Links unten erwähnt: 'Bewahren Sie wichtige Daten nicht in Ihrer Celery Warteschlange auf'. Glücklicherweise füge ich bereits einen send_mail_table-Eintrag mit den E-Mail-Parametern ein, bevor (!) ich die Aufgabe aufrufe. In diesem Datensatz gibt es auch ein Flag EmailHasBeenSent . Das bedeutet, dass es immer möglich ist, alle E-Mails, die aus irgendeinem Grund nicht gesendet wurden, erneut zu senden.

Setzen Sie die globalen Einstellungen Celery

Wir können die Parameter Celery mit dem Task-Dekorator einstellen, aber es kann eine gute Idee sein, Standardwerte für alle Ihre Aufgaben festzulegen.

    TaskBase = celery.Task
    class ContextTask(TaskBase):

        time_limit = 40
        soft_time_limit = 30
         autoretry_for  = (Exception,  SoftTimeLimitExceeded, TimeLimitExceeded)
         retry_backoff  = 2
         retry_backoff_max = 10 * 60
         retry_kwargs  = {'max_retries': 5}
        retry_jitter = False

Zusammenfassung

Vor einiger Zeit dachte ich, fügen wir doch Celery zu meiner Bewerbung hinzu. Eine Woche, vielleicht zwei, sollte möglich sein. Ja, in dieser Zeit hat es funktioniert, aber es gab noch so viel zu lernen. Die E-Mail-Beispiele im Internet sind nur eine Einführung. Die Verwendung von Celery fühlt sich an wie das Erlernen einer anderen Sprache. Ich glaube, ich verstehe es jetzt. Am Anfang gibt es so viele Parameter, Bedingungen. Nicht lesen hilft nicht. Viele Probleme, die im Internet erwähnt werden, stammen von Leuten, die die meist ausgezeichnete Dokumentation auf der Website Celery nicht gelesen haben. Ja, es gibt einige Ungereimtheiten, aber sie funktionieren. Wie dem auch sei, Celery steht und läuft und ich lerne, bessere Aufgaben zu machen.

Links / Impressum

acks_late option seems to have no effect
https://celery-users.narkive.com/Cv8Juuxs/acks-late-option-seems-to-have-no-effect

Celery task exceptions and automatic retries
https://www.distributedpython.com/2018/09/04/error-handling-retry/

Celery throttling - setting rate limit for queues
https://alievmagomed.com/celery-throttling-setting-rate-limit-for-queues/

Custom Celery task states
https://www.distributedpython.com/2018/09/28/celery-task-states/

Distributed Task Locking in Celery
http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

distributedpython
https://www.distributedpython.com/

Don't keep important data in your Celery queue
https://www.caktusgroup.com/blog/2016/10/18/dont-keep-important-data-your-celery-queue/

Flask Celery task locking
https://stackoverflow.com/questions/53950548/flask-celery-task-locking

Should I use retry or acks_late?
https://docs.celeryproject.org/en/latest/faq.html#should-i-use-retry-or-acks-late

Working with Asynchronous Celery Tasks – lessons learned
https://blog.daftcode.pl/working-with-asynchronous-celery-tasks-lessons-learned-32bb7495586b

Einen Kommentar hinterlassen

Kommentieren Sie anonym oder melden Sie sich zum Kommentieren an.

Kommentare

Eine Antwort hinterlassen

Antworten Sie anonym oder melden Sie sich an, um zu antworten.