From 294f9b0cf7d700f7305cffc7ea73241191937a04 Mon Sep 17 00:00:00 2001 From: Semyon Grigoriev Date: Mon, 5 Aug 2024 13:07:52 +0300 Subject: [PATCH 1/3] locks impl --- auth_backend/auth_plugins/email.py | 83 +++++++++++++++--------------- auth_backend/models/base.py | 30 +++++++++++ auth_backend/routes/base.py | 2 +- auth_backend/routes/scopes.py | 12 +++-- auth_backend/utils/smtp.py | 4 +- tests/conftest.py | 2 +- 6 files changed, 85 insertions(+), 48 deletions(-) diff --git a/auth_backend/auth_plugins/email.py b/auth_backend/auth_plugins/email.py index 7041afc5..6e77c4c8 100644 --- a/auth_backend/auth_plugins/email.py +++ b/auth_backend/auth_plugins/email.py @@ -9,6 +9,9 @@ from fastapi_sqlalchemy import db from pydantic import field_validator, model_validator from sqlalchemy import func +import sqlalchemy +from sqlalchemy.orm import Session as DBSession +import sqlalchemy.exc from auth_backend.auth_method import AuthPluginMeta, LoginableMixin, RegistrableMixin, Session, UserdataMixin from auth_backend.base import Base, StatusResponseModel @@ -184,8 +187,8 @@ async def _add_to_db(user_inp: EmailRegister, confirmation_token: str, user: Use return method_params @staticmethod - async def _change_confirmation_link(user: User, confirmation_token: str) -> None: - auth_params = Email.get_auth_method_params(user.id, session=db.session) + async def _change_confirmation_link(user: User, confirmation_token: str, *, session: DBSession) -> None: + auth_params = Email.get_auth_method_params(user.id, session=session) if auth_params["confirmed"].value == "true": raise AlreadyExists(User, user.id) else: @@ -200,57 +203,55 @@ async def _register( user_session: UserSession = Depends(UnionAuth(scopes=[], allow_none=True, auto_error=True)), ) -> StatusResponseModel: confirmation_token: str = random_string() - auth_method: AuthMethod | None = ( - AuthMethod.query(session=db.session) - .filter( - AuthMethod.param == "email", - func.lower(AuthMethod.value) == user_inp.email.lower(), - AuthMethod.auth_method == Email.get_name(), + with AuthMethod.txn(db.session) as txn: + auth_method: AuthMethod | None = ( + AuthMethod.query(session=txn) + .filter( + AuthMethod.param == "email", + func.lower(AuthMethod.value) == user_inp.email.lower(), + AuthMethod.auth_method == Email.get_name(), + ) + .one_or_none() ) - .one_or_none() - ) - if auth_method: - await Email._change_confirmation_link(auth_method.user, confirmation_token) + if auth_method: + await Email._change_confirmation_link(auth_method.user, confirmation_token, session=txn) + SendEmailMessage.send( + user_inp.email, + request.client.host, + "main_confirmation.html", + "Подтверждение регистрации Твой ФФ!", + txn, + background_tasks, + url=f"{settings.APPLICATION_HOST}/auth/register/success?token={confirmation_token}", + ) + return StatusResponseModel( + status="Success", message="Email confirmation link sent", ru="Ссылка отправлена на почту" + ) + if user_session: + user = await cls._get_user(user_session=user_session, db_session=txn) + if not user: + raise SessionExpired(user_session.token) + else: + user = await cls._create_user(db_session=txn) + method_params = await Email._add_to_db(user_inp, confirmation_token, user) + method_params["password"] = user_inp.password # В user_updated передаем пароль в открытую SendEmailMessage.send( user_inp.email, request.client.host, "main_confirmation.html", "Подтверждение регистрации Твой ФФ!", - db.session, + txn, background_tasks, url=f"{settings.APPLICATION_HOST}/auth/register/success?token={confirmation_token}", ) - db.session.commit() + + old_user = None + if user_session: + old_user = {"user_id": user_session.user.id} + await AuthPluginMeta.user_updated({"user_id": user.id, Email.get_name(): method_params}, old_user) return StatusResponseModel( status="Success", message="Email confirmation link sent", ru="Ссылка отправлена на почту" ) - if user_session: - user = await cls._get_user(user_session=user_session, db_session=db.session) - if not user: - raise SessionExpired(user_session.token) - else: - user = await cls._create_user(db_session=db.session) - method_params = await Email._add_to_db(user_inp, confirmation_token, user) - method_params["password"] = user_inp.password # В user_updated передаем пароль в открытую - SendEmailMessage.send( - user_inp.email, - request.client.host, - "main_confirmation.html", - "Подтверждение регистрации Твой ФФ!", - db.session, - background_tasks, - url=f"{settings.APPLICATION_HOST}/auth/register/success?token={confirmation_token}", - ) - - old_user = None - if user_session: - old_user = {"user_id": user_session.user.id} - await AuthPluginMeta.user_updated({"user_id": user.id, Email.get_name(): method_params}, old_user) - - db.session.commit() - return StatusResponseModel( - status="Success", message="Email confirmation link sent", ru="Ссылка отправлена на почту" - ) @staticmethod def _hash_password(password: str, salt: str) -> str: diff --git a/auth_backend/models/base.py b/auth_backend/models/base.py index 0734f150..cbde1998 100644 --- a/auth_backend/models/base.py +++ b/auth_backend/models/base.py @@ -1,8 +1,12 @@ from __future__ import annotations +from contextlib import contextmanager import re +import sys +from typing import Iterator from sqlalchemy import Integer, not_ +import sqlalchemy from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import Mapped, Query, Session, as_declarative, declared_attr, mapped_column @@ -74,3 +78,29 @@ def delete(cls, id: int, *, session: Session) -> None: else: session.delete(obj) session.flush() + + @classmethod + @contextmanager + def txn(cls, session: Session) -> Iterator[Session]: + try: + nested = session.begin_nested() + session.execute(sqlalchemy.text(f'LOCK TABLE {cls.__tablename__} IN ACCESS EXCLUSIVE MODE;')) + try: + yield session + except Exception: + exception_name, _, __ = sys.exc_info() + nested.rollback() + session.rollback() + if session and session.is_active: + session.close() + raise + finally: + if locals().get("exception_name") is not None: + return + nested.commit() + session.commit() + if session and session.is_active: + session.close() + except Exception: + raise + diff --git a/auth_backend/routes/base.py b/auth_backend/routes/base.py index a0370dbf..71d89624 100644 --- a/auth_backend/routes/base.py +++ b/auth_backend/routes/base.py @@ -39,7 +39,7 @@ async def lifespan(app: FastAPI): app.add_middleware( DBSessionMiddleware, db_url=str(settings.DB_DSN), - engine_args={"pool_pre_ping": True, "isolation_level": "AUTOCOMMIT"}, + engine_args={"pool_pre_ping": True}, ) app.add_middleware( diff --git a/auth_backend/routes/scopes.py b/auth_backend/routes/scopes.py index e766233f..4091290f 100644 --- a/auth_backend/routes/scopes.py +++ b/auth_backend/routes/scopes.py @@ -26,9 +26,11 @@ async def create_scope( detail=StatusResponseModel(status="Error", message="Already exists", ru="Уже существует").model_dump(), ) scope.name = scope.name.lower() - return ScopeGet.model_validate( + retval = ScopeGet.model_validate( Scope.create(**scope.model_dump(), creator_id=user_session.user_id, session=db.session) ) + db.session.commit() + return retval @scopes.get("/{id}", response_model=ScopeGet) @@ -62,9 +64,11 @@ async def update_scope( Scopes: `["auth.scope.update"]` """ scope = Scope.get(id, session=db.session) - return ScopeGet.model_validate( + retval = ScopeGet.model_validate( Scope.update(scope.id, **scope_inp.model_dump(exclude_unset=True), session=db.session) ) + db.session.commit() + return retval @scopes.delete("/{id}", response_model=StatusResponseModel) @@ -75,4 +79,6 @@ async def delete_scope( Scopes: `["auth.scope.delete"]` """ Scope.delete(session=db.session, id=id) - return StatusResponseModel(status="Success", message="Scope has been deleted", ru="Скоуп удален") + retval = StatusResponseModel(status="Success", message="Scope has been deleted", ru="Скоуп удален") + db.session.commit() + return retval diff --git a/auth_backend/utils/smtp.py b/auth_backend/utils/smtp.py index 3cbdad40..5d9a305d 100644 --- a/auth_backend/utils/smtp.py +++ b/auth_backend/utils/smtp.py @@ -26,7 +26,7 @@ def create_user_delay(cls, ip: str, email: str, dbsession: DbSession): cls.check_email_delay(email, dbsession) user_delay = UserMessageDelay(user_ip=ip, user_email=email, delay_time=datetime.datetime.utcnow()) dbsession.add(user_delay) - dbsession.commit() + dbsession.flush() @classmethod def delete_user_delay(cls, ip: str, email: str, dbsession: DbSession): @@ -42,7 +42,7 @@ def delete_user_delay(cls, ip: str, email: str, dbsession: DbSession): dbsession.query(UserMessageDelay).filter( UserMessageDelay.user_email == email, UserMessageDelay.delay_time < time_filter ).delete() - dbsession.commit() + dbsession.flush() @classmethod def check_ip_delay(cls, ip: str, dbsession: DbSession): diff --git a/tests/conftest.py b/tests/conftest.py index 09183f9e..68319eba 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,7 +50,7 @@ def client_auth(): @pytest.fixture() def dbsession(): settings = get_settings() - engine = create_engine(str(settings.DB_DSN)) + engine = create_engine(str(settings.DB_DSN), isolation_level="AUTOCOMMIT") TestingSessionLocal = sessionmaker(bind=engine) return TestingSessionLocal() From 36b5eb7d3b506df8d67372bb577fbbbf69f659ec Mon Sep 17 00:00:00 2001 From: semen603089 Date: Tue, 6 Aug 2024 17:48:39 +0300 Subject: [PATCH 2/3] datarace fixes, kafka refactoring --- auth_backend/auth_plugins/email.py | 244 +++++++++++++------------- auth_backend/auth_plugins/github.py | 20 +-- auth_backend/auth_plugins/google.py | 18 +- auth_backend/auth_plugins/keycloak.py | 20 +-- auth_backend/auth_plugins/lkmsu.py | 18 +- auth_backend/auth_plugins/telegram.py | 18 +- auth_backend/auth_plugins/vk.py | 18 +- auth_backend/auth_plugins/yandex.py | 18 +- auth_backend/exceptions.py | 2 +- auth_backend/kafka/kafka.py | 31 +--- auth_backend/kafka/kafkameta.py | 6 +- auth_backend/models/base.py | 63 ++++--- auth_backend/routes/base.py | 2 +- auth_backend/routes/exc_handlers.py | 16 +- 14 files changed, 241 insertions(+), 253 deletions(-) diff --git a/auth_backend/auth_plugins/email.py b/auth_backend/auth_plugins/email.py index 6e77c4c8..7cbee521 100644 --- a/auth_backend/auth_plugins/email.py +++ b/auth_backend/auth_plugins/email.py @@ -9,9 +9,7 @@ from fastapi_sqlalchemy import db from pydantic import field_validator, model_validator from sqlalchemy import func -import sqlalchemy from sqlalchemy.orm import Session as DBSession -import sqlalchemy.exc from auth_backend.auth_method import AuthPluginMeta, LoginableMixin, RegistrableMixin, Session, UserdataMixin from auth_backend.base import Base, StatusResponseModel @@ -161,11 +159,12 @@ async def _login(cls, user_inp: EmailLogin, background_tasks: BackgroundTasks) - ): raise AuthFailed("Incorrect login or password", "Некорректный логин или пароль") userdata = await Email._convert_data_to_userdata_format({"email": auth_params["email"].value}) - await get_kafka_producer().produce( - settings.KAFKA_USER_LOGIN_TOPIC_NAME, - Email.generate_kafka_key(query.user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + settings.KAFKA_USER_LOGIN_TOPIC_NAME, + Email.generate_kafka_key(query.user.id), + userdata, + ) ) return await cls._create_session( query.user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name @@ -203,7 +202,7 @@ async def _register( user_session: UserSession = Depends(UnionAuth(scopes=[], allow_none=True, auto_error=True)), ) -> StatusResponseModel: confirmation_token: str = random_string() - with AuthMethod.txn(db.session) as txn: + async with AuthMethod.lock(db.session) as txn: auth_method: AuthMethod | None = ( AuthMethod.query(session=txn) .filter( @@ -284,11 +283,10 @@ async def _approve_email(token: str, background_tasks: BackgroundTasks) -> Statu auth_params = Email.get_auth_method_params(auth_method.user.id, session=db.session) auth_params["confirmed"].value = "true" userdata = await Email._convert_data_to_userdata_format({"email": auth_params["email"].value}) - await get_kafka_producer().produce( - settings.KAFKA_USER_LOGIN_TOPIC_NAME, - Email.generate_kafka_key(auth_method.user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + settings.KAFKA_USER_LOGIN_TOPIC_NAME, Email.generate_kafka_key(auth_method.user.id), userdata + ) ) await AuthPluginMeta.user_updated( {"user_id": auth_method.user.id, Email.get_name(): {"confirmed": True}}, @@ -305,57 +303,59 @@ async def _request_reset_email( background_tasks: BackgroundTasks, user_session: UserSession = Depends(UnionAuth(scopes=[], allow_none=False, auto_error=True)), ) -> StatusResponseModel: - auth_params = Email.get_auth_method_params(user_session.user_id, session=db.session) - if "email" not in auth_params: - raise IncorrectUserAuthType() - if auth_params["confirmed"].value == "false": - raise AuthFailed( - "Registration wasn't completed. Try to registrate again and do not forget to approve your email", - "Регистрация не была завершена. Попробуйте зарегистрироваться снова и не забудьте подтвердить почту", + async with AuthMethod.lock(db.session) as txn: + auth_params = Email.get_auth_method_params(user_session.user_id, session=txn) + if "email" not in auth_params: + raise IncorrectUserAuthType() + if auth_params["confirmed"].value == "false": + raise AuthFailed( + "Registration wasn't completed. Try to registrate again and do not forget to approve your email", + "Регистрация не была завершена. Попробуйте зарегистрироваться снова и не забудьте подтвердить почту", + ) + if auth_params["email"].value == scheme.email: + raise HTTPException( + status_code=401, + detail=StatusResponseModel( + status="Error", message="Email incorrect", ru="Некорректная почта" + ).model_dump(), + ) + + old_user = {"user_id": user_session.user_id, cls.get_name(): {}} + new_user = {"user_id": user_session.user_id, cls.get_name(): {}} + token = random_string(length=settings.TOKEN_LENGTH) + if "tmp_email" in auth_params: + old_user[cls.get_name()]["tmp_email"] = auth_params["tmp_email"].value + auth_params["tmp_email"].is_deleted = True + old_user[cls.get_name()]["tmp_email_confirmation_token"] = auth_params[ + "tmp_email_confirmation_token" + ].value + auth_params["tmp_email_confirmation_token"].is_deleted = True + txn.flush() + AuthMethod.create( + user_id=user_session.user_id, + auth_method="email", + param="tmp_email_confirmation_token", + value=token, + session=txn, ) - if auth_params["email"].value == scheme.email: - raise HTTPException( - status_code=401, - detail=StatusResponseModel( - status="Error", message="Email incorrect", ru="Некорректная почта" - ).model_dump(), + new_user[cls.get_name()]["tmp_email_confirmation_token"] = token + AuthMethod.create( + user_id=user_session.user_id, auth_method="email", param="tmp_email", value=scheme.email, session=txn + ) + new_user[cls.get_name()]["tmp_email"] = scheme.email + SendEmailMessage.send( + to_email=scheme.email, + ip=request.client.host, + message_file_name="mail_change_confirmation.html", + subject="Смена почты Твой ФФ!", + dbsession=txn, + background_tasks=background_tasks, + url=f"{settings.APPLICATION_HOST}/auth/reset/email?token={token}", + ) + await AuthPluginMeta.user_updated(new_user, old_user) + return StatusResponseModel( + status="Success", message="Email confirmation link sent", ru="Ссылка отправлена на почту" ) - - old_user = {"user_id": user_session.user_id, cls.get_name(): {}} - new_user = {"user_id": user_session.user_id, cls.get_name(): {}} - token = random_string(length=settings.TOKEN_LENGTH) - if "tmp_email" in auth_params: - old_user[cls.get_name()]["tmp_email"] = auth_params["tmp_email"].value - auth_params["tmp_email"].is_deleted = True - old_user[cls.get_name()]["tmp_email_confirmation_token"] = auth_params["tmp_email_confirmation_token"].value - auth_params["tmp_email_confirmation_token"].is_deleted = True - db.session.flush() - AuthMethod.create( - user_id=user_session.user_id, - auth_method="email", - param="tmp_email_confirmation_token", - value=token, - session=db.session, - ) - new_user[cls.get_name()]["tmp_email_confirmation_token"] = token - AuthMethod.create( - user_id=user_session.user_id, auth_method="email", param="tmp_email", value=scheme.email, session=db.session - ) - new_user[cls.get_name()]["tmp_email"] = scheme.email - SendEmailMessage.send( - to_email=scheme.email, - ip=request.client.host, - message_file_name="mail_change_confirmation.html", - subject="Смена почты Твой ФФ!", - dbsession=db.session, - background_tasks=background_tasks, - url=f"{settings.APPLICATION_HOST}/auth/reset/email?token={token}", - ) - await AuthPluginMeta.user_updated(new_user, old_user) - db.session.commit() - return StatusResponseModel( - status="Success", message="Email confirmation link sent", ru="Ссылка отправлена на почту" - ) @staticmethod async def _reset_email(token: str, background_tasks: BackgroundTasks) -> StatusResponseModel: @@ -397,8 +397,10 @@ async def _reset_email(token: str, background_tasks: BackgroundTasks) -> StatusR Email.get_name(): {"email": auth_params["email"].value}, } userdata = await Email._convert_data_to_userdata_format({"email": auth_params["email"].value}) - await get_kafka_producer().produce( - settings.KAFKA_USER_LOGIN_TOPIC_NAME, Email.generate_kafka_key(user.id), userdata, bg_tasks=background_tasks + background_tasks.add_task( + get_kafka_producer().produce( + settings.KAFKA_USER_LOGIN_TOPIC_NAME, Email.generate_kafka_key(user.id), userdata + ) ) await AuthPluginMeta.user_updated(new_user, old_user) db.session.commit() @@ -455,67 +457,67 @@ async def _request_reset_password( async def _request_reset_forgotten_password( request: Request, schema: RequestResetForgottenPassword, background_tasks: BackgroundTasks ) -> StatusResponseModel: - auth_method_email: AuthMethod | None = ( - AuthMethod.query(session=db.session) - .filter( - AuthMethod.auth_method == Email.get_name(), - AuthMethod.param == "email", - AuthMethod.value == schema.email, + async with AuthMethod.lock(db.session) as txn: + auth_method_email: AuthMethod | None = ( + AuthMethod.query(session=txn) + .filter( + AuthMethod.auth_method == Email.get_name(), + AuthMethod.param == "email", + AuthMethod.value == schema.email, + ) + .one_or_none() ) - .one_or_none() - ) - if not auth_method_email: - raise HTTPException( - status_code=404, - detail=StatusResponseModel( - status="Error", message="Email not found", ru="Почта не найдена" - ).model_dump(), + if not auth_method_email: + raise HTTPException( + status_code=404, + detail=StatusResponseModel( + status="Error", message="Email not found", ru="Почта не найдена" + ).model_dump(), + ) + auth_params = Email.get_auth_method_params(auth_method_email.user.id, session=txn) + old_user = {"user_id": auth_method_email.user.id, Email.get_name(): {}} + new_user = {"user_id": auth_method_email.user.id, Email.get_name(): {}} + if "email" not in auth_params: + raise HTTPException( + status_code=401, + detail=StatusResponseModel( + status="Error", + message="Auth method restricted for this user", + ru="Метод аутентификации не установлен для пользователя", + ).model_dump(), + ) + if auth_params["confirmed"].value.lower() == "false": + raise AuthFailed( + "Registration wasn't completed. Try to registrate again and do not forget to approve your email", + "Регистрация не была завершена. Паоробуйте зарегистрироваться снова и не забудьте подтвердить почту", + ) + if "reset_token" in auth_params: + old_user[Email.get_name()]["reset_token"] = auth_params["reset_token"].value + auth_params["reset_token"].is_deleted = True + txn.flush() + reset_token_value = random_string(length=settings.TOKEN_LENGTH) + AuthMethod.create( + user_id=auth_method_email.user.id, + auth_method="email", + param="reset_token", + value=reset_token_value, + session=txn, ) - auth_params = Email.get_auth_method_params(auth_method_email.user.id, session=db.session) - old_user = {"user_id": auth_method_email.user.id, Email.get_name(): {}} - new_user = {"user_id": auth_method_email.user.id, Email.get_name(): {}} - if "email" not in auth_params: - raise HTTPException( - status_code=401, - detail=StatusResponseModel( - status="Error", - message="Auth method restricted for this user", - ru="Метод аутентификации не установлен для пользователя", - ).model_dump(), + new_user[Email.get_name()]["reset_token"] = reset_token_value + auth_params = Email.get_auth_method_params(auth_method_email.user.id, session=txn) + SendEmailMessage.send( + to_email=auth_params["email"].value, + ip=request.client.host, + message_file_name="password_change_confirmation.html", + subject="Смена пароля Твой ФФ!", + dbsession=txn, + background_tasks=background_tasks, + url=f"{settings.APPLICATION_HOST}/auth/reset/password?token={auth_params['reset_token'].value}", ) - if auth_params["confirmed"].value.lower() == "false": - raise AuthFailed( - "Registration wasn't completed. Try to registrate again and do not forget to approve your email", - "Регистрация не была завершена. Паоробуйте зарегистрироваться снова и не забудьте подтвердить почту", + await AuthPluginMeta.user_updated(new_user, old_user) + return StatusResponseModel( + status="Success", message="Reset link has been successfully mailed", ru="Ссылка отправлена на почту" ) - if "reset_token" in auth_params: - old_user[Email.get_name()]["reset_token"] = auth_params["reset_token"].value - auth_params["reset_token"].is_deleted = True - db.session.flush() - reset_token_value = random_string(length=settings.TOKEN_LENGTH) - AuthMethod.create( - user_id=auth_method_email.user.id, - auth_method="email", - param="reset_token", - value=reset_token_value, - session=db.session, - ) - new_user[Email.get_name()]["reset_token"] = reset_token_value - auth_params = Email.get_auth_method_params(auth_method_email.user.id, session=db.session) - SendEmailMessage.send( - to_email=auth_params["email"].value, - ip=request.client.host, - message_file_name="password_change_confirmation.html", - subject="Смена пароля Твой ФФ!", - dbsession=db.session, - background_tasks=background_tasks, - url=f"{settings.APPLICATION_HOST}/auth/reset/password?token={auth_params['reset_token'].value}", - ) - await AuthPluginMeta.user_updated(new_user, old_user) - db.session.commit() - return StatusResponseModel( - status="Success", message="Reset link has been successfully mailed", ru="Ссылка отправлена на почту" - ) @staticmethod async def _reset_forgotten_password( diff --git a/auth_backend/auth_plugins/github.py b/auth_backend/auth_plugins/github.py index a255afdb..db10b6bf 100644 --- a/auth_backend/auth_plugins/github.py +++ b/auth_backend/auth_plugins/github.py @@ -106,11 +106,12 @@ async def _register( gh_id = cls.create_auth_method_param('user_id', github_user_id, user.id, db_session=db.session) new_user[cls.get_name()] = {"user_id": gh_id.value} userdata = await GithubAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - GithubAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + GithubAuth.generate_kafka_key(user.id), + userdata, + ) ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -162,11 +163,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun 'No users found for github account', 'Не найдено пользователей для аккаунта GitHub', id_token ) userdata = await GithubAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - GithubAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, GithubAuth.generate_kafka_key(user.id), userdata + ) ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/google.py b/auth_backend/auth_plugins/google.py index a8a9fd25..a56b21e1 100644 --- a/auth_backend/auth_plugins/google.py +++ b/auth_backend/auth_plugins/google.py @@ -114,11 +114,10 @@ async def _register( google_id = cls.create_auth_method_param('unique_google_id', userinfo['sub'], user.id, db_session=db.session) new_user = {cls.get_name(): {"unique_google_id": google_id.value}} userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - GoogleAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, GoogleAuth.generate_kafka_key(user.id), userdata + ) ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -154,11 +153,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun id_token=credentials.get("id_token"), ) userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - GoogleAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, GoogleAuth.generate_kafka_key(user.id), userdata + ) ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/keycloak.py b/auth_backend/auth_plugins/keycloak.py index 359cfded..7d93cc5c 100644 --- a/auth_backend/auth_plugins/keycloak.py +++ b/auth_backend/auth_plugins/keycloak.py @@ -102,11 +102,12 @@ async def _register( keycloak_id = cls.create_auth_method_param('user_id', keycloak_user_id, user.id, db_session=db.session) new_user = {cls.get_name(): {"user_id": keycloak_id.value}} userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - KeycloakAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + KeycloakAuth.generate_kafka_key(user.id), + userdata, + ) ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -153,11 +154,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun id_token = jwt.encode(userinfo, cls.settings.ENCRYPTION_KEY, algorithm="HS256") raise OauthAuthFailed('No users found for keycloak account', id_token) userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - KeycloakAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, KeycloakAuth.generate_kafka_key(user.id), userdata + ) ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/lkmsu.py b/auth_backend/auth_plugins/lkmsu.py index 0793e6e5..5f996d88 100644 --- a/auth_backend/auth_plugins/lkmsu.py +++ b/auth_backend/auth_plugins/lkmsu.py @@ -101,11 +101,10 @@ async def _register( lk_id = cls.create_auth_method_param('user_id', lk_user_id, user.id, db_session=db.session) new_user = {cls.get_name(): {"user_id": lk_id.value}} userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - LkmsuAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, LkmsuAuth.generate_kafka_key(user.id), userdata + ) ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -154,11 +153,10 @@ async def _login( 'No users found for lk msu account', 'Не найдено пользователей с таким аккаунтом LK MSU', id_token ) userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - LkmsuAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, LkmsuAuth.generate_kafka_key(user.id), userdata + ) ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/telegram.py b/auth_backend/auth_plugins/telegram.py index 070f334e..f2b967d6 100644 --- a/auth_backend/auth_plugins/telegram.py +++ b/auth_backend/auth_plugins/telegram.py @@ -80,11 +80,10 @@ async def _register( tg_id = cls.create_auth_method_param('user_id', telegram_user_id, user.id, db_session=db.session) new_user[cls.get_name()]["user_id"] = tg_id.value userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - TelegramAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, TelegramAuth.generate_kafka_key(user.id), userdata + ) ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -111,11 +110,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun 'No users found for Telegram account', 'Не найдено пользователей с таким ТГ аккаунтом', id_token ) userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - TelegramAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, TelegramAuth.generate_kafka_key(user.id), userdata + ) ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/vk.py b/auth_backend/auth_plugins/vk.py index f118b99e..522024bc 100644 --- a/auth_backend/auth_plugins/vk.py +++ b/auth_backend/auth_plugins/vk.py @@ -113,11 +113,10 @@ async def _register( vk_id = cls.create_auth_method_param('user_id', vk_user_id, user.id, db_session=db.session) new_user[cls.get_name()]["user_id"] = vk_id.value userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0]) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - VkAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, VkAuth.generate_kafka_key(user.id), userdata + ) ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -163,11 +162,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun 'No users found for VK account', 'Не найдено пользователей с таким аккаунтом ВК', id_token ) userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0]) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - VkAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, VkAuth.generate_kafka_key(user.id), userdata + ) ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/yandex.py b/auth_backend/auth_plugins/yandex.py index 46820f13..43f20745 100644 --- a/auth_backend/auth_plugins/yandex.py +++ b/auth_backend/auth_plugins/yandex.py @@ -118,11 +118,10 @@ async def _register( ya_id = cls.create_auth_method_param('user_id', yandex_user_id, user.id, db_session=db.session) new_user[cls.get_name()]["user_id"] = ya_id.value userdata = await YandexAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - YandexAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, YandexAuth.generate_kafka_key(user.id), userdata + ) ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -167,11 +166,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun 'No users found for Yandex account', 'Не найдено пользователей для аккаунт Яндекс', id_token ) userdata = await YandexAuth._convert_data_to_userdata_format(userinfo) - await get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - YandexAuth.generate_kafka_key(user.id), - userdata, - bg_tasks=background_tasks, + background_tasks.add_task( + get_kafka_producer().produce( + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, YandexAuth.generate_kafka_key(user.id), userdata + ) ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/exceptions.py b/auth_backend/exceptions.py index f86fa4dd..4fa01572 100644 --- a/auth_backend/exceptions.py +++ b/auth_backend/exceptions.py @@ -61,7 +61,7 @@ def __init__(self, dtime: datetime.timedelta): self.delay_time = dtime super().__init__( f'Too many email requests. Delay: {dtime}', - f'Слишком много запрос к email. Задержка: {dtime}', + f'Слишком много запросов к email. Задержка: {dtime}', ) diff --git a/auth_backend/kafka/kafka.py b/auth_backend/kafka/kafka.py index 092bc0ef..7437e9e3 100644 --- a/auth_backend/kafka/kafka.py +++ b/auth_backend/kafka/kafka.py @@ -4,7 +4,6 @@ from confluent_kafka import KafkaError, KafkaException, Message, Producer from event_schema.auth import UserLogin, UserLoginKey -from fastapi import BackgroundTasks from auth_backend import __version__ from auth_backend.kafka.kafkameta import KafkaMeta @@ -14,7 +13,7 @@ log = logging.getLogger(__name__) -class AIOKafka(KafkaMeta): +class Kafka(KafkaMeta): """ Класс для работы с Kafka """ @@ -59,7 +58,7 @@ def delivery_callback(self, err: KafkaError, msg: Message) -> None: else: log.info('%% Message delivered to %s [%d] @ %d\n' % (msg.topic(), msg.partition(), msg.offset())) - def _produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None: + def produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None: """ Отправляет сообщение в Kafka Args: @@ -82,29 +81,15 @@ def _produce(self, topic: str, key: UserLoginKey, value: UserLogin) -> None: self._producer.poll(0) - async def produce(self, topic: str, key: UserLoginKey, value: UserLogin, *, bg_tasks: BackgroundTasks) -> None: - """ - Добавляет отправку сообщения в фоновые задачи - Args: - topic: топик в который будет написано сообщение - key: ключ сообщения - value: значение сообщение - bg_tasks: fastapi background_tasks - - Returns: - Ничего - """ - bg_tasks.add_task(self._produce, topic, key, value) - - async def close(self) -> None: + def close(self) -> None: self._producer.flush() -class AIOKafkaMock(KafkaMeta): - async def produce(self, topic: str, key: Any, value: Any, *, bg_tasks: BackgroundTasks) -> Any: +class KafkaMock(KafkaMeta): + def produce(self, topic: str, key: Any, value: Any) -> Any: log.debug(f"Kafka cluster disabled, debug msg: {topic=}, {key=}, {value=}") - async def close(self) -> None: + def close(self) -> None: return @@ -115,5 +100,5 @@ def get_kafka_producer() -> KafkaMeta: иначе Mock кафки """ if get_settings().KAFKA_DSN: - return AIOKafka() - return AIOKafkaMock() + return Kafka() + return KafkaMock() diff --git a/auth_backend/kafka/kafkameta.py b/auth_backend/kafka/kafkameta.py index e49c5ed4..c7966604 100644 --- a/auth_backend/kafka/kafkameta.py +++ b/auth_backend/kafka/kafkameta.py @@ -1,14 +1,12 @@ from abc import ABC, abstractmethod from typing import Any -from fastapi import BackgroundTasks - class KafkaMeta(ABC): @abstractmethod - async def produce(self, topic: str, key: Any, value: Any, *, bg_tasks: BackgroundTasks) -> Any: + def produce(self, topic: str, key: Any, value: Any) -> Any: raise NotImplementedError() @abstractmethod - async def close(self) -> None: + def close(self) -> None: raise NotImplementedError() diff --git a/auth_backend/models/base.py b/auth_backend/models/base.py index cbde1998..f59dd9e9 100644 --- a/auth_backend/models/base.py +++ b/auth_backend/models/base.py @@ -1,16 +1,16 @@ from __future__ import annotations -from contextlib import contextmanager +import asyncio import re -import sys -from typing import Iterator +from contextlib import asynccontextmanager +from typing import AsyncIterator -from sqlalchemy import Integer, not_ import sqlalchemy +from sqlalchemy import Integer, not_ from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import Mapped, Query, Session, as_declarative, declared_attr, mapped_column -from auth_backend.exceptions import ObjectNotFound +from auth_backend.exceptions import AuthAPIError, ObjectNotFound @as_declarative() @@ -78,29 +78,42 @@ def delete(cls, id: int, *, session: Session) -> None: else: session.delete(obj) session.flush() - + @classmethod - @contextmanager - def txn(cls, session: Session) -> Iterator[Session]: - try: + @asynccontextmanager + async def lock(cls, session: Session) -> AsyncIterator[Session]: + """ + Сначала пытаемся захватить блокировку таблицы. + + Так как используем синхронную алхимимю, сставим таймаут и не будем ждать больше него + + Если удерживается блокировка другой корутиной, то в конце концов выйдем из ожидания по таймауту + и заблочим корутину асинхронным сном + + Таким образом дадим корутине, удерживающей блокировку, доделать свою работу + """ + for _ in range(3): nested = session.begin_nested() - session.execute(sqlalchemy.text(f'LOCK TABLE {cls.__tablename__} IN ACCESS EXCLUSIVE MODE;')) + session.execute(sqlalchemy.text("SET LOCAL lock_timeout = '0.2s';")) try: - yield session - except Exception: - exception_name, _, __ = sys.exc_info() + session.execute(sqlalchemy.text(f'LOCK TABLE {cls.__tablename__} IN ACCESS EXCLUSIVE MODE;')) + except sqlalchemy.exc.OperationalError: nested.rollback() - session.rollback() - if session and session.is_active: - session.close() - raise - finally: - if locals().get("exception_name") is not None: - return - nested.commit() - session.commit() - if session and session.is_active: - session.close() + await asyncio.sleep(1.5) + else: + break + else: + raise AuthAPIError("Internal Server Error", "Произошла ошибка, попробуйте позже") + try: + yield session except Exception: + nested.rollback() + session.rollback() + if session and session.is_active: + session.close() raise - + else: + nested.commit() + session.commit() + if session and session.is_active: + session.close() diff --git a/auth_backend/routes/base.py b/auth_backend/routes/base.py index 71d89624..65eb6c2d 100644 --- a/auth_backend/routes/base.py +++ b/auth_backend/routes/base.py @@ -18,7 +18,7 @@ @asynccontextmanager async def lifespan(app: FastAPI): yield - await get_kafka_producer().close() + get_kafka_producer().close() settings = get_settings() diff --git a/auth_backend/routes/exc_handlers.py b/auth_backend/routes/exc_handlers.py index d212b01d..e58f21b7 100644 --- a/auth_backend/routes/exc_handlers.py +++ b/auth_backend/routes/exc_handlers.py @@ -76,14 +76,6 @@ async def session_expired_handler(req: starlette.requests.Request, exc: SessionE ) -@app.exception_handler(Exception) -async def http_error_handler(req: starlette.requests.Request, exc: Exception): - return JSONResponse( - content=StatusResponseModel(status="Error", message="Internal server error", ru="Ошибка").model_dump(), - status_code=500, - ) - - @app.exception_handler(TooManyEmailRequests) async def too_many_requests_handler(req: starlette.requests.Request, exc: TooManyEmailRequests): return JSONResponse( @@ -106,3 +98,11 @@ async def last_auth_method_delete_handler(req: starlette.requests.Request, exc: ).model_dump(), status_code=403, ) + + +@app.exception_handler(Exception) +async def http_error_handler(req: starlette.requests.Request, exc: Exception): + return JSONResponse( + content=StatusResponseModel(status="Error", message="Internal server error", ru="Ошибка").model_dump(), + status_code=500, + ) From b3065766fcd47f1384910c88b728feafe514d7d2 Mon Sep 17 00:00:00 2001 From: semen603089 Date: Tue, 6 Aug 2024 18:16:47 +0300 Subject: [PATCH 3/3] bg tasks fix --- auth_backend/auth_plugins/email.py | 23 ++++++++++++----------- auth_backend/auth_plugins/github.py | 16 ++++++++-------- auth_backend/auth_plugins/google.py | 14 ++++++++------ auth_backend/auth_plugins/keycloak.py | 16 ++++++++-------- auth_backend/auth_plugins/lkmsu.py | 14 ++++++++------ auth_backend/auth_plugins/telegram.py | 14 ++++++++------ auth_backend/auth_plugins/vk.py | 14 ++++++++------ auth_backend/auth_plugins/yandex.py | 14 ++++++++------ 8 files changed, 68 insertions(+), 57 deletions(-) diff --git a/auth_backend/auth_plugins/email.py b/auth_backend/auth_plugins/email.py index 7cbee521..330867dd 100644 --- a/auth_backend/auth_plugins/email.py +++ b/auth_backend/auth_plugins/email.py @@ -160,11 +160,10 @@ async def _login(cls, user_inp: EmailLogin, background_tasks: BackgroundTasks) - raise AuthFailed("Incorrect login or password", "Некорректный логин или пароль") userdata = await Email._convert_data_to_userdata_format({"email": auth_params["email"].value}) background_tasks.add_task( - get_kafka_producer().produce( - settings.KAFKA_USER_LOGIN_TOPIC_NAME, - Email.generate_kafka_key(query.user.id), - userdata, - ) + get_kafka_producer().produce, + settings.KAFKA_USER_LOGIN_TOPIC_NAME, + Email.generate_kafka_key(query.user.id), + userdata, ) return await cls._create_session( query.user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name @@ -284,9 +283,10 @@ async def _approve_email(token: str, background_tasks: BackgroundTasks) -> Statu auth_params["confirmed"].value = "true" userdata = await Email._convert_data_to_userdata_format({"email": auth_params["email"].value}) background_tasks.add_task( - get_kafka_producer().produce( - settings.KAFKA_USER_LOGIN_TOPIC_NAME, Email.generate_kafka_key(auth_method.user.id), userdata - ) + get_kafka_producer().produce, + settings.KAFKA_USER_LOGIN_TOPIC_NAME, + Email.generate_kafka_key(auth_method.user.id), + userdata, ) await AuthPluginMeta.user_updated( {"user_id": auth_method.user.id, Email.get_name(): {"confirmed": True}}, @@ -398,9 +398,10 @@ async def _reset_email(token: str, background_tasks: BackgroundTasks) -> StatusR } userdata = await Email._convert_data_to_userdata_format({"email": auth_params["email"].value}) background_tasks.add_task( - get_kafka_producer().produce( - settings.KAFKA_USER_LOGIN_TOPIC_NAME, Email.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + settings.KAFKA_USER_LOGIN_TOPIC_NAME, + Email.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) db.session.commit() diff --git a/auth_backend/auth_plugins/github.py b/auth_backend/auth_plugins/github.py index db10b6bf..1ef892a0 100644 --- a/auth_backend/auth_plugins/github.py +++ b/auth_backend/auth_plugins/github.py @@ -107,11 +107,10 @@ async def _register( new_user[cls.get_name()] = {"user_id": gh_id.value} userdata = await GithubAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - GithubAuth.generate_kafka_key(user.id), - userdata, - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + GithubAuth.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -164,9 +163,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun ) userdata = await GithubAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, GithubAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + GithubAuth.generate_kafka_key(user.id), + userdata, ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/google.py b/auth_backend/auth_plugins/google.py index a56b21e1..90afb810 100644 --- a/auth_backend/auth_plugins/google.py +++ b/auth_backend/auth_plugins/google.py @@ -115,9 +115,10 @@ async def _register( new_user = {cls.get_name(): {"unique_google_id": google_id.value}} userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, GoogleAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + GoogleAuth.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -154,9 +155,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun ) userdata = await GoogleAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, GoogleAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + GoogleAuth.generate_kafka_key(user.id), + userdata, ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/keycloak.py b/auth_backend/auth_plugins/keycloak.py index 7d93cc5c..1520f188 100644 --- a/auth_backend/auth_plugins/keycloak.py +++ b/auth_backend/auth_plugins/keycloak.py @@ -103,11 +103,10 @@ async def _register( new_user = {cls.get_name(): {"user_id": keycloak_id.value}} userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, - KeycloakAuth.generate_kafka_key(user.id), - userdata, - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + KeycloakAuth.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -155,9 +154,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun raise OauthAuthFailed('No users found for keycloak account', id_token) userdata = await KeycloakAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, KeycloakAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + KeycloakAuth.generate_kafka_key(user.id), + userdata, ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/lkmsu.py b/auth_backend/auth_plugins/lkmsu.py index 5f996d88..b9cac630 100644 --- a/auth_backend/auth_plugins/lkmsu.py +++ b/auth_backend/auth_plugins/lkmsu.py @@ -102,9 +102,10 @@ async def _register( new_user = {cls.get_name(): {"user_id": lk_id.value}} userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, LkmsuAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + LkmsuAuth.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -154,9 +155,10 @@ async def _login( ) userdata = await LkmsuAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, LkmsuAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + LkmsuAuth.generate_kafka_key(user.id), + userdata, ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/telegram.py b/auth_backend/auth_plugins/telegram.py index f2b967d6..11484f5b 100644 --- a/auth_backend/auth_plugins/telegram.py +++ b/auth_backend/auth_plugins/telegram.py @@ -81,9 +81,10 @@ async def _register( new_user[cls.get_name()]["user_id"] = tg_id.value userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, TelegramAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + TelegramAuth.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -111,9 +112,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun ) userdata = await TelegramAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, TelegramAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + TelegramAuth.generate_kafka_key(user.id), + userdata, ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/vk.py b/auth_backend/auth_plugins/vk.py index 522024bc..ff771dad 100644 --- a/auth_backend/auth_plugins/vk.py +++ b/auth_backend/auth_plugins/vk.py @@ -114,9 +114,10 @@ async def _register( new_user[cls.get_name()]["user_id"] = vk_id.value userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0]) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, VkAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + VkAuth.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -163,9 +164,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun ) userdata = await VkAuth._convert_data_to_userdata_format(userinfo['response'][0]) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, VkAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + VkAuth.generate_kafka_key(user.id), + userdata, ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name diff --git a/auth_backend/auth_plugins/yandex.py b/auth_backend/auth_plugins/yandex.py index 43f20745..c1f8dbbe 100644 --- a/auth_backend/auth_plugins/yandex.py +++ b/auth_backend/auth_plugins/yandex.py @@ -119,9 +119,10 @@ async def _register( new_user[cls.get_name()]["user_id"] = ya_id.value userdata = await YandexAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, YandexAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + YandexAuth.generate_kafka_key(user.id), + userdata, ) await AuthPluginMeta.user_updated(new_user, old_user) return await cls._create_session( @@ -167,9 +168,10 @@ async def _login(cls, user_inp: OauthResponseSchema, background_tasks: Backgroun ) userdata = await YandexAuth._convert_data_to_userdata_format(userinfo) background_tasks.add_task( - get_kafka_producer().produce( - cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, YandexAuth.generate_kafka_key(user.id), userdata - ) + get_kafka_producer().produce, + cls.settings.KAFKA_USER_LOGIN_TOPIC_NAME, + YandexAuth.generate_kafka_key(user.id), + userdata, ) return await cls._create_session( user, user_inp.scopes, db_session=db.session, session_name=user_inp.session_name