Skip to content

Шаблон для приложения FastAPI RESTful, построенного на принципах модульного монолита и многоуровневой архитектуры

Notifications You must be signed in to change notification settings

Forgot-0/fastapi_template

Repository files navigation

FastAPI Template

Содержание

  1. Введение
  2. Архитектура
  3. Безопасность
  4. Core Services

Введение

Вдохновлен Starter Kit

Project Structure

fastapi_template/
├── migrations/          # Alembic migrations
├── app/                 # Application package
│   ├── main.py         # Application entry point
│   ├── pre_start.py    # Pre-start checks and initialization
│   ├── init_data.py    # Initial data setup
│   ├── tasks.py        # Background tasks
│   ├── core/           # Core functionality
│   │   ├── api/        # API related tools
│   │   ├── configs/    # Configuration management
│   │   ├── db/         # Database utilities
│   │   ├── di/         # Dependency injection setup
│   │   ├── events/     # Event system
│   │   ├── log/        # Logging configuration
│   │   ├── mediators/  # Mediator pattern implementation
│   │   ├── message_brokers/  # Message broker implementations
│   │   ├── middlewares/      # Custom middlewares
│   │   ├── services/   # Core services
│   │   └── websockets/ # WebSocket support
│   └── auth/           # Auth module (example module structure)
│       ├── commands/   # Command handlers
│       ├── events/     # Module specific events
│       ├── models/     # Database models
│       ├── queries/    # Query handlers
│       ├── repositories/  # Data access layer
│       ├── routes/     # API routes
│       ├── schemas/    # Pydantic schemas
│       └── services/   # Business logic services
├── monitoring/         # Monitoring configuration (Grafana, Loki, Vector)
└── pyproject.toml     # Project dependencies

Database Layer

Used:

Key Notes:

  • app.core.db.BaseModel — реализует общую логику модели. Все пользовательские модели должны её наследовать. Сам BaseModel наследует от sqlalchemy.orm.DeclarativeBase.
  • app.core.db.DateMixin - Добавляет поля created_at и updated_at для автоматического отслеживания времени создания и обновления.
  • app.core.db.SoftDeleteMixin - Реализует функцию «мягкого» удаления. Чтобы добавить логику «мягкого» удаления для вашей конкретной модели, вам просто нужно унаследовать SoftDeleteMixin.
  • BaseModel поддерживает систему событий через методы register_event() и pull_events() для интеграции с EventBus.
  • Все модели должны быть импортированы в app/core/models.py, чтобы Alembic мог их видеть и работать с ними.

API Layer

Used:

