Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/docs/configuration/alerts-reports.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ To send alerts and reports to Slack channels, you need to create a new Slack App

Note: when you configure an alert or a report, the Slack channel list takes channel names without the leading '#' e.g. use `alerts` instead of `#alerts`.

#### Large Slack Workspaces (10k+ channels)

For workspaces with many channels, fetching the complete channel list can take several minutes and may encounter Slack API rate limits. Add the following to your `superset_config.py`:

```python
from datetime import timedelta

# Increase cache timeout to reduce API calls
# Default: 1 day (86400 seconds)
SLACK_CACHE_TIMEOUT = int(timedelta(days=2).total_seconds())

# Increase retry count for rate limit errors
# Default: 2
SLACK_API_RATE_LIMIT_RETRY_COUNT = 5
```

### Kubernetes-specific

- You must have a `celery beat` pod running. If you're using the chart included in the GitHub repository under [helm/superset](https://github.com/apache/superset/tree/master/helm/superset), you need to put `supersetCeleryBeat.enabled = true` in your values override.
Expand Down
5 changes: 5 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,11 @@ def EMAIL_HEADER_MUTATOR( # pylint: disable=invalid-name,unused-argument # noq
SLACK_PROXY = None
SLACK_CACHE_TIMEOUT = int(timedelta(days=1).total_seconds())

# Maximum number of retries when Slack API returns rate limit errors
# Default: 2
# For workspaces with 10k+ channels, consider increasing to 10
SLACK_API_RATE_LIMIT_RETRY_COUNT = 2

# The webdriver to use for generating reports. Use one of the following
# firefox
# Requires: geckodriver and firefox installations
Expand Down
21 changes: 17 additions & 4 deletions superset/tasks/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,23 @@

@celery_app.task(name="slack.cache_channels")
def cache_channels() -> None:
cache_timeout = current_app.config["SLACK_CACHE_TIMEOUT"]
retry_count = current_app.config.get("SLACK_API_RATE_LIMIT_RETRY_COUNT", 2)

logger.info(
"Starting Slack channels cache warm-up task "
"(cache_timeout=%ds, retry_count=%d)",
cache_timeout,
retry_count,
)

try:
get_channels(
force=True, cache_timeout=current_app.config["SLACK_CACHE_TIMEOUT"]
)
get_channels(force=True, cache_timeout=cache_timeout)
except Exception as ex:
logger.exception("An error occurred while caching Slack channels: %s", ex)
logger.exception(
"Failed to cache Slack channels: %s. "
"If this is due to rate limiting, consider increasing "
"SLACK_API_RATE_LIMIT_RETRY_COUNT.",
str(ex),
)
raise
63 changes: 51 additions & 12 deletions superset/utils/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ def get_slack_client() -> WebClient:
token = token()
client = WebClient(token=token, proxy=app.config["SLACK_PROXY"])

rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=2)
max_retry_count = app.config.get("SLACK_API_RATE_LIMIT_RETRY_COUNT", 2)
rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=max_retry_count)
client.retry_handlers.append(rate_limit_handler)

logger.debug("Slack client configured with %d rate limit retries", max_retry_count)

return client


Expand All @@ -73,19 +76,45 @@ def get_channels() -> list[SlackChannelSchema]:
channels: list[SlackChannelSchema] = []
extra_params = {"types": ",".join(SlackChannelTypes)}
cursor = None
page_count = 0

logger.info("Starting Slack channels fetch")

try:
while True:
page_count += 1

while True:
response = client.conversations_list(
limit=999, cursor=cursor, exclude_archived=True, **extra_params
response = client.conversations_list(
limit=999, cursor=cursor, exclude_archived=True, **extra_params
)
page_channels = response.data["channels"]
channels.extend(channel_schema.load(channel) for channel in page_channels)

logger.debug(
"Fetched page %d: %d channels (total: %d)",
page_count,
len(page_channels),
len(channels),
)

cursor = response.data.get("response_metadata", {}).get("next_cursor")
if not cursor:
break

logger.info(
"Successfully fetched %d Slack channels in %d pages",
len(channels),
page_count,
)
channels.extend(
channel_schema.load(channel) for channel in response.data["channels"]
return channels
except SlackApiError as ex:
logger.error(
"Failed to fetch Slack channels after %d pages: %s",
page_count,
str(ex),
exc_info=True,
)
cursor = response.data.get("response_metadata", {}).get("next_cursor")
if not cursor:
break

return channels
raise


def get_channels_with_search(
Expand All @@ -104,7 +133,17 @@ def get_channels_with_search(
force=force,
cache_timeout=app.config["SLACK_CACHE_TIMEOUT"],
)
except (SlackClientError, SlackApiError) as ex:
except SlackApiError as ex:
# Check if it's a rate limit error
status_code = getattr(ex.response, "status_code", None)
if status_code == 429:
raise SupersetException(
f"Slack API rate limit exceeded: {ex}. "
"For large workspaces, consider increasing "
"SLACK_API_RATE_LIMIT_RETRY_COUNT"
) from ex
raise SupersetException(f"Failed to list channels: {ex}") from ex
except SlackClientError as ex:
raise SupersetException(f"Failed to list channels: {ex}") from ex

if types and not len(types) == len(SlackChannelTypes):
Expand Down
Loading