Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 157 additions & 153 deletions auth_backend/auth_plugins/email.py

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions auth_backend/auth_plugins/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ async def _register(
gh_id = cls.create_auth_method_param('user_id', github_user_id, user.id, db_session=db.session)
new_user[cls.get_name()] = {"user_id": gh_id.value}
userdata = await GithubAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
GithubAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
Expand Down Expand Up @@ -162,11 +162,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
'No users found for github account', 'Не найдено пользователей для аккаунта GitHub', id_token
)
userdata = await GithubAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
GithubAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
return await cls._create_session(
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name
Expand Down
8 changes: 4 additions & 4 deletions auth_backend/auth_plugins/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ async def _register(
google_id = cls.create_auth_method_param('unique_google_id', userinfo['sub'], user.id, db_session=db.session)
new_user = {cls.get_name(): {"unique_google_id": google_id.value}}
userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
GoogleAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
Expand Down Expand Up @@ -154,11 +154,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
id_token=credentials.get("id_token"),
)
userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
GoogleAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
return await cls._create_session(
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name
Expand Down
8 changes: 4 additions & 4 deletions auth_backend/auth_plugins/keycloak.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ async def _register(
keycloak_id = cls.create_auth_method_param('user_id', keycloak_user_id, user.id, db_session=db.session)
new_user = {cls.get_name(): {"user_id": keycloak_id.value}}
userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
KeycloakAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
Expand Down Expand Up @@ -153,11 +153,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
id_token = jwt.encode(userinfo, cls.settings.ENCRYPTION_KEY, algorithm="HS256")
raise OauthAuthFailed('No users found for keycloak account', id_token)
userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
KeycloakAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
return await cls._create_session(
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name
Expand Down
8 changes: 4 additions & 4 deletions auth_backend/auth_plugins/lkmsu.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ async def _register(
lk_id = cls.create_auth_method_param('user_id', lk_user_id, user.id, db_session=db.session)
new_user = {cls.get_name(): {"user_id": lk_id.value}}
userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
LkmsuAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
Expand Down Expand Up @@ -154,11 +154,11 @@ async def _login(
'No users found for lk msu account', 'Не найдено пользователей с таким аккаунтом LK MSU', id_token
)
userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
LkmsuAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
return await cls._create_session(
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name
Expand Down
8 changes: 4 additions & 4 deletions auth_backend/auth_plugins/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ async def _register(
tg_id = cls.create_auth_method_param('user_id', telegram_user_id, user.id, db_session=db.session)
new_user[cls.get_name()]["user_id"] = tg_id.value
userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
TelegramAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
Expand All @@ -111,11 +111,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
'No users found for Telegram account', 'Не найдено пользователей с таким ТГ аккаунтом', id_token
)
userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
TelegramAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
return await cls._create_session(
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name
Expand Down
8 changes: 4 additions & 4 deletions auth_backend/auth_plugins/vk.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ async def _register(
vk_id = cls.create_auth_method_param('user_id', vk_user_id, user.id, db_session=db.session)
new_user[cls.get_name()]["user_id"] = vk_id.value
userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0])
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
VkAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
Expand Down Expand Up @@ -163,11 +163,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
'No users found for VK account', 'Не найдено пользователей с таким аккаунтом ВК', id_token
)
userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0])
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
VkAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
return await cls._create_session(
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name
Expand Down
8 changes: 4 additions & 4 deletions auth_backend/auth_plugins/yandex.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ async def _register(
ya_id = cls.create_auth_method_param('user_id', yandex_user_id, user.id, db_session=db.session)
new_user[cls.get_name()]["user_id"] = ya_id.value
userdata = await YandexAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
YandexAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
await AuthPluginMeta.user_updated(new_user, old_user)
return await cls._create_session(
Expand Down Expand Up @@ -167,11 +167,11 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun
'No users found for Yandex account', 'Не найдено пользователей для аккаунт Яндекс', id_token
)
userdata = await YandexAuth._convert_data_to_userdata_format(userinfo)
await get_kafka_producer().produce(
background_tasks.add_task(
get_kafka_producer().produce,
cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME,
YandexAuth.generate_kafka_key(user.id),
userdata,
bg_tasks=background_tasks,
)
return await cls._create_session(
user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name
Expand Down
2 changes: 1 addition & 1 deletion auth_backend/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__(self, dtime: datetime.timedelta):
self.delay_time = dtime
super().__init__(
f'Too many email requests. Delay: {dtime}',
f'Слишком много запрос к email. Задержка: {dtime}',
f'Слишком много запросов к email. Задержка: {dtime}',
)


Expand Down
31 changes: 8 additions & 23 deletions auth_backend/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from confluent_kafka import KafkaError, KafkaException, Message, Producer
from event_schema.auth import UserLogin, UserLoginKey
from fastapi import BackgroundTasks

from auth_backend import __version__
from auth_backend.kafka.kafkameta import KafkaMeta
Expand All @@ -14,7 +13,7 @@
log = logging.getLogger(__name__)


class AIOKafka(KafkaMeta):
class Kafka(KafkaMeta):
"""
Класс для работы с Kafka
"""
Expand Down Expand Up @@ -59,7 +58,7 @@ def delivery_callback(self, err: KafkaError, msg: Message) -> None:
else:
log.info('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset()))

def _produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None:
def produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None:
"""
Отправляет сообщение в Kafka
Args:
Expand All @@ -82,29 +81,15 @@ def _produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None:

self._producer.poll(0)

async def produce(self, topic: str, key: UserLoginKey, value: UserLogin, *, bg_tasks: BackgroundTasks) -> None:
"""
Добавляет отправку сообщения в фоновые задачи
Args:
topic: топик в который будет написано сообщение
key: ключ сообщения
value: значение сообщение
bg_tasks: fastapi background_tasks

Returns:
Ничего
"""
bg_tasks.add_task(self._produce, topic, key, value)

async def close(self) -> None:
def close(self) -> None:
self._producer.flush()


class AIOKafkaMock(KafkaMeta):
async def produce(self, topic: str, key: Any, value: Any, *, bg_tasks: BackgroundTasks) -> Any:
class KafkaMock(KafkaMeta):
def produce(self, topic: str, key: Any, value: Any) -> Any:
log.debug(f"Kafka cluster disabled, debug msg: {topic=}, {key=}, {value=}")

async def close(self) -> None:
def close(self) -> None:
return


Expand All @@ -115,5 +100,5 @@ def get_kafka_producer() -> KafkaMeta:
иначе Mock кафки
"""
if get_settings().KAFKA_DSN:
return AIOKafka()
return AIOKafkaMock()
return Kafka()
return KafkaMock()
6 changes: 2 additions & 4 deletions auth_backend/kafka/kafkameta.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
from abc import ABC, abstractmethod
from typing import Any

from fastapi import BackgroundTasks


class KafkaMeta(ABC):
@abstractmethod
async def produce(self, topic: str, key: Any, value: Any, *, bg_tasks: BackgroundTasks) -> Any:
def produce(self, topic: str, key: Any, value: Any) -> Any:
raise NotImplementedError()

@abstractmethod
async def close(self) -> None:
def close(self) -> None:
raise NotImplementedError()
45 changes: 44 additions & 1 deletion auth_backend/models/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from __future__ import annotations

import asyncio
import re
from contextlib import asynccontextmanager
from typing import AsyncIterator

import sqlalchemy
from sqlalchemy import Integer, not_
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Mapped, Query, Session, as_declarative, declared_attr, mapped_column

from auth_backend.exceptions import ObjectNotFound
from auth_backend.exceptions import AuthAPIError, ObjectNotFound


@as_declarative()
Expand Down Expand Up @@ -74,3 +78,42 @@ def delete(cls, id: int, *, session: Session) -> None:
else:
session.delete(obj)
session.flush()

@classmethod
@asynccontextmanager
async def lock(cls, session: Session) -> AsyncIterator[Session]:
"""
Сначала пытаемся захватить блокировку таблицы.

Так как используем синхронную алхимимю, сставим таймаут и не будем ждать больше него

Если удерживается блокировка другой корутиной, то в конце концов выйдем из ожидания по таймауту
и заблочим корутину асинхронным сном

Таким образом дадим корутине, удерживающей блокировку, доделать свою работу
"""
for _ in range(3):
nested = session.begin_nested()
session.execute(sqlalchemy.text("SET LOCAL lock_timeout = '0.2s';"))
try:
session.execute(sqlalchemy.text(f'LOCK TABLE {cls.__tablename__} IN ACCESS EXCLUSIVE MODE;'))
except sqlalchemy.exc.OperationalError:
nested.rollback()
await asyncio.sleep(1.5)
else:
break
else:
raise AuthAPIError("Internal Server Error", "Произошла ошибка, попробуйте позже")
try:
yield session
except Exception:
nested.rollback()
session.rollback()
if session and session.is_active:
session.close()
raise
else:
nested.commit()
session.commit()
if session and session.is_active:
session.close()
4 changes: 2 additions & 2 deletions auth_backend/routes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
await get_kafka_producer().close()
get_kafka_producer().close()


settings = get_settings()
Expand All @@ -39,7 +39,7 @@ async def lifespan(app: FastAPI):
app.add_middleware(
DBSessionMiddleware,
db_url=str(settings.DB_DSN),
engine_args={"pool_pre_ping": True, "isolation_level": "AUTOCOMMIT"},
engine_args={"pool_pre_ping": True},
)

app.add_middleware(
Expand Down
16 changes: 8 additions & 8 deletions auth_backend/routes/exc_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,6 @@ async def session_expired_handler(req: starlette.requests.Request, exc: SessionE
)


@app.exception_handler(Exception)
async def http_error_handler(req: starlette.requests.Request, exc: Exception):
return JSONResponse(
content=StatusResponseModel(status="Error", message="Internal server error", ru="Ошибка").model_dump(),
status_code=500,
)


@app.exception_handler(TooManyEmailRequests)
async def too_many_requests_handler(req: starlette.requests.Request, exc: TooManyEmailRequests):
return JSONResponse(
Expand All @@ -106,3 +98,11 @@ async def last_auth_method_delete_handler(req: starlette.requests.Request, exc:
).model_dump(),
status_code=403,
)


@app.exception_handler(Exception)
async def http_error_handler(req: starlette.requests.Request, exc: Exception):
return JSONResponse(
content=StatusResponseModel(status="Error", message="Internal server error", ru="Ошибка").model_dump(),
status_code=500,
)
Loading