Key Notes:

  • Для группировки связанных маршрутов следует использовать APIRouter из fastapi. Их следует разместить в отдельных файлах, расположенных в routes.v<api-version> ваших модулей. Например: app.auth.routes.v1.users.py.

  • Каждый модуль имеет маршрутизатор верхнего уровня, который объединяет все групповые маршрутизаторы в один главный. Например: app.auth.routers.py.

  • Маршрутизатор верхнего уровня из каждого модуля должен быть зарегистрирован в маршрутизаторе приложения в app.main.py

  • app.core.api.builder.ListParamsBuilder — зависимость, которая анализирует и формирует список параметров запроса. Она использует модели Pydantic app.core.api.schemas.ListParams, app.core.api.schemas.SortParam, app.core.api.schemas.FilterParam. Поэтому мы можем расширить их и настроить правила валидации. Например, в app.auth.schemas.user.py:

    from app.core.api.schemas import FilterParam, ListParams, SortParam
    
    class UserSortParam(SortParam):
        field: Literal['id', 'username', 'status_id', 'created_at']
    
    class UserFilterParam(FilterParam):
        field: Literal['id', 'username', 'status_id']
    
    class UserListParams(ListParams):
        sort: list[UserSortParam] | None = Field(None, description='Sorting parameters')
        filters: list[UserFilterParam] | None = Field(None, description='Filtering parameters')

    Затем мы создаем экземпляр app.core.api.builder.ListParamsBuilder и используем его в функции операции пути:

    from app.auth.schemas.user import UserFilterParam, UserListParams, UserResponse, UserSortParam
    from app.core.api.builder import ListParamsBuilder, PaginatedResponse
    
    list_params_builder = ListParamsBuilder(UserListParams, UserSortParam, UserFilterParam)
    
    @router.get('')
    async def get_list(request: UserListParams = Depends(list_params_builder)
    ) -> PaginatedResponse[list[UserResponse]]:
        ...
  • app.core.api.rate_limiter.ConfigurableRateLimiter - это просто простая оболочка для зависимости RateLimiter из пакета fastapi_limiter, которая добавляет возможность включать/отключать ограничение из конфигурации.

    Вот как можно использовать ограничитель APIRouter:

    from app.core.api.rate_limiter import ConfigurableRateLimiter
    
    router = APIRouter(dependencies=[Depends(ConfigurableRateLimiter(times=3, seconds=60))])

    Практически то же самое для функции операции пропуска:

    from app.core.api.rate_limiter import ConfigurableRateLimiter
    
    @app.get("/", dependencies=[Depends(ConfigurableRateLimiter(times=3, seconds=60))])
    async def index():
        ...

Config

Key Notes:

  • Конфигурации приложения можно получить через app.core.configs.app_config.
  • Каждый модуль должен иметь свой собственный config.py (при необходимости), который должен быть унаследован от app.core.configs.BaseConfig.
  • Все конфигурации извлекают параметры из файла .env.
  • .env.example — это всего лишь пример, описывающий все используемые параметры. Его следует скопировать в .env при первом развёртывании приложения.

Dependency Injection.

В проекте используется сложная система DI, основанная на фреймворке Diska:

Container Setup

  • Модульная регистрация зависимостей в папках di/
  • Поддержка как синхронных, так и асинхронных зависимостей

Key Features

  • Scoped Lifetime Management: Поддержка времен жизни APP, REQUEST и SESSION
  • Integration with FastAPI:
from dishka.integrations.fastapi import setup_dishka
from app.core.di.container import create_container

def init_app() -> FastAPI:
    app = FastAPI()
    container = create_container()
    setup_dishka(container=container, app=app)
    return app

Использование в эндпоинтах:

from dishka import FromDishka
from app.core.mediators.base import BaseMediator

@router.get("/")
async def example(mediator: FromDishka[BaseMediator]):
    # Использование медиатора для команд и запросов
    result = await mediator.handle_command(SomeCommand())
    return result

Policies

Key Notes:

  • Все файлы политик должны быть размещены в каталоге policies в нашем модуле.

Очень полезно разделить логику доступа к действиям и саму логику действий. Для реализации этого не требуется устанавливать никакие сторонние библиотеки. В шаблоне сейчас нет места для этой логики, поэтому я просто покажу вам пример.

В вашем app.our_module.policies.users.py:

from app.auth.deps import ActiveUser
from app.auth.exceptions import ActionNotAllowed

async def can_update(user: ActiveUser) bool:
    # Any logic we need to restrict access to this action.
    if not user.is_admin:
        raise ActionNotAllowed("You don't have permission to update the user")
    
    return True

Затем мы можем использовать его в нашей функции операции пути:

@router.patch('/{user_id}', dependencies=[Depends(can_update)])
async def update(user_id: int) -> None:
    ...

Как видите, система DI позволяет нам легко и довольно элегантно добавлять эти проверки в наши маршруты. Мы также можем использовать её где угодно в нашем коде, например, в ваших сервисах. Достаточно просто передать необходимые параметры в нашу функцию:

from app.auth.exceptions import ActionNotAllowed

async def update_status(user_id: int, status_id: UserStatus) -> User:
    user = await self.get(user_id)

    if not await can_update(user):
        raise ActionNotAllowed("You don't have permission to update the user.")

