Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pytz==2021.3
regex==2021.10.8
requests==2.26.0
six==1.16.0
slack-bolt==1.9.2
slack-bolt==1.10.0
slack-sdk==3.11.2
sqlparse==0.4.2
tqdm==4.62.3
Expand Down
91 changes: 54 additions & 37 deletions simone/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,27 @@
from cron_validator import CronValidator
from datetime import datetime
from django.conf import settings
from django.db import connection, transaction
from django.db import close_old_connections, transaction
from functools import wraps
from io import StringIO
from logging import getLogger
from os import environ
from pprint import pformat, pprint
from pylev import levenshtein
from slack_bolt import App
from time import time
from threading import Event, Thread

from slacker.listeners import SlackListener


max_dispatchers = getattr(settings, 'MAX_DISPATCHERS', 10)
executor = ThreadPoolExecutor(max_workers=max_dispatchers)
executor = ThreadPoolExecutor(
max_workers=max_dispatchers, thread_name_prefix='simone-worker'
)


def background(func):
@wraps(func)
def submit(*args, **kwargs):
executor.submit(func, *args, **kwargs)

return submit


def dispatch_with_error(func):
@background
def dispatch_with_error_reporting(func):
@wraps(func)
def wrap(self, context, *args, **kwargs):
ret = None
Expand All @@ -45,20 +40,14 @@ def wrap(self, context, *args, **kwargs):
'An error occured while responding to this message',
reply=True,
)
# We have to close the connection explicitly, if we don't things seem
# to "hang" somewhere in transaction.atomic() in future jobs :-(
# This isn't a problem in the dev server with sqlite3, but not clear if
# that's a difference between mysql and sqlite3 or something about dev
# mode.
# TODO: figure out what's going on here to see if we can avoid closing
connection.close()
return ret

return wrap


# TODO: we should put each handler in its own try/except so that if one fails
# it doesn't take out the others
def dispatch(func):
@background
@wraps(func)
def wrap(self, context, *args, **kwargs):
ret = None
Expand All @@ -72,13 +61,6 @@ def wrap(self, context, *args, **kwargs):
args,
kwargs,
)
# We have to close the connection explicitly, if we don't things seem
# to "hang" somewhere in transaction.atomic() in future jobs :-(
# This isn't a problem in the dev server with sqlite3, but not clear if
# that's a difference between mysql and sqlite3 or something about dev
# mode.
# TODO: figure out what's going on here to see if we can avoid closing
connection.close()
return ret

return wrap
Expand All @@ -94,13 +76,22 @@ class Dispatcher(object):
def __init__(self, handlers):
self.handlers = handlers

self.listeners = {'slack': SlackListener(self)}
token_verification = getattr(
settings, 'SLACK_TOKEN_VERIFICATION', False
)
app = App(
name='simone',
token=environ["SLACK_BOT_TOKEN"],
signing_secret=environ["SLACK_SIGNING_SECRET"],
token_verification_enabled=token_verification,
listener_executor=executor,
)
self.listeners = {'slack': SlackListener(self, app)}

addeds = []
commands = {}
command_words = {}
command_max_words = 0
crons = []
joineds = []
messages = []
for handler in handlers:
Expand All @@ -112,11 +103,6 @@ def __init__(self, handlers):
commands[' '.join(command)] = handler
command_words[command] = handler
command_max_words = max(command_max_words, len(command))
for cron in config.get('crons', []):
if self.validate_cron(cron):
# ^ will have shown warnings and we'll otherwise skip bad
# cron configs
crons.append((cron, handler))
if config.get('joined', False):
joineds.append(handler)
if config.get('messages', False):
Expand All @@ -127,10 +113,29 @@ def __init__(self, handlers):
self.commands = commands
self.command_words = command_words
self.command_max_words = command_max_words
self.crons = crons
self._crons = None
self.joineds = joineds
self.messages = messages

