Berichten naar Slack sturen met chat_postMessage
Het schrijven van een wrapper class is geen slechte manier om de ins-en-outs van een Python pakket te leren.
Voor een project stuurde ik al berichten per e-mail, maar nu wilde ik ook berichten naar Slack sturen. Natuurlijk gebruiken we de Python Slack SDK.
De documentatie is te vinden op de pagina: Python Slack SDK - Web Client. In deze post maak ik een eenvoudige SlackAPI klasse met een eigen SlackError-uitzonderingsklasse.
Maak en configureer een nieuwe Slack App
We zullen onze berichten naar een Slack Kanaal sturen. Eerst moeten we een Slack App aanmaken, rechten instellen (Scopes: 'chat:write'), en deze toevoegen aan ons Slack Channel. Aan het eind hebben we de volgende informatie:
- Slack Bot Token
- Slack Kanaal (Id of naam)
- Slack User (optioneel)
Raadpleeg de Slack documentatie over hoe dit te doen.
Slack en snelheidsbeperking
Slack kan de snelheid van onze berichten beperken. Als we te veel berichten versturen binnen een korte tijd, zal het het waarschuwingsbericht tonen:
Due to a high volume of activity, we are not displaying some messages sent by this application.
In onze code wordt een SlackApiError met error 'ratelimited' opgeworpen. De status_code wordt 429 en 'retry_after' geeft de seconden aan wanneer we het opnieuw moeten proberen:
try:
...
except SlackApiError as e:
error_message=e.response.get('error', None),
status_code=e.response.status_code,
retry_after=e.response.headers.get('Retry-After', None),
Gelukkig komt de 'Python Slack SDK' met een RetryHandler die de retries automatisch voor ons doet wanneer status_code 429 wordt geretourneerd. Door dit aan onze code toe te voegen, hoeven we deze retries niet zelf af te handelen!
Belangrijk: De RetryHandler voegt standaard (!) één retry toe. Dit betekent dat als je de WebClient zo initialiseert:
client = WebClient(
timeout=10.
)
client.chat_postMessage(...)
Dan zal de chat_postMessage terugkeren na twee keer de opgegeven timeout: 20 seconden. Je kunt dit niet veranderen door de RetryHandler aan te roepen met de parameter:
max_retry_count
Op dit moment heb ik niet uitgezocht hoe we de timeout voor de hele operatie kunnen maken.
De SlackError uitzondering klasse
In onze SlackAPI zitten we distinghuish tussen:
- Permanente fouten
- Tijdelijke fouten
Permanente fouten zijn bijvoorbeeld een slecht of ongeldig Slack Bot Token, Slack Channel. In dit geval moeten we niet opnieuw proberen dit bericht te verzenden. Een voorbeeld van een tijdelijke fout is wanneer de verbinding met Slack wegvalt. In dit geval moeten we dit bericht na enige tijd opnieuw proberen te versturen. We moeten ook opnieuw proberen te verzenden als er een time-out optreedt. Merk op dat we in geval van een time-out een bericht twee keer kunnen versturen. Dit gebeurt wanneer het antwoord van de server op hetzelfde moment komt als de time-out in de client.
De Slack SDK heeft zijn eigen foutklasse, SlackApiError. Enkele fouten zijn:
- invalid_auth invalid bot_token
- channel_not_found unknown channel
- no_text text is missing, e.g. text=None
- invalid_arguments e.g. channel=None
- ratelimited when you send too many messages in s short time
Als we de RetryHandler gebruiken, zie hierboven, treedt de ratelimited error niet op. We kunnen de foutgegevens zo uit het error object halen:
e.response.status_code
e.response.error
e.response.headers
Er is nog een andere fout die hier kan optreden. Dit is wanneer de URL naar de Slack API niet correct is (niet gevonden). Dit zal een HTTP 404-fout opleveren. De geretourneerde foutgegevens zijn anders dan de 'normale' foutgegevens. In dit geval retourneert het foutobject een woordenboek(!):
e.response['status']
e.response['headers']
e.response['body']
De twee andere, tijdelijke fouten, zijn:
- connection_error
- timeout_error
We maken een SlackError uitzondering klasse die alle permanente en tijdelijke fouten teruggeeft, wat betekent dat onze applicatie alleen SlackError fouten hoeft vast te leggen.
Het toevoegen van ons SlackResponse object
We zijn niet echt geïnteresseerd in allerlei gegevens, we willen alleen weten wat er is gebeurd en of we opnieuw moeten proberen te verzenden. We kunnen een woordenboek retourneren, maar om de toegang zuiverder te maken, gebruiken we hier een namedtuple :
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after')
We retourneren hetzelfde SlackResponse object voor beide gevallen geen fouten of fouten. In geval van fouten halen we het SlackResponse object uit e.args[0].
De code
Hier is de code als je dit wilt proberen. Stel eerst minimaal de volgende parameters in:
SLACK_BOT_TOKEN
SLACK_CHANNEL
Slack Channel kan de Id of de naam van het kanaal zijn. Als u de snelheidsbeperkende functie wilt observeren, kunt u instellen:
number_of_messages = 100
De code:
import collections
import copy
import datetime
import logging
import os
import sys
import time
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler
class SlackError(Exception):
pass
class SlackApi:
def __init__(
self,
config=None,
logger=None,
base_url=None,
slack_bot_token=None,
timeout=None,
use_rate_limit_handler=True
):
self.config = config
self.logger = logger
self.base_url = base_url
self.slack_bot_token = slack_bot_token
self.timeout = timeout or 5.
self.use_rate_limit_handler = use_rate_limit_handler
web_client_params = {
'token': self.slack_bot_token,
'timeout': self.timeout
}
if self.base_url is not None:
web_client_params['base_url'] = self.base_url
self.client = WebClient(**web_client_params)
if use_rate_limit_handler:
# this handler does retries when HTTP status 429 is returned
rate_limit_handler = RateLimitErrorRetryHandler(
max_retry_count=1
)
self.client.retry_handlers.append(rate_limit_handler)
def make_response(self, has_errors=False, error_message=None, status_code=None, retry_after=None):
request_msecs = int((datetime.datetime.now() - self.ts).total_seconds() * 1000)
ResponseObj = collections.namedtuple('SlackAPIResponse', 'has_errors error_message status_code retry_after request_msecs')
return ResponseObj(has_errors, error_message, status_code, retry_after, request_msecs)
def chat_post_message(self, channel, text=None, blocks=None, user=None):
self.ts = datetime.datetime.now()
# change empty string user to None
if user is not None:
user = user.strip()
if user == '':
user = None
try:
response = self.client.chat_postMessage(
channel=channel,
text=text,
blocks=blocks,
user=user,
)
return self.make_response(
status_code=response.status_code,
)
except SlackApiError as e:
# first check for http errors
status = e.response.get('status', None)
self.logger.debug('status = {}'.format(status))
if status is not None:
status_code = status
error_message = 'http_' + str(status)
headers = e.response['headers']
else:
status_code = e.response.get('status_code', None)
error_message = e.response.get('error', None)
headers = e.response.headers
raise SlackError(self.make_response(
has_errors=True,
status_code=status_code,
error_message=error_message,
retry_after=headers.get('Retry-After', None),
))
except Exception as e:
exception = type(e).__name__
if exception == 'TimeoutError':
error_message='timeout_error'
else:
# all other errors are considered connection errors
error_message='connection_error'
raise SlackError(self.make_response(
has_errors=True,
error_message=error_message,
))
# the code below is typically in another file
# Example from:
# Stacking multiple blocks
# https://api.slack.com/messaging/composing/layouts
blocks_example = [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "New request"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*Type:*\nPaid Time Off"
},
{
"type": "mrkdwn",
"text": "*Created by:*\n<example.com|Fred Enriquez>"
}
]
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": "*When:*\nAug 10 - Aug 13"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "<https://example.com|View request>"
}
}
]
def get_logger(logger_name=None, logger_level=logging.DEBUG):
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
# console
console_handler = logging.StreamHandler()
console_logger_format = '%(levelname)-8.8s [%(filename)-15s%(funcName)15s():%(lineno)03s] %(message)s'
console_handler.setFormatter(logging.Formatter(console_logger_format))
console_handler.setLevel(logger_level)
logger.addHandler(console_handler)
return logger
config = {
'SLACK_BOT_TOKEN': '',
'SLACK_CHANNEL': '',
'SLACK_USER': None
}
def main():
logger = get_logger(logger_name='slack_api')
# your bot token and channel
slack_bot_token = config.get('SLACK_BOT_TOKEN')
slack_channel = config.get('SLACK_CHANNEL')
slack_user = config.get('SLACK_USER')
# create 'messages table'
number_of_messages = 100
#number_of_messages = 1
messages = []
for i in range(number_of_messages):
# deepcopy the dictionary here!
blocks = copy.deepcopy(blocks_example)
# patch message number
blocks[0]['text']['text'] = "New request" + ': {:03d}'.format(i)
messages.append({
'slack_bot_token': slack_bot_token,
'channel': slack_channel,
'user': slack_user,
'text': 'message {:03d}: this is a test message'.format(i),
'blocks': blocks,
# delivery status
'is_delivery_completed': False,
'is_delivered': False,
'delivery_tries': 0,
'last_delivery_error': None,
})
# send the messages
for i, message in enumerate(messages):
if message['is_delivered']:
continue
logger.debug('sending message {} ...'.format(i))
slack_api = SlackApi(
logger=logger,
slack_bot_token=message.get('slack_bot_token'),
#timeout=0.1,
)
try:
r = slack_api.chat_post_message(
channel=message.get('channel'),
text=message.get('text'),
blocks=message.get('blocks'),
user=message.get('user'),
)
logger.debug('success sending message[{}]: r = {}'.format(i, r))
# delivery status
message['is_delivery_completed'] = True
message['is_delivered'] = True
message['delivery_tries'] += 1
except SlackError as e:
#logger.exception('error sending message[{}]'.format(i))
r = e.args[0]
logger.debug('problem sending message[{}]: r = {}'.format(i, r))
message['delivery_tries'] += 1
message['last_delivery_error'] = r.error_message
if r.error_message in ['timeout_error', 'connection_error']:
# delivery status, must retry
message['is_delivery_completed'] = False
message['is_delivered'] = False
else:
# delivery status, done
message['is_delivery_completed'] = True
message['is_delivered'] = False
if __name__ == '__main__':
main()
Samenvatting
Eerst moeten we uitzoeken hoe we een Slack App maken, deze de juiste rechten geven en toevoegen aan een Slack Channel. Om berichten naar Slack te sturen gebruiken we de Slack SDK. De RetryHandler zorgt voor de snelheidsbeperkende functie van Slack. Het maakt onze code veel compacter. We hebben de Slack SDK verpakt in onze aangepaste SlackApi klasse. We hebben ook onze eigen SlackError klasse toegevoegd die alle (!) fouten afhandelt die kunnen optreden, inclusief timeouts en verbindingsfouten.
Onze applicatie kan nu de berichten en blokken aanmaken en ze naar Slack sturen met behulp van onze SlackApi.
Het aardige van het schrijven van een wrapper class, is dat we alle mogelijke parameters en responses hebben doorlopen. Dat is een manier (en niet zo'n slechte) om de ins-en-outs van een Python pakket te leren.
Links / credits
Python Slack SDK - RetryHandler
https://slack.dev/python-slack-sdk/web/index.html#retryhandler
Python Slack SDK - Web Client
https://slack.dev/python-slack-sdk/web/index.html
Slack - Package slack_sdk
https://slack.dev/python-slack-sdk/api-docs/slack_sdk
Slack - Rate Limits
https://api.slack.com/docs/rate-limits
Slack API - Python Slack SDK
https://slack.dev/python-slack-sdk
Using the Slack Web API
https://api.slack.com/web
Lees meer
Slack
Recent
- Database UUID primaire sleutels van je webapplicatie verbergen
- Don't Repeat Yourself (DRY) met Jinja2
- SQLAlchemy, PostgreSQL, maximum aantal rijen per user
- Toon de waarden in SQLAlchemy dynamische filters
- Veilige gegevensoverdracht met Public Key versleuteling en pyNaCl
- rqlite: een alternatief voor SQLite met hoge beschikbaarheid en distributed
Meest bekeken
- Met behulp van Python's pyOpenSSL om SSL-certificaten die van een host zijn gedownload te controleren
- Gebruik van UUIDs in plaats van Integer Autoincrement Primary Keys met SQLAlchemy en MariaDb
- Maak verbinding met een dienst op een Docker host vanaf een Docker container
- PyInstaller en Cython gebruiken om een Python executable te maken
- SQLAlchemy: Gebruik van Cascade Deletes om verwante objecten te verwijderen
- Flask RESTful API verzoekparametervalidatie met Marshmallow-schema's