Такой подход позволяет нам хранить логику доступа к действиям в одном месте и использовать её повторно при необходимости.

OAuth Authentication

Used:

  • OAuth 2.0 providers: Google, Yandex, GitHub
  • HTTP client: httpx

Key Notes:

Проект поддерживает OAuth аутентификацию через несколько провайдеров. Система построена на паттерне Factory для легкого добавления новых провайдеров.

  1. Поддерживаемые провайдеры:

    • Google OAuth 2.0
    • Yandex OAuth 2.0
    • GitHub OAuth 2.0
  2. Структура OAuth системы:

    • OAuthProvider - Базовый класс для всех OAuth провайдеров
    • OAuthProviderFactory - Фабрика для управления провайдерами
    • OAuthManager - Менеджер для работы с OAuth
  3. Использование OAuth:

    Получение URL для авторизации:

    from app.auth.services.oauth_manager import OAuthManager
    
    oauth_manager: OAuthManager
    auth_url = await oauth_manager.get_authorize_url(
        provider_name="google",
        state="unique_state_string"
    )

    Обработка callback:

    oauth_data = await oauth_manager.process_callback(
        provider_name="google",
        code="authorization_code"
    )
  4. API Endpoints:

    • GET /api/v1/auth/oauth/{provider}/authorize - Получить URL для авторизации
    • GET /api/v1/auth/oauth/{provider}/authorize/connect - Подключить OAuth к существующему аккаунту
    • GET /api/v1/auth/oauth/{provider}/callback - Callback endpoint для обработки OAuth ответа
  5. Конфигурация: OAuth настраивается через переменные окружения в app.auth.config.AuthConfig:

    OAUTH_GOOGLE_CLIENT_ID: str
    OAUTH_GOOGLE_CLIENT_SECRET: str
    OAUTH_GOOGLE_REDIRECT_URI: str
    # Аналогично для Yandex и GitHub

Services

Events

Used:

  • EventBus: Индивидуальная реализация на основе шаблона Mediator
  • DI Container: Dishka

Key Notes

  1. Event System Components:

    • BaseEvent - Базовый класс для всех событий (из app.core.events.event)
    • EventRegisty - Реестр обработчиков событий
    • BaseEventBus - Интерфейс для обработки событий
    • MediatorEventBus - Основная реализация EventBus
  2. Creating New Events:

     from dataclasses import dataclass
     from app.core.events.event import BaseEvent
    
     @dataclass(frozen=True)
     class UserCreatedEvent(BaseEvent):
         email: str
         username: str
         __event_name__: str = "user_created"  # Unique event identifier
  3. Creating Event Handlers:

    @dataclass(frozen=True)
    class SendVerifyEventHandler(BaseEventHandler[CreatedUserEvent, None]):
        user_service: UserService
    
        async def __call__(self, event: CreatedUserEvent) -> None:
            await self.user_service.send_verify(email=event.email)
  4. Dispatching Events:

    # In your models
    class User(BaseModel):
        @classmethod
        def create(cls, email: str, username: str, password_hash: str) -> "User":
            user = User(
                email=email,
                username=username,
                password_hash=password_hash
            )
    
            user.register_event(
                CreatedUserEvent(
                    email=email,
                    username=username
                )
            )
            return user

Best Practices:

  1. Event Naming:

    • Названия событий должны быть описательными и однозначными.
    • Определите __event_name__ для каждого события
  2. Handler Organization:

    • Храните обработчики в отдельном каталоге events внутри вашего модуля.
    • Один обработчик на файл для лучшей организации.
    • Следуйте шаблону: <module>/events/<entity>/<event_name>.py
  3. Event Data:

    • События должны быть неизменяемыми (используйте @dataclass(frozen=True))
    • Включайте в события только необходимые данные
    • По возможности используйте идентификаторы вместо полных объектов