@property
def crons(self):
# load crons on demand since validating them requires db access. If we
# don't delay it we can't do the initial migration etc.
if self._crons is None:
self.log.debug('crons: loading')
crons = []
for handler in self.handlers:
for cron in handler.config().get('crons', []):
if self.validate_cron(cron):
# ^ will have shown warnings and we'll otherwise skip
# bad cron configs
crons.append((cron, handler))

self.log.debug('crons: loaded crons=%s', pformat(crons))
self._crons = crons

return self._crons

def urlpatterns(self):
return sum(
[l.urlpatterns() for _, l in sorted(self.listeners.items())], []
Expand Down Expand Up @@ -224,7 +229,7 @@ def find_command_handler(self, text):
self.log.debug('find_command_handler: no match')
return (command_words, None, None, None)

@dispatch_with_error
@dispatch_with_error_reporting
def command(self, context, text, **kwargs):
'''
Note: this will clean up whitespace in the command & text
Expand Down Expand Up @@ -330,7 +335,19 @@ def run(self):
running = True
while running:
start = time()
self.dispatcher.tick(datetime.utcnow())
# Cron is its own thread so we're in the background when tick is
# called so no need to submit to the executor. we do want to wrap
# it with try/except to catch handler problems and avoid killing
# the thread as well as wrap each time around in calls to check
# our database connections health (name doesn't match
# functionality)
close_old_connections()
try:
self.dispatcher.tick(datetime.utcnow())
except Exception:
self.log.exception('run: tick failed')
finally:
close_old_connections()
elapsed = time() - start
pause = 60 - elapsed
self.log.debug('run: elapsed=%f, pause=%f', elapsed, pause)
Expand Down
25 changes: 20 additions & 5 deletions simone/settings/dev.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from os import environ
from pathlib import Path

# Build paths inside the project like this: BASE_DIR / 'subdir'.
Expand All @@ -7,12 +8,26 @@

CRON_ENABLED = False

DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db' / 'db.sqlite3',

if 'SIMONE_DB_NAME' in environ:
DATABASES = {
'default': {
'ENGINE': 'mysql.connector.django',
'NAME': environ['SIMONE_DB_NAME'],
'USER': environ['SIMONE_DB_USER'],
'PASSWORD': environ['SIMONE_DB_PASSWORD'],
'HOST': environ['SIMONE_DB_HOST'],
'PORT': environ.get('SIMONE_DB_PORT', '3306'),
'CONN_MAX_AGE': 300,
}
}
else:
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db' / 'db.sqlite3',
}
}
}

LOGGING = {
'version': 1,
Expand Down
16 changes: 2 additions & 14 deletions slacker/listeners.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from django.conf import settings
from django.http import HttpRequest
from django.views.decorators.csrf import csrf_exempt
from django.urls import path
from logging import getLogger
from os import environ
from slack_bolt import App
from slack_bolt.adapter.django import SlackRequestHandler
import re

Expand Down Expand Up @@ -81,20 +78,11 @@ class SlackListener(object):

log = getLogger('SlackListener')

def __init__(self, dispatcher, app=None):
def __init__(self, dispatcher, app):
self.log.info('__init__: dispatcher=%s, app=%s', dispatcher, app)
self.dispatcher = dispatcher

if app is None:
token_verification = getattr(
settings, 'SLACK_TOKEN_VERIFICATION', False
)
app = App(
token=environ["SLACK_BOT_TOKEN"],
signing_secret=environ["SLACK_SIGNING_SECRET"],
token_verification_enabled=token_verification,
)
self.app = app

self._auth_info = None
self._bot_mention = None

Expand Down
3 changes: 2 additions & 1 deletion slacker/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,8 @@ def test_rich_methods(self):
)

def test_channel_rename(self):
listener = SlackListener(dispatcher=None, app=None)
app = DummyApp()
listener = SlackListener(dispatcher=None, app=app)

# create a channel we've never seen before
event = {
Expand Down