Senden von Nachrichten an Slack mit chat_postMessage
Das Schreiben einer Wrapperklasse ist kein schlechter Weg, um das Innenleben eines Python -Pakets kennenzulernen.
Für ein Projekt habe ich bereits Nachrichten per E-Mail versendet, aber jetzt wollte ich auch Nachrichten an Slack senden. Natürlich verwenden wir das Python Slack SDK.
Die Dokumentation ist auf der Seite zu finden: Python Slack SDK - Web Client. In diesem Beitrag erstelle ich eine einfache SlackAPI Klasse mit ihrer eigenen SlackError Ausnahmeklasse.
Erstellen und konfigurieren Sie eine neue Slack-App
Wir werden unsere Nachrichten an einen Slack-Kanal senden. Zuerst müssen wir eine Slack App erstellen, die Berechtigungen festlegen (Scopes: 'chat:write') und sie zu unserem Slack Channel hinzufügen. Am Ende haben wir die folgenden Informationen:
- Slack-Bot-Token
- Slack-Kanal (Id oder Name)
- Slack User (optional)
Lesen Sie in der Slack-Dokumentation nach, wie das geht.
Slack und Ratenbegrenzung
Slack kann die Rate unserer Nachrichten begrenzen. Wenn wir innerhalb kurzer Zeit zu viele Nachrichten senden, wird eine Warnmeldung angezeigt:
Due to a high volume of activity, we are not displaying some messages sent by this application.
In unserem Code wird ein SlackApiError mit dem Fehler 'ratelimited' ausgelöst. Der status_code ist 429 und 'retry_after' gibt die Sekunden an, in denen wir es erneut versuchen müssen:
try:
...
except SlackApiError as e:
error_message=e.response.get('error', None),
status_code=e.response.status_code,
retry_after=e.response.headers.get('Retry-After', None),
Glücklicherweise verfügt das 'Python Slack SDK' über einen RetryHandler, der die Wiederholungen automatisch für uns durchführt, wenn der status_code 429 zurückgegeben wird. Wenn wir dies zu unserem Code hinzufügen, müssen wir uns nicht selbst um diese Wiederholungen kümmern!
Wichtig: Standardmäßig fügt der RetryHandler einen (!) Wiederholungsversuch hinzu. Das bedeutet, dass wenn Sie den WebClient so initialisieren:
client = WebClient(
timeout=10.
)
client.chat_postMessage(...)
Dann kehrt die chat_postMessage nach dem doppelten angegebenen Timeout zurück: 20 Sekunden. Sie können dies nicht ändern, indem Sie den RetryHandler mit dem Parameter aufrufen:
max_retry_count
Im Moment habe ich noch nicht herausgefunden, wie wir den Timeout für die gesamte Operation festlegen können.
Die SlackError-Ausnahmeklasse
In unserem SlackAPI, wir distinghuish zwischen:
- Permanenten Fehlern
- Temporären Fehlern
Permanente Fehler sind zum Beispiel ein schlechtes oder ungültiges Slack Bot Token, Slack Channel. In diesem Fall sollten wir nicht versuchen, die Nachricht erneut zu senden. Ein Beispiel für einen temporären Fehler ist, wenn die Verbindung zu Slack unterbrochen ist. In diesem Fall müssen wir den Versand der Nachricht nach einiger Zeit wiederholen. Auch bei einer Zeitüberschreitung müssen wir das Senden wiederholen. Beachten Sie, dass wir im Falle einer Zeitüberschreitung eine Nachricht zweimal senden können. Dies geschieht, wenn die Antwort des Servers zur gleichen Zeit wie die Zeitüberschreitung im Client auftritt.
Das Slack SDK hat seine eigene Fehlerklasse, SlackApiError. Einige Fehler sind:
- invalid_auth invalid bot_token
- channel_not_found unknown channel
- no_text text is missing, e.g. text=None
- invalid_arguments e.g. channel=None
- ratelimited when you send too many messages in s short time
Wenn wir den RetryHandler verwenden, siehe oben, tritt der ratelimited error nicht auf. Wir können die Fehlerdaten aus dem Fehlerobjekt wie folgt erhalten:
e.response.status_code
e.response.error
e.response.headers
Es gibt einen weiteren Fehler, der hier auftreten kann. Dies ist der Fall, wenn die URL zum Slack API nicht korrekt ist (nicht gefunden). In diesem Fall wird ein HTTP 404-Fehler ausgegeben. Die zurückgegebenen Fehlerdaten unterscheiden sich von den "normalen" Fehlerdaten. In diesem Fall gibt das Fehlerobjekt ein Wörterbuch(!) zurück:
e.response['status']
e.response['headers']
e.response['body']
Die beiden anderen, temporären Fehler sind:
- connection_error
- timeout_error
Wir erstellen eine SlackError-Ausnahmeklasse, die alle permanenten und temporären Fehler zurückgibt, was bedeutet, dass unsere Anwendung nur die SlackError-Fehler erfassen muss.
Hinzufügen unseres SlackResponse-Objekts
Wir sind nicht wirklich an allen Arten von Daten interessiert, wir wollen nur wissen, was passiert ist und ob wir das Senden wiederholen müssen. Wir können ein Wörterbuch zurückgeben, aber um den Zugriff sauberer zu gestalten, verwenden wir hier ein namedtuple :
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after')
Wir geben dasselbe SlackResponse-Objekt für beide Fälle zurück, d. h. für den Fall, dass kein Fehler aufgetreten ist oder ein Fehler vorliegt. Im Falle von Fehlern erhalten wir das SlackResponse-Objekt aus e.args[0].
Der Code
Hier ist der Code, wenn Sie dies ausprobieren möchten. Stellen Sie zunächst mindestens die folgenden Parameter ein:
SLACK_BOT_TOKEN
SLACK_CHANNEL
Slack Channel kann die Id oder der Name des Kanals sein. Wenn Sie die Ratenbegrenzungsfunktion beobachten wollen, können Sie diese einstellen:
number_of_messages = 100
Der Code:
import collections
import copy
import datetime
import logging
import os
import sys
import time
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler
class SlackError(Exception):
pass
class SlackApi:
def __init__(
self,
config=None,
logger=None,
base_url=None,
slack_bot_token=None,
timeout=None,
use_rate_limit_handler=True
):
self.config = config
self.logger = logger
self.base_url = base_url
self.slack_bot_token = slack_bot_token
self.timeout = timeout or 5.
self.use_rate_limit_handler = use_rate_limit_handler
web_client_params = {
'token': self.slack_bot_token,
'timeout': self.timeout
}
if self.base_url is not None:
web_client_params['base_url'] = self.base_url
self.client = WebClient(**web_client_params)
if use_rate_limit_handler:
# this handler does retries when HTTP status 429 is returned
rate_limit_handler = RateLimitErrorRetryHandler(
max_retry_count=1
)
self.client.retry_handlers.append(rate_limit_handler)
def make_response(self, has_errors=False, error_message=None, status_code=None, retry_after=None):
request_msecs = int((datetime.datetime.now() - self.ts).total_seconds() * 1000)
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after request_msecs')
return ResponseObj(has_errors, error_message, status_code, retry_after, request_msecs)
def chat_post_message(self, channel, text=None, blocks=None, user=None):
self.ts = datetime.datetime.now()
# change empty string user to None
if user is not None:
user = user.strip()
if user == '':
user = None
try:
response = self.client.chat_postMessage(
channel=channel,
text=text,
blocks=blocks,
user=user,
)
return self.make_response(
status_code=response.status_code,
)
except SlackApiError as e:
# first check for http errors
status = e.response.get('status', None)
self.logger.debug('status = {}'.format(status))
if status is not None:
status_code = status
error_message = 'http_' + str(status)
headers = e.response['headers']
else:
status_code = e.response.get('status_code', None)
error_message = e.response.get('error', None)
headers = e.response.headers
raise SlackError(self.make_response(
has_errors=True,
status_code=status_code,
error_message=error_message,
retry_after=headers.get('Retry-After', None),
))
except Exception as e:
exception = type(e).__name__
if exception == 'TimeoutError':
error_message='timeout_error'
else:
# all other errors are considered connection errors
error_message='connection_error'
raise SlackError(self.make_response(
has_errors=True,
error_message=error_message,
))
# the code below is typically in another file
# Example from:
# Stacking multiple blocks
# https://api.slack.com/messaging/composing/layouts
blocks_example = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "New request"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Type:*\nPaid Time Off"
},
{
"type": "mrkdwn",
"text": "*Created by:*\n<example.com|Fred Enriquez>"
}
]
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*When:*\nAug 10 - Aug 13"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "<https://example.com|View request>"
}
}
]
def get_logger(logger_name=None, logger_level=logging.DEBUG):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
# console
console_handler = logging.StreamHandler()
console_logger_format = '%(levelname)-8.8s [%(filename)-15s%(funcName)15s():%(lineno)03s] %(message)s'
console_handler.setFormatter(logging.Formatter(console_logger_format))
console_handler.setLevel(logger_level)
logger.addHandler(console_handler)
return logger
config = {
'SLACK_BOT_TOKEN': '',
'SLACK_CHANNEL': '',
'SLACK_USER': None
}
def main():
logger = get_logger(logger_name='slack_api')
# your bot token and channel
slack_bot_token = config.get('SLACK_BOT_TOKEN')
slack_channel = config.get('SLACK_CHANNEL')
slack_user = config.get('SLACK_USER')
# create 'messages table'
number_of_messages = 100
#number_of_messages = 1
messages = []
for i in range(number_of_messages):
# deepcopy the dictionary here!
blocks = copy.deepcopy(blocks_example)
# patch message number
blocks[0]['text']['text'] = "New request" + ': {:03d}'.format(i)
messages.append({
'slack_bot_token': slack_bot_token,
'channel': slack_channel,
'user': slack_user,
'text': 'message {:03d}: this is a test message'.format(i),
'blocks': blocks,
# delivery status
'is_delivery_completed': False,
'is_delivered': False,
'delivery_tries': 0,
'last_delivery_error': None,
})
# send the messages
for i, message in enumerate(messages):
if message['is_delivered']:
continue
logger.debug('sending message {} ...'.format(i))
slack_api = SlackApi(
logger=logger,
slack_bot_token=message.get('slack_bot_token'),
#timeout=0.1,
)
try:
r = slack_api.chat_post_message(
channel=message.get('channel'),
text=message.get('text'),
blocks=message.get('blocks'),
user=message.get('user'),
)
logger.debug('success sending message[{}]: r = {}'.format(i, r))
# delivery status
message['is_delivery_completed'] = True
message['is_delivered'] = True
message['delivery_tries'] += 1
except SlackError as e:
#logger.exception('error sending message[{}]'.format(i))
r = e.args[0]
logger.debug('problem sending message[{}]: r = {}'.format(i, r))
message['delivery_tries'] += 1
message['last_delivery_error'] = r.error_message
if r.error_message in ['timeout_error', 'connection_error']:
# delivery status, must retry
message['is_delivery_completed'] = False
message['is_delivered'] = False
else:
# delivery status, done
message['is_delivery_completed'] = True
message['is_delivered'] = False
if __name__ == '__main__':
main()
Zusammenfassung
Zuerst müssen wir herausfinden, wie wir eine Slack-App erstellen, ihr die richtigen Berechtigungen geben und sie zu einem Slack-Kanal hinzufügen. Um Nachrichten an Slack zu senden, verwenden wir das Slack SDK. Der RetryHandler kümmert sich um die Ratenbegrenzungsfunktion von Slack. Er macht unseren Code viel kompakter. Wir haben das Slack SDK in unsere eigene SlackApi-Klasse eingepackt. Wir haben auch unsere eigene SlackError-Klasse hinzugefügt, die alle (!) Fehler behandelt, die auftreten können, einschließlich Timeouts und Verbindungsfehler.
Unsere Anwendung kann nun die Nachrichten und Blöcke erstellen und sie mit unserer SlackApi an Slack senden.
Das Schöne daran, eine Wrapper-Klasse zu schreiben, ist, dass wir alle möglichen Parameter und Antworten durchgehen müssen. Das ist ein Weg (und gar nicht so schlecht), um das Innenleben eines Python -Pakets kennenzulernen.
Links / Impressum
Python Slack SDK - RetryHandler
https://slack.dev/python-slack-sdk/web/index.html#retryhandler
Python Slack SDK - Web Client
https://slack.dev/python-slack-sdk/web/index.html
Slack - Package slack_sdk
https://slack.dev/python-slack-sdk/api-docs/slack_sdk
Slack - Rate Limits
https://api.slack.com/docs/rate-limits
Slack API - Python Slack SDK
https://slack.dev/python-slack-sdk
Using the Slack Web API
https://api.slack.com/web
Mehr erfahren
Slack
Neueste
- Ausblenden der Primärschlüssel der Datenbank UUID Ihrer Webanwendung
- Don't Repeat Yourself (DRY) mit Jinja2
- SQLAlchemy, PostgreSQL, maximale Anzahl von Zeilen pro user
- Anzeige der Werte in den dynamischen Filtern SQLAlchemy
- Sichere Datenübertragung mit Public Key Verschlüsselung und pyNaCl
- rqlite: eine hochverfügbare und distverteilte SQLite -Alternative
Meistgesehen
- Verwendung von Pythons pyOpenSSL zur Überprüfung von SSL-Zertifikaten, die von einem Host heruntergeladen wurden
- Verwendung von UUIDs anstelle von Integer Autoincrement Primary Keys mit SQLAlchemy und MariaDb
- Verbindung zu einem Dienst auf einem Docker -Host von einem Docker -Container aus
- PyInstaller und Cython verwenden, um eine ausführbare Python-Datei zu erstellen
- SQLAlchemy: Verwendung von Cascade Deletes zum Löschen verwandter Objekte
- Flask RESTful API Validierung von Anfrageparametern mit Marshmallow-Schemas