Cache service

Used:

Key Notes:

  • CacheServiceInterface доступен через DI систему Dishka.
  • Сервис предоставляет базовые операции кеширования: get, set, delete.

Использование кеша:

from dishka import FromDishka
from app.core.services.cache.base import CacheServiceInterface

@router.get("/items/{item_id}")
async def get_item(
    item_id: int,
    cache_service: FromDishka[CacheServiceInterface]
):
    # Проверка кеша
    cached_item = await cache_service.get(f"item:{item_id}")
    if cached_item:
        return cached_item
    
    # Получение данных из БД
    item = await get_item_from_db(item_id)
    
    # Сохранение в кеш
    await cache_service.set(f"item:{item_id}", item, ttl=60)
    
    return item

@router.delete("/items/{item_id}")
async def delete_item(
    item_id: int,
    cache_service: FromDishka[CacheServiceInterface]
):
    # Удаление из кеша
    await cache_service.delete(f"item:{item_id}")
    # Удаление из БД
    ...

Queue service

Used:

Key Notes:

  • Каждая задача очереди должна быть унаследована от app.core.services.queue.task.BaseTask, иметь атрибут __task_name__ и реализовывать метод run(...).
  • Все задачи модуля должны быть зарегистрированы в core/di/tasks.py.

Вот как могут выглядеть задачи очереди:

from app.core.services.queue import BaseTask

class SendEmail(BaseTask):
    __task_name__ = 'mail.send'

    @staticmethod
    @inject
    async def run(content: str, email_data: dict, smtp_config: FromDishka[SMTPConfig]) -> None:
        ...
        message = EmailMessage()  
        ...
        await aiosmtplib.send(message, **smtp_config)

Чтобы отправить его в очередь, нам следует использовать QueueServiceInterface:

from dishka import FromDishka
from app.core.services.queues.service import QueueServiceInterface

@router.get('/')
async def index(
    queue_service: FromDishka[QueueServiceInterface]
) -> Response:
    ...
    await queue_service.push(
        task=SendEmail,  
        data={'content': template.render(), 'email_data': email_data},
    )

Mail service

Used:

Key Notes:

  • Каждый шаблон электронной почты должен наследовать app.core.services.mail.template.BaseTemplate и реализовывать методы _get_dir(...) и _get_name(...).
  • Все классы шаблонов электронной почты должны быть помещены в emails.templates.py каждого модуля. Сами HTML-шаблоны должны быть помещены в каталог emails.views.
  • Отправка почты может выполняться в фоновом режиме с помощью QueueService.

Пример класса шаблона электронной почты:

from app.core.services.mail import BaseTemplate

class UserRegistration(BaseTemplate):
    def __init__(self, username: str, project_name: str):
        self.username = username
        self.project_name = project_name
        
    def _get_dir(self) -> Path:
        return Path('app/auth/emails/views')

    def _get_name(self) -> str:
        return 'user_registration.html'

И HTML-шаблон user_registration.html:

<h1> Hello {{ username }}!</h1>

<p>You have successfully registered on <b>{{ project_name }}</b>.</p>
<p>Thank you and welcome to your new account!</p>

Для отправки электронного письма нам следует использовать BaseMailService:

from dishka import FromDishka
from app.core.services.mail.service import BaseMailService, EmailData
from app.core.configs.app import app_config

@router.get('/')
async def index(
    mail_service: FromDishka[BaseMailService]
) -> Response:
    ...
    email_data = EmailData(
        subject='Successful registration',
        recipient=user.email
    )
    template = UserRegistration(
        username=user.username,
        project_name=app_config.PROJECT_NAME
    )
    
    # Синхронная отправка
    await mail_service.send(template=template, email_data=email_data)

    # Или отправка в фоновом режиме через QueueService
    await mail_service.queue(template=template, email_data=email_data)

Storage Service

Used:

  • Object storage: MinIO (S3-compatible)
  • Python client: minio

Key Notes:

Сервис предоставляет абстракцию для работы с объектным хранилищем, поддерживающим S3 API. Реализация использует MinIO, но может быть легко заменена на AWS S3 или другие S3-совместимые хранилища.

  1. Основные возможности:

    • Загрузка файлов
    • Генерация presigned URLs для загрузки/скачивания
    • Удаление файлов
    • Скачивание файлов
    • Управление bucket policies
  2. Использование Storage Service:

    from dishka import FromDishka
    from app.core.services.storage.service import BaseStorageService
    
    @router.post("/upload")
    async def upload_file(
        file: UploadFile,
        storage_service: FromDishka[BaseStorageService]
    ):
        # Загрузка файла
        file_key = await storage_service.upload_file(
            bucket_name="base",
            file_content=file.file,
            file_key=f"uploads/{file.filename}",
            content_type=file.content_type
        )
        return {"file_key": file_key}
    
    @router.get("/download/{file_key}")
    async def download_file(
        file_key: str,
        storage_service: FromDishka[BaseStorageService]
    ):
        # Генерация presigned URL для скачивания
        url = await storage_service.generate_presigned_url(
            bucket_name="base",
            file_key=file_key,
            expires=3600  # 1 час
        )
        return {"download_url": url}
  3. Bucket Policies: Поддерживаются различные политики доступа к bucket'ам:

    • Policy.NONE - Публичный доступ
    • Policy.READ_ONLY - Только чтение
    • Policy.WRITE_ONLY - Только запись
    • Policy.READ_WRITE - Чтение и запись
  4. Конфигурация: Настройки хранилища в app.core.configs.app_config:

    storage_url: str  # URL MinIO сервера
    STORAGE_ACCESS_KEY: str
    STORAGE_SECRET_KEY: str

WebSocket Service

Used:

Key Notes:

Сервис предоставляет управление WebSocket соединениями с поддержкой группировки по ключам и потокобезопасности.

  1. Основные возможности:

    • Управление множественными соединениями
    • Группировка соединений по ключам
    • Отправка сообщений всем соединениям в группе
    • Потокобезопасное управление соединениями
  2. Использование WebSocket Service:

    from dishka import FromDishka
    from app.core.websockets.base import BaseConnectionManager
    from fastapi import WebSocket
    
    @router.websocket("/ws/{room_id}")
    async def websocket_endpoint(
        websocket: WebSocket,
        room_id: str,
        manager: FromDishka[BaseConnectionManager]
    ):
        await manager.accept_connection(websocket, key=room_id)
        try:
            while True:
                data = await websocket.receive_text()
                # Отправка сообщения всем в комнате
                await manager.send_json_all(
                    key=room_id,
                    data={"message": data, "room": room_id}
                )
        except Exception:
            await manager.remove_connection(websocket, key=room_id)
  3. Методы ConnectionManager:

    • accept_connection() - Принять новое соединение
    • remove_connection() - Удалить соединение
    • send_all() - Отправить bytes всем в группе
    • send_json_all() - Отправить JSON всем в группе
    • disconnect_all() - Отключить все соединения в группе

Message Brokers

Used:

Key Notes:

Проект поддерживает два типа message brokers: Kafka и Redis. Оба реализуют общий интерфейс BaseMessageBroker, что позволяет легко переключаться между ними.

  1. Поддерживаемые брокеры:

    • Kafka - Для высоконагруженных распределенных систем
    • Redis Pub/Sub - Для простых сценариев pub/sub
  2. Использование Message Broker:

    from dishka import FromDishka
    from app.core.message_brokers.base import BaseMessageBroker
    from app.core.events.event import BaseEvent
    
    @router.post("/publish")
    async def publish_event(
        broker: FromDishka[BaseMessageBroker]
    ):
        # Отправка события
        await broker.send_event(
            key="user_123",
            event=UserCreatedEvent(email="user@example.com")
        )
    
        # Или отправка произвольных данных
        await broker.send_data(
            key="user_123",
            topic="user_events",
            data={"action": "created", "user_id": 123}
        )
  3. Потребление сообщений:

    async def consume_messages(broker: BaseMessageBroker):
        async for message in broker.start_consuming(["user_events"]):
            # Обработка сообщения
            print(f"Received: {message}")
  4. Конфигурация: Настройки брокера в app.core.configs.app_config:

    BROKER_URL: str  # URL брокера (Kafka или Redis)
    GROUP_ID: str    # ID группы потребителей (для Kafka)
  5. Lifecycle: Message broker автоматически инициализируется и закрывается в lifespan функции приложения.

Logging Service

Used:

Key Notes:

  • Мы можем настроить конфигурацию structlog в app/core/log/init.py.
  • Так же добавлен в app/core/log/processors.py обработка логов.
  • Логирование интегрировано с middleware для автоматического добавления request_id.

logger можно использовать следующим образом:

import logging

logger = logging.getLogger(__name__)

logger.info('Something happened', extra={"key": "value"})
logger.error('Error occurred', exc_info=True)

Middleware

Used:

  • FastAPI middleware system
  • Custom middleware implementations

Key Notes:

  1. Core Middlewares:

    • ContextMiddleware - Добавляет уникальный ID к каждому запросу
    • LoggingMiddleware - Логирует информацию о запросах и ответах
  2. Пример Custom Middleware:

    class ContextMiddleware(BaseHTTPMiddleware):
    def __init__(self, app: ASGIApp):
        super().__init__(app)
    
    async def dispatch(self, request: Request, call_next):
        request.state.request_id = uuid4()
        with structlog.contextvars.bound_contextvars(request_id=str(request.state.request_id)):
            return await call_next(request)
  3. Регистрация Middleware:

     from app.core.middlewares import ContextMiddleware, LoggingMiddleware
    
     app = FastAPI()
     app.add_middleware(LoggingMiddleware)
     app.add_middleware(ContextMiddleware)
  4. Middleware Order (порядок выполнения):

    • ContextMiddleware → CORS (если включен) → LoggingMiddleware → Application
    • Важно: В FastAPI middleware выполняются в обратном порядке добавления (последний добавленный выполняется первым)
    • ContextMiddleware должен быть добавлен последним, чтобы выполниться первым и установить request_id для последующих middleware

Application Lifecycle

Startup Sequence

  1. Pre-start проверки (pre_start.py):

     @retry(
         stop=stop_after_attempt(max_tries),
         wait=wait_fixed(wait_seconds),
         before=before_log(logger, logging.INFO),  # type: ignore
         after=after_log(logger, logging.INFO),  # type: ignore
     )
     async def init(db: AsyncSession) -> None:
         try:
             await db.execute(select(1))
         except Exception as exc:
             logger.exception('database_init_error')
             raise exc
  2. Инициализация приложения (main.py):

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        logger.info("Starting FastAPI")
        await pre_start()
        await init_data()
        redis_client = redis.from_url(app_config.redis_url)
        await FastAPILimiter.init(redis_client)
        message_broker = await app.state.dishka_container.get(BaseMessageBroker)
        await message_broker.start()
        yield
        await redis_client.aclose()
        await message_broker.close()
        await app.state.dishka_container.close()
        logger.info("Shutting down FastAPI")
    
    def init_app() -> FastAPI:
        app = FastAPI(
            openapi_url=f"{app_config.API_V1_STR}/openapi.json" if app_config.ENVIRONMENT in ["local", "staging"] else None,
            lifespan=lifespan,
        )
    
        configure_logging()
        container = create_container()
        setup_dishka(container=container, app=app)
    
        setup_middleware(app)
        setup_router(app)
    
        app.add_exception_handler(Exception, handle_uncown_exception)
        app.add_exception_handler(ApplicationException, handle_application_exeption)
        app.add_exception_handler(RequestValidationError, handle_validation_exeption)
    
        return app
  3. Инициализация данных (init_data.py):

    async def create_first_data(db: AsyncSession) -> None:
        roles = RolesEnum.get_all_roles()
        for base_role in roles:
            role = await db.execute(select(Role).where(Role.name==base_role.name))
            if role.scalar() is None:
                db.add(base_role)
        await db.commit()
    
    async def init_data() -> None:
        """Создание начальных данных при первом запуске."""
        async for db in get_session():
            await create_first_data(db)
            break

Создание нового модуля

1. Структура модуля

new_module/
├── __init__.py
├── models/              # Модели данных
│   ├── __init__.py
│   └── entity.py
├── repositories/        # Репозитории
│   ├── __init__.py
│   └── entity.py
├── routes/                # API endpoints
│   ├── __init__.py
│   └── v1/
│       ├── __init__.py
│       └── entity.py
├── schemas/            # Pydantic models
│   ├── __init__.py
│   └── entity.py
├── commands/           # Command handlers
│   ├── __init__.py
│   └── entity/
│       ├── __init__.py
│       └── create.py
├── queries/           # Query handlers
│   ├── __init__.py
│   └── entity/
│       ├── __init__.py
│       └── get.py
├── events/            # Event handlers
│   ├── __init__.py
│   └── entity/
│       ├── __init__.py
│       └── created.py
├── providers.py    # DI configuration
└── routers.py      # Main router module

2. Шаги создания нового модуля

  1. Создание моделей данных:

    from app.core.db import BaseModel, DateMixin
    
    class Entity(BaseModel, DateMixin):
        __tablename__ = "entities"
    
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str] = mapped_column(String(50))
  2. Создание репозитория:

    class EntityRepository:
        session: AsyncSession
    
        async def create(self, entity: Entity) -> None:
            self.add(entity)
  3. Настройка DI:

    # providers.py
    from dishka import Provider, Scope, provide
    
    class ModuleProvider(Provider):
        scope = Scope.REQUEST
        
        @provide
        def entity_repository(self, session: AsyncSession) -> EntityRepository:
            return EntityRepository(session=session)
  4. Создание API endpoints:

    router = APIRouter(prefix="/api/v1/entities")
    
    @router.post("/")
    async def create_entity(
        data: EntityCreate,
        service: FromDishka[EntityService],
    ) -> EntityResponse:
        entity = await service.create(data)
        return EntityResponse.model_validate(entity)
  5. Регистрация модуля:

    # В app/core/di/container.py
    from app.new_module.providers import NewModuleProvider
    
    def create_container(*app_providers: Provider) -> AsyncContainer:
        providers = [
            *get_core_providers(),
            NewModuleProvider(),  # Добавляем providers нового модуля
        ]
        return make_async_container(*providers, *app_providers)
    
    # В app/main.py
    from app.new_module.routers import router_v1 as new_module_router_v1
    
    def setup_router(app: FastAPI) -> None:
        app.include_router(auth_router_v1, prefix=app_config.API_V1_STR)
        app.include_router(new_module_router_v1, prefix=app_config.API_V1_STR)
    

3. Лучшие практики

  1. Организация кода:

    • Следуйте принципу единой ответственности
    • Разделяйте слои абстракции
    • Используйте типизацию
  2. Именование:

    • Модули: существительные во множественном числе (users, orders)
    • Команды: глаголы (create_user, update_order)
    • События: прошедшее время (user_created, order_updated)
  3. Тестирование:

    • Создавайте тесты одновременно с кодом
    • Следуйте структуре модуля в тестах
    • Используйте фабрики для создания тестовых данных

About

Шаблон для приложения FastAPI RESTful, построенного на принципах модульного монолита и многоуровневой архитектуры

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages