From d8cf08df91c244e17c5c529b2561392fedb24fe9 Mon Sep 17 00:00:00 2001 From: pablohashescobar Date: Mon, 2 Dec 2024 13:44:15 +0530 Subject: [PATCH 1/5] chore: add issue and issue description version --- ...emove_issueversion_description_and_more.py | 59 ++++++++++++++ apiserver/plane/db/models/issue.py | 77 ++++++++++--------- 2 files changed, 98 insertions(+), 38 deletions(-) create mode 100644 apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py diff --git a/apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py b/apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py new file mode 100644 index 00000000000..b88364c5fae --- /dev/null +++ b/apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py @@ -0,0 +1,59 @@ +# Generated by Django 4.2.15 on 2024-12-02 06:34 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion +import django.utils.timezone +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('db', '0086_issueversion_alter_teampage_unique_together_and_more'), + ] + + operations = [ + migrations.RemoveField( + model_name='issueversion', + name='description', + ), + migrations.RemoveField( + model_name='issueversion', + name='description_html', + ), + migrations.RemoveField( + model_name='issueversion', + name='description_stripped', + ), + migrations.AddField( + model_name='issueversion', + name='activity', + field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='versions', to='db.issueactivity'), + ), + migrations.CreateModel( + name='IssueDescriptionVersion', + fields=[ + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='Created At')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='Last Modified At')), + ('deleted_at', models.DateTimeField(blank=True, null=True, verbose_name='Deleted At')), + ('id', 
models.UUIDField(db_index=True, default=uuid.uuid4, editable=False, primary_key=True, serialize=False, unique=True)), + ('description_binary', models.BinaryField(null=True)), + ('description_html', models.TextField(blank=True, default='

')), + ('description_stripped', models.TextField(blank=True, null=True)), + ('description_json', models.JSONField(blank=True, default=dict)), + ('last_saved_at', models.DateTimeField(default=django.utils.timezone.now)), + ('created_by', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_created_by', to=settings.AUTH_USER_MODEL, verbose_name='Created By')), + ('issue', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='description_versions', to='db.issue')), + ('owned_by', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='description_versions', to=settings.AUTH_USER_MODEL)), + ('project', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='project_%(class)s', to='db.project')), + ('updated_by', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_updated_by', to=settings.AUTH_USER_MODEL, verbose_name='Last Modified By')), + ('workspace', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='workspace_%(class)s', to='db.workspace')), + ], + options={ + 'verbose_name': 'Issue Description Version', + 'verbose_name_plural': 'Issue Description Versions', + 'db_table': 'issue_description_versions', + }, + ), + ] diff --git a/apiserver/plane/db/models/issue.py b/apiserver/plane/db/models/issue.py index e50dbe7ce82..60277aa6d18 100644 --- a/apiserver/plane/db/models/issue.py +++ b/apiserver/plane/db/models/issue.py @@ -660,11 +660,15 @@ def __str__(self): class IssueVersion(ProjectBaseModel): - issue = models.ForeignKey( - "db.Issue", - on_delete=models.CASCADE, + activity = models.ForeignKey( + "db.IssueActivity", + on_delete=models.SET_NULL, + null=True, related_name="versions", ) + issue = models.ForeignKey( + "db.Issue", on_delete=models.CASCADE, related_name="versions" + ) PRIORITY_CHOICES = ( ("urgent", "Urgent"), ("high", "High"), @@ -676,9 +680,6 @@ class 
IssueVersion(ProjectBaseModel): state = models.UUIDField(blank=True, null=True) estimate_point = models.UUIDField(blank=True, null=True) name = models.CharField(max_length=255, verbose_name="Issue Name") - description = models.JSONField(blank=True, default=dict) - description_html = models.TextField(blank=True, default="

") - description_stripped = models.TextField(blank=True, null=True) description_binary = models.BinaryField(null=True) priority = models.CharField( max_length=30, @@ -688,9 +689,7 @@ class IssueVersion(ProjectBaseModel): ) start_date = models.DateField(null=True, blank=True) target_date = models.DateField(null=True, blank=True) - sequence_id = models.IntegerField( - default=1, verbose_name="Issue Sequence ID" - ) + sequence_id = models.IntegerField(default=1, verbose_name="Issue Sequence ID") sort_order = models.FloatField(default=65535) completed_at = models.DateTimeField(null=True) archived_at = models.DateField(null=True) @@ -700,25 +699,10 @@ class IssueVersion(ProjectBaseModel): type = models.UUIDField(blank=True, null=True) last_saved_at = models.DateTimeField(default=timezone.now) owned_by = models.UUIDField() - assignees = ArrayField( - models.UUIDField(), - blank=True, - default=list, - ) - labels = ArrayField( - models.UUIDField(), - blank=True, - default=list, - ) - cycle = models.UUIDField( - null=True, - blank=True, - ) - modules = ArrayField( - models.UUIDField(), - blank=True, - default=list, - ) + assignees = ArrayField(models.UUIDField(), blank=True, default=list) + labels = ArrayField(models.UUIDField(), blank=True, default=list) + cycle = models.UUIDField(null=True, blank=True) + modules = ArrayField(models.UUIDField(), blank=True, default=list) properties = models.JSONField(default=dict) meta = models.JSONField(default=dict) @@ -741,16 +725,14 @@ def log_issue_version(cls, issue, user): Module = apps.get_model("db.Module") CycleIssue = apps.get_model("db.CycleIssue") - cycle_issue = CycleIssue.objects.filter( - issue=issue, - ).first() + cycle_issue = CycleIssue.objects.filter(issue=issue).first() cls.objects.create( issue=issue, - parent=issue.parent, - state=issue.state, + parent=issue.parent_id, + state=issue.state_id, point=issue.point, - estimate_point=issue.estimate_point, + estimate_point=issue.estimate_point_id, name=issue.name, 
description=issue.description, description_html=issue.description_html, @@ -766,17 +748,36 @@ def log_issue_version(cls, issue, user): is_draft=issue.is_draft, external_source=issue.external_source, external_id=issue.external_id, - type=issue.type, + type=issue.type_id, last_saved_at=issue.last_saved_at, assignees=issue.assignees, labels=issue.labels, cycle=cycle_issue.cycle if cycle_issue else None, - modules=Module.objects.filter(issue=issue).values_list( - "id", flat=True - ), + modules=Module.objects.filter(issue=issue).values_list("id", flat=True), owned_by=user, ) return True except Exception as e: log_exception(e) return False + + +class IssueDescriptionVersion(ProjectBaseModel): + issue = models.ForeignKey( + "db.Issue", on_delete=models.CASCADE, related_name="description_versions" + ) + description_binary = models.BinaryField(null=True) + description_html = models.TextField(blank=True, default="

") + description_stripped = models.TextField(blank=True, null=True) + description_json = models.JSONField(default=dict, blank=True) + last_saved_at = models.DateTimeField(default=timezone.now) + owned_by = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.CASCADE, + related_name="description_versions", + ) + + class Meta: + verbose_name = "Issue Description Version" + verbose_name_plural = "Issue Description Versions" + db_table = "issue_description_versions" From 7cfb1f8ace04a0b413d2bb0dfd36f6e5313a2d2b Mon Sep 17 00:00:00 2001 From: gurusainath Date: Tue, 10 Dec 2024 16:10:13 +0530 Subject: [PATCH 2/5] chore: issue and issue description versioning commands --- apiserver/plane/bgtasks/issue_version_task.py | 419 ++++++++++++++++++ .../create_issue_description_version.py | 23 + .../commands/create_issue_version.py | 19 + ...emove_issueversion_description_and_more.py | 35 +- apiserver/plane/db/models/__init__.py | 2 + apiserver/plane/db/models/issue.py | 74 +++- apiserver/plane/db/models/user.py | 12 + 7 files changed, 558 insertions(+), 26 deletions(-) create mode 100644 apiserver/plane/bgtasks/issue_version_task.py create mode 100644 apiserver/plane/db/management/commands/create_issue_description_version.py create mode 100644 apiserver/plane/db/management/commands/create_issue_version.py diff --git a/apiserver/plane/bgtasks/issue_version_task.py b/apiserver/plane/bgtasks/issue_version_task.py new file mode 100644 index 00000000000..c910473d7ab --- /dev/null +++ b/apiserver/plane/bgtasks/issue_version_task.py @@ -0,0 +1,419 @@ +# Python imports +import json +from typing import Optional, List, Dict + +# Django imports +from django.utils import timezone +from django.db import transaction + +# Third party imports +from celery import shared_task + +# Module imports +from plane.db.models import ( + Issue, + IssueVersion, + IssueDescriptionVersion, + ProjectMember, + CycleIssue, + ModuleIssue, + IssueActivity, + IssueAssignee, + IssueLabel, +) +from 
plane.utils.exception_logger import log_exception + + +@shared_task +def issue_task(updated_issue, issue_id, user_id): + try: + print("====== Issue detail update ======") + current_issue = json.loads(updated_issue) if updated_issue else {} + issue = Issue.objects.get(id=issue_id) + + updated_current_issue = {} + for key, value in current_issue.items(): + if getattr(issue, key) != value: + updated_current_issue[key] = value + + if updated_current_issue: + issue_version = ( + IssueVersion.objects.filter(issue_id=issue_id) + .order_by("-last_saved_at") + .first() + ) + + if ( + issue_version + and str(issue_version.owned_by) == str(user_id) + and (timezone.now() - issue_version.last_saved_at).total_seconds() + <= 600 + ): + for key, value in updated_current_issue.items(): + setattr(issue_version, key, value) + issue_version.last_saved_at = timezone.now() + issue_version.save( + update_fields=list(updated_current_issue.keys()) + ["last_saved_at"] + ) + else: + IssueVersion.log_issue_version(issue, user_id) + + return + except Issue.DoesNotExist: + return + except Exception as e: + log_exception(e) + return + + +@shared_task +def issue_description_task(updated_issue, issue_id, user_id): + try: + print("====== Issue description update ======") + current_issue = json.loads(updated_issue) if updated_issue else {} + issue = Issue.objects.get(id=issue_id) + + current_issue_description_html = current_issue.get("description_html") + issue_description_html = issue.description_html + + if current_issue_description_html != issue_description_html: + issue_description_version = ( + IssueVersion.objects.filter(issue_id=issue_id) + .order_by("-last_saved_at") + .first() + ) + + current_issue_description_json = current_issue.get("description") + current_issue_description_html = current_issue.get("description_html") + current_issue_description_binary = current_issue.get("description_binary") + current_issue_description_stripped = current_issue.get( + "description_stripped" + ) + + if ( 
+ issue_description_version + and str(issue_description_version.owned_by) == str(user_id) + and ( + timezone.now() - issue_description_version.last_saved_at + ).total_seconds() + <= 600 + ): + issue_description_version.description_json = ( + current_issue_description_json + ) + issue_description_version.description_html = ( + current_issue_description_html + ) + issue_description_version.description_binary = ( + current_issue_description_binary + ) + issue_description_version.description_stripped = ( + current_issue_description_stripped + ) + issue_description_version.last_saved_at = timezone.now() + issue_description_version.save( + update_fields=[ + "description_json", + "description_html", + "description_binary", + "description_stripped", + "last_saved_at", + ] + ) + else: + IssueDescriptionVersion.log_issue_description_version( + current_issue, user_id + ) + + return + except Issue.DoesNotExist: + return + except Exception as e: + log_exception(e) + return + + +@shared_task +def issue_versioning_task(updated_issue, issue_id, user_id): + try: + issue_task.delay(updated_issue, issue_id, user_id) + issue_description_task.delay(updated_issue, issue_id, user_id) + except Exception as e: + log_exception(e) + return + + +def get_owner_id(issue: Issue) -> Optional[int]: + """Get the owner ID of the issue""" + + if issue.updated_by_id: + return issue.updated_by_id + + if issue.created_by_id: + return issue.created_by_id + + # Find project admin as fallback + project_member = ProjectMember.objects.filter( + project_id=issue.project_id, + role=20, # Admin role + ).first() + + return project_member.member_id if project_member else None + + +# ============ Issue versioning management task starts ============ +def get_related_data(issue_ids: List[int]) -> Dict: + """Get related data for the given issue IDs""" + + cycle_issues = { + ci.issue_id: ci.cycle_id + for ci in CycleIssue.objects.filter(issue_id__in=issue_ids) + } + + assignees = { + issue_id: list(assignee_ids) + for 
issue_id, assignee_ids in ( + IssueAssignee.objects.filter(issue_id__in=issue_ids) + .values_list("issue_id", "assignee_id") + .order_by("issue_id") + ).groupby("issue_id") + } + + labels = { + issue_id: list(label_ids) + for issue_id, label_ids in ( + IssueLabel.objects.filter(issue_id__in=issue_ids) + .values_list("issue_id", "label_id") + .order_by("issue_id") + ).groupby("issue_id") + } + + modules = { + issue_id: list(module_ids) + for issue_id, module_ids in ( + ModuleIssue.objects.filter(issue_id__in=issue_ids) + .values_list("issue_id", "module_id") + .order_by("issue_id") + ).groupby("issue_id") + } + + latest_activities = { + ia.issue_id: ia.id + for ia in IssueActivity.objects.filter(issue_id__in=issue_ids) + .order_by("issue_id", "-created_at") + .distinct("issue_id") + } + + return { + "cycle_issues": cycle_issues, + "assignees": assignees, + "labels": labels, + "modules": modules, + "activities": latest_activities, + } + + +def create_issue_version(issue: Issue, related_data: Dict) -> Optional[IssueVersion]: + """Create IssueVersion object from the given issue and related data""" + + if not issue.workspace_id or not issue.project_id: + print(f"Skipping issue {issue.id} - missing workspace_id or project_id") + return None + + owned_by_id = get_owner_id(issue) + if owned_by_id is None: + print(f"Skipping issue {issue.id} - missing owned_by") + return None + + return IssueVersion( + workspace_id=issue.workspace_id, + project_id=issue.project_id, + created_by_id=issue.created_by_id, + updated_by_id=issue.updated_by_id, + owned_by_id=owned_by_id, + last_saved_at=timezone.now(), + activity_id=related_data["activities"].get(issue.id), + properties=getattr(issue, "properties", {}), + meta=getattr(issue, "meta", {}), + issue_id=issue.id, + parent=issue.parent_id, + state=issue.state_id, + point=issue.point, + estimate_point=issue.estimate_point_id, + name=issue.name, + priority=issue.priority, + start_date=issue.start_date, + target_date=issue.target_date, + 
assignees=related_data["assignees"].get(issue.id, []), + sequence_id=issue.sequence_id, + labels=related_data["labels"].get(issue.id, []), + sort_order=issue.sort_order, + completed_at=issue.completed_at, + archived_at=issue.archived_at, + is_draft=issue.is_draft, + external_source=issue.external_source, + external_id=issue.external_id, + type=issue.type_id, + cycle=related_data["cycle_issues"].get(issue.id), + modules=related_data["modules"].get(issue.id, []), + ) + + +@shared_task +def issue_management_command_task(batch_size=5000, offset=0, countdown=300): + """Task to create IssueVersion records for existing Issues in batches""" + + try: + with transaction.atomic(): + # Get processed issues and total count + processed_issue_ids = set( + IssueVersion.objects.values_list("issue_id", flat=True).distinct() + ) + + base_query = Issue.objects.exclude(id__in=processed_issue_ids) + total_issues_count = base_query.count() + + if total_issues_count == 0: + return + + end_offset = min(offset + batch_size, total_issues_count) + + # Get issues batch with optimized queries + issues_batch = list( + base_query.order_by("created_at") + .select_related("workspace", "project") + .all()[offset:end_offset] + ) + + if not issues_batch: + return + + # Get all related data in bulk + issue_ids = [issue.id for issue in issues_batch] + related_data = get_related_data(issue_ids) + + issue_versions = [] + for issue in issues_batch: + version = create_issue_version(issue, related_data) + if version: + issue_versions.append(version) + + # Bulk create versions + if issue_versions: + IssueVersion.objects.bulk_create(issue_versions, batch_size=1000) + + # Schedule the next batch if there are more workspaces to process + if end_offset < total_issues_count: + issue_management_command_task.apply_async( + args=[batch_size, end_offset], countdown=countdown + ) + return + except Exception as e: + log_exception(e) + return + + +@shared_task +def schedule_issue_management_command_task(batch_size=5000, 
countdown=300): + issue_management_command_task.delay(batch_size=batch_size, countdown=countdown) + + +# ============ Issue versioning management task ends ============ + + +# ============ Issue description versioning management task starts ============ +@shared_task +def issue_description_management_command_task(batch_size=5000, offset=0, countdown=300): + """Task to create IssueDescriptionVersion records for existing Issues in batches""" + + try: + with transaction.atomic(): + # Get processed issue IDs and total count + processed_ids = set( + IssueDescriptionVersion.objects.values_list( + "issue_id", flat=True + ).distinct() + ) + + base_query = Issue.objects.exclude(id__in=processed_ids) + total_issues_count = base_query.count() + + if total_issues_count == 0: + return + + # Calculate batch range + end_offset = min(offset + batch_size, total_issues_count) + + # Fetch issues with related data + issues_batch = ( + base_query.order_by("created_at") + .select_related("workspace", "project") + .only( + "id", + "workspace_id", + "project_id", + "created_by_id", + "updated_by_id", + "description_binary", + "description_html", + "description_stripped", + "description", + )[offset:end_offset] + ) + + if not issues_batch: + return + + version_objects = [] + for issue in issues_batch: + # Validate required fields + if not issue.workspace_id or not issue.project_id: + print(f"Skipping {issue.id} - missing workspace_id or project_id") + continue + + # Determine owned_by_id + owned_by_id = get_owner_id(issue) + if owned_by_id is None: + print(f"Skipping issue {issue.id} - missing owned_by") + continue + + # Create version object + version_objects.append( + IssueDescriptionVersion( + workspace_id=issue.workspace_id, + project_id=issue.project_id, + created_by_id=issue.created_by_id, + updated_by_id=issue.updated_by_id, + owned_by_id=owned_by_id, + last_saved_at=timezone.now(), + issue_id=issue.id, + description_binary=issue.description_binary, + 
description_html=issue.description_html, + description_stripped=issue.description_stripped, + description_json=issue.description, + ) + ) + + # Bulk create version objects + if version_objects: + IssueDescriptionVersion.objects.bulk_create(version_objects) + + # Schedule next batch if needed + if end_offset < total_issues_count: + issue_description_management_command_task.apply_async( + args=[batch_size, end_offset], countdown=countdown + ) + return + except Exception as e: + log_exception(e) + return + + +@shared_task +def schedule_issue_description_management_command_task(batch_size=5000, countdown=300): + issue_description_management_command_task.delay( + batch_size=batch_size, countdown=countdown + ) + + +# ============ Issue description versioning management task nds ============ diff --git a/apiserver/plane/db/management/commands/create_issue_description_version.py b/apiserver/plane/db/management/commands/create_issue_description_version.py new file mode 100644 index 00000000000..0de112195f3 --- /dev/null +++ b/apiserver/plane/db/management/commands/create_issue_description_version.py @@ -0,0 +1,23 @@ +# Django imports +from django.core.management.base import BaseCommand + +# Module imports +from plane.bgtasks.issue_version_task import ( + schedule_issue_description_management_command_task, +) + + +class Command(BaseCommand): + help = "Creates IssueDescriptionVersion records for existing Issues in batches" + + def handle(self, *args, **options): + batch_size = input("Enter the batch size: ") + batch_countdown = input("Enter the batch countdown: ") + + schedule_issue_description_management_command_task.delay( + batch_size=int(batch_size), countdown=int(batch_countdown) + ) + + self.stdout.write( + self.style.SUCCESS("Successfully created issue description version task") + ) diff --git a/apiserver/plane/db/management/commands/create_issue_version.py b/apiserver/plane/db/management/commands/create_issue_version.py new file mode 100644 index 
00000000000..6c3a612e077 --- /dev/null +++ b/apiserver/plane/db/management/commands/create_issue_version.py @@ -0,0 +1,19 @@ +# Django imports +from django.core.management.base import BaseCommand + +# Module imports +from plane.bgtasks.issue_version_task import schedule_issue_management_command_task + + +class Command(BaseCommand): + help = "Creates IssueVersion records for existing Issues in batches" + + def handle(self, *args, **options): + batch_size = input("Enter the batch size: ") + batch_countdown = input("Enter the batch countdown: ") + + schedule_issue_management_command_task.delay( + batch_size=int(batch_size), countdown=int(batch_countdown) + ) + + self.stdout.write(self.style.SUCCESS("Successfully created issue version task")) diff --git a/apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py b/apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py index b88364c5fae..866462a8227 100644 --- a/apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py +++ b/apiserver/plane/db/migrations/0087_remove_issueversion_description_and_more.py @@ -1,9 +1,11 @@ -# Generated by Django 4.2.15 on 2024-12-02 06:34 +# Generated by Django 4.2.16 on 2024-12-09 10:03 from django.conf import settings +import django.core.validators from django.db import migrations, models import django.db.models.deletion import django.utils.timezone +import plane.db.models.user import uuid @@ -18,6 +20,10 @@ class Migration(migrations.Migration): model_name='issueversion', name='description', ), + migrations.RemoveField( + model_name='issueversion', + name='description_binary', + ), migrations.RemoveField( model_name='issueversion', name='description_html', @@ -31,6 +37,31 @@ class Migration(migrations.Migration): name='activity', field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='versions', to='db.issueactivity'), ), + migrations.AddField( + model_name='issueversion', + 
name='point', + field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0), django.core.validators.MaxValueValidator(12)]), + ), + migrations.AddField( + model_name='profile', + name='is_mobile_onboarded', + field=models.BooleanField(default=False), + ), + migrations.AddField( + model_name='profile', + name='mobile_onboarding_step', + field=models.JSONField(default=plane.db.models.user.get_mobile_default_onboarding), + ), + migrations.AddField( + model_name='profile', + name='mobile_timezone_auto_set', + field=models.BooleanField(default=False), + ), + migrations.AlterField( + model_name='issueversion', + name='owned_by', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='issue_versions', to=settings.AUTH_USER_MODEL), + ), migrations.CreateModel( name='IssueDescriptionVersion', fields=[ @@ -45,7 +76,7 @@ class Migration(migrations.Migration): ('last_saved_at', models.DateTimeField(default=django.utils.timezone.now)), ('created_by', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_created_by', to=settings.AUTH_USER_MODEL, verbose_name='Created By')), ('issue', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='description_versions', to='db.issue')), - ('owned_by', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='description_versions', to=settings.AUTH_USER_MODEL)), + ('owned_by', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='issue_description_versions', to=settings.AUTH_USER_MODEL)), ('project', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='project_%(class)s', to='db.project')), ('updated_by', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='%(class)s_updated_by', to=settings.AUTH_USER_MODEL, verbose_name='Last Modified By')), ('workspace', 
models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='workspace_%(class)s', to='db.workspace')), diff --git a/apiserver/plane/db/models/__init__.py b/apiserver/plane/db/models/__init__.py index 36810956c27..d37f29a354a 100644 --- a/apiserver/plane/db/models/__init__.py +++ b/apiserver/plane/db/models/__init__.py @@ -41,6 +41,8 @@ IssueSequence, IssueSubscriber, IssueVote, + IssueVersion, + IssueDescriptionVersion, ) from .module import Module, ModuleIssue, ModuleLink, ModuleMember, ModuleUserProperties from .notification import EmailNotificationLog, Notification, UserNotificationPreference diff --git a/apiserver/plane/db/models/issue.py b/apiserver/plane/db/models/issue.py index 60277aa6d18..44a194863be 100644 --- a/apiserver/plane/db/models/issue.py +++ b/apiserver/plane/db/models/issue.py @@ -660,15 +660,6 @@ def __str__(self): class IssueVersion(ProjectBaseModel): - activity = models.ForeignKey( - "db.IssueActivity", - on_delete=models.SET_NULL, - null=True, - related_name="versions", - ) - issue = models.ForeignKey( - "db.Issue", on_delete=models.CASCADE, related_name="versions" - ) PRIORITY_CHOICES = ( ("urgent", "Urgent"), ("high", "High"), @@ -676,11 +667,17 @@ class IssueVersion(ProjectBaseModel): ("low", "Low"), ("none", "None"), ) + + issue = models.ForeignKey( + "db.Issue", on_delete=models.CASCADE, related_name="versions" + ) parent = models.UUIDField(blank=True, null=True) state = models.UUIDField(blank=True, null=True) + point = models.IntegerField( + validators=[MinValueValidator(0), MaxValueValidator(12)], null=True, blank=True + ) estimate_point = models.UUIDField(blank=True, null=True) name = models.CharField(max_length=255, verbose_name="Issue Name") - description_binary = models.BinaryField(null=True) priority = models.CharField( max_length=30, choices=PRIORITY_CHOICES, @@ -689,7 +686,9 @@ class IssueVersion(ProjectBaseModel): ) start_date = models.DateField(null=True, blank=True) target_date = models.DateField(null=True, 
blank=True) + assignees = ArrayField(models.UUIDField(), blank=True, default=list) sequence_id = models.IntegerField(default=1, verbose_name="Issue Sequence ID") + labels = ArrayField(models.UUIDField(), blank=True, default=list) sort_order = models.FloatField(default=65535) completed_at = models.DateTimeField(null=True) archived_at = models.DateField(null=True) @@ -697,14 +696,22 @@ class IssueVersion(ProjectBaseModel): external_source = models.CharField(max_length=255, null=True, blank=True) external_id = models.CharField(max_length=255, blank=True, null=True) type = models.UUIDField(blank=True, null=True) - last_saved_at = models.DateTimeField(default=timezone.now) - owned_by = models.UUIDField() - assignees = ArrayField(models.UUIDField(), blank=True, default=list) - labels = ArrayField(models.UUIDField(), blank=True, default=list) cycle = models.UUIDField(null=True, blank=True) modules = ArrayField(models.UUIDField(), blank=True, default=list) - properties = models.JSONField(default=dict) - meta = models.JSONField(default=dict) + activity = models.ForeignKey( + "db.IssueActivity", + on_delete=models.SET_NULL, + null=True, + related_name="versions", + ) + properties = models.JSONField(default=dict) # issue properties + meta = models.JSONField(default=dict) # issue meta + last_saved_at = models.DateTimeField(default=timezone.now) + owned_by = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.CASCADE, + related_name="issue_versions", + ) class Meta: verbose_name = "Issue Version" @@ -734,14 +741,12 @@ def log_issue_version(cls, issue, user): point=issue.point, estimate_point=issue.estimate_point_id, name=issue.name, - description=issue.description, - description_html=issue.description_html, - description_stripped=issue.description_stripped, - description_binary=issue.description_binary, priority=issue.priority, start_date=issue.start_date, target_date=issue.target_date, + assignees=issue.assignees, sequence_id=issue.sequence_id, + 
labels=issue.labels, sort_order=issue.sort_order, completed_at=issue.completed_at, archived_at=issue.archived_at, @@ -749,11 +754,12 @@ def log_issue_version(cls, issue, user): external_source=issue.external_source, external_id=issue.external_id, type=issue.type_id, - last_saved_at=issue.last_saved_at, - assignees=issue.assignees, - labels=issue.labels, cycle=cycle_issue.cycle if cycle_issue else None, modules=Module.objects.filter(issue=issue).values_list("id", flat=True), + # activity=issue.activity_id, + properties={}, + meta={}, + last_saved_at=timezone.now(), owned_by=user, ) return True @@ -774,10 +780,30 @@ class IssueDescriptionVersion(ProjectBaseModel): owned_by = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, - related_name="description_versions", + related_name="issue_description_versions", ) class Meta: verbose_name = "Issue Description Version" verbose_name_plural = "Issue Description Versions" db_table = "issue_description_versions" + + @classmethod + def log_issue_description_version(cls, issue, user): + try: + """ + Log the issue description version + """ + cls.objects.create( + issue=issue, + description_binary=issue.description_binary, + description_html=issue.description_html, + description_stripped=issue.description_stripped, + description_json=issue.description, + last_saved_at=timezone.now(), + owned_by=user, + ) + return True + except Exception as e: + log_exception(e) + return False diff --git a/apiserver/plane/db/models/user.py b/apiserver/plane/db/models/user.py index 34a86a2519e..001889875f5 100644 --- a/apiserver/plane/db/models/user.py +++ b/apiserver/plane/db/models/user.py @@ -26,6 +26,14 @@ def get_default_onboarding(): } +def get_mobile_default_onboarding(): + return { + "profile_complete": False, + "workspace_create": False, + "workspace_join": False, + } + + class User(AbstractBaseUser, PermissionsMixin): id = models.UUIDField( default=uuid.uuid4, unique=True, editable=False, db_index=True, primary_key=True 
@@ -178,6 +186,10 @@ class Profile(TimeAuditModel): billing_address = models.JSONField(null=True) has_billing_address = models.BooleanField(default=False) company_name = models.CharField(max_length=255, blank=True) + # mobile + is_mobile_onboarded = models.BooleanField(default=False) + mobile_onboarding_step = models.JSONField(default=get_mobile_default_onboarding) + mobile_timezone_auto_set = models.BooleanField(default=False) class Meta: verbose_name = "Profile" From b4d008d4bb8097891c61140ffa4cfe760f1eb9fd Mon Sep 17 00:00:00 2001 From: gurusainath Date: Tue, 10 Dec 2024 17:59:44 +0530 Subject: [PATCH 3/5] chore: updated the logic and added sync tasks in celery imports --- .../bgtasks/issue_description_version_sync.py | 115 +++++ .../bgtasks/issue_description_version_task.py | 85 ++++ apiserver/plane/bgtasks/issue_version_sync.py | 250 ++++++++++ apiserver/plane/bgtasks/issue_version_task.py | 437 +++--------------- ...n.py => sync_issue_description_version.py} | 6 +- ...issue_version.py => sync_issue_version.py} | 4 +- apiserver/plane/settings/common.py | 3 + 7 files changed, 512 insertions(+), 388 deletions(-) create mode 100644 apiserver/plane/bgtasks/issue_description_version_sync.py create mode 100644 apiserver/plane/bgtasks/issue_description_version_task.py create mode 100644 apiserver/plane/bgtasks/issue_version_sync.py rename apiserver/plane/db/management/commands/{create_issue_description_version.py => sync_issue_description_version.py} (77%) rename apiserver/plane/db/management/commands/{create_issue_version.py => sync_issue_version.py} (78%) diff --git a/apiserver/plane/bgtasks/issue_description_version_sync.py b/apiserver/plane/bgtasks/issue_description_version_sync.py new file mode 100644 index 00000000000..c259ff736b6 --- /dev/null +++ b/apiserver/plane/bgtasks/issue_description_version_sync.py @@ -0,0 +1,115 @@ +# Python imports +from typing import Optional + +# Django imports +from django.utils import timezone +from django.db import transaction + 
+# Third party imports +from celery import shared_task + +# Module imports +from plane.db.models import Issue, IssueDescriptionVersion, ProjectMember +from plane.utils.exception_logger import log_exception + + +def get_owner_id(issue: Issue) -> Optional[int]: + """Get the owner ID of the issue""" + + if issue.updated_by_id: + return issue.updated_by_id + + if issue.created_by_id: + return issue.created_by_id + + # Find project admin as fallback + project_member = ProjectMember.objects.filter( + project_id=issue.project_id, + role=20, # Admin role + ).first() + + return project_member.member_id if project_member else None + + +@shared_task +def sync_issue_description_version(batch_size=5000, offset=0, countdown=300): + """Task to create IssueDescriptionVersion records for existing Issues in batches""" + try: + with transaction.atomic(): + base_query = Issue.objects + total_issues_count = base_query.count() + + if total_issues_count == 0: + return + + # Calculate batch range + end_offset = min(offset + batch_size, total_issues_count) + + # Fetch issues with related data + issues_batch = ( + base_query.order_by("created_at") + .select_related("workspace", "project") + .only( + "id", + "workspace_id", + "project_id", + "created_by_id", + "updated_by_id", + "description_binary", + "description_html", + "description_stripped", + "description", + )[offset:end_offset] + ) + + if not issues_batch: + return + + version_objects = [] + for issue in issues_batch: + # Validate required fields + if not issue.workspace_id or not issue.project_id: + print(f"Skipping {issue.id} - missing workspace_id or project_id") + continue + + # Determine owned_by_id + owned_by_id = get_owner_id(issue) + if owned_by_id is None: + print(f"Skipping issue {issue.id} - missing owned_by") + continue + + # Create version object + version_objects.append( + IssueDescriptionVersion( + workspace_id=issue.workspace_id, + project_id=issue.project_id, + created_by_id=issue.created_by_id, + 
updated_by_id=issue.updated_by_id,
+                        owned_by_id=owned_by_id,
+                        last_saved_at=timezone.now(),
+                        issue_id=issue.id,
+                        description_binary=issue.description_binary,
+                        description_html=issue.description_html,
+                        description_stripped=issue.description_stripped,
+                        description_json=issue.description,
+                    )
+                )
+
+            # Bulk create version objects
+            if version_objects:
+                IssueDescriptionVersion.objects.bulk_create(version_objects)
+
+            # Schedule next batch if needed
+            if end_offset < total_issues_count:
+                sync_issue_description_version.apply_async(
+                    args=[batch_size, end_offset], countdown=countdown
+                )
+        return
+    except Exception as e:
+        log_exception(e)
+        return
+
+
+@shared_task
+def schedule_issue_description_version(batch_size=5000, countdown=300):
+    sync_issue_description_version.delay(batch_size=batch_size, countdown=countdown)
diff --git a/apiserver/plane/bgtasks/issue_description_version_task.py b/apiserver/plane/bgtasks/issue_description_version_task.py
new file mode 100644
index 00000000000..69a2f5e992d
--- /dev/null
+++ b/apiserver/plane/bgtasks/issue_description_version_task.py
@@ -0,0 +1,85 @@
+from celery import shared_task
+from django.db import transaction
+from django.utils import timezone
+from typing import Optional, Dict, Any
+import json
+
+from plane.db.models import Issue, IssueDescriptionVersion
+from plane.utils.exception_logger import log_exception
+
+
+def should_update_existing_version(
+    version: IssueDescriptionVersion, user_id: str, max_time_difference: int = 600
+) -> bool:
+    if not version:
+        return False
+
+    time_difference = (timezone.now() - version.last_saved_at).total_seconds()
+    return (
+        str(version.owned_by_id) == str(user_id)
+        and time_difference <= max_time_difference
+    )
+
+
+def update_existing_version(
+    version: IssueDescriptionVersion, description_data: Dict[str, Any]
+) -> None:
+    version.description_json = description_data.get("description")
+    version.description_html = description_data.get("description_html")
+    version.description_binary = 
description_data.get("description_binary") + version.description_stripped = description_data.get("description_stripped") + version.last_saved_at = timezone.now() + + version.save( + update_fields=[ + "description_json", + "description_html", + "description_binary", + "description_stripped", + "last_saved_at", + ] + ) + + +@shared_task +def issue_description_version_task( + updated_issue: Optional[str], issue_id: str, user_id: str +) -> Optional[bool]: + try: + # Parse updated issue data + current_issue: Dict = json.loads(updated_issue) if updated_issue else {} + + # Get current issue + issue = Issue.objects.get(id=issue_id) + + # Check if description has changed + if current_issue.get("description_html") == issue.description_html: + return + + with transaction.atomic(): + # Get latest version + latest_version = ( + IssueDescriptionVersion.objects.filter(issue_id=issue_id) + .order_by("-last_saved_at") + .first() + ) + + # Determine whether to update existing or create new version + if should_update_existing_version(latest_version, user_id): + update_existing_version(latest_version, current_issue) + else: + IssueDescriptionVersion.log_issue_description_version( + current_issue, user_id + ) + + return + + except Issue.DoesNotExist: + # Issue no longer exists, skip processing + return + except json.JSONDecodeError as e: + log_exception(f"Invalid JSON for updated_issue: {e}") + return + except Exception as e: + log_exception(f"Error processing issue description version: {e}") + return diff --git a/apiserver/plane/bgtasks/issue_version_sync.py b/apiserver/plane/bgtasks/issue_version_sync.py new file mode 100644 index 00000000000..12db17c2036 --- /dev/null +++ b/apiserver/plane/bgtasks/issue_version_sync.py @@ -0,0 +1,250 @@ +# Python imports +import json +from typing import Optional, List, Dict +from uuid import UUID +from itertools import groupby + +# Django imports +from django.utils import timezone +from django.db import transaction + +# Third party imports +from 
celery import shared_task + +# Module imports +from plane.db.models import ( + Issue, + IssueVersion, + ProjectMember, + CycleIssue, + ModuleIssue, + IssueActivity, + IssueAssignee, + IssueLabel, +) +from plane.utils.exception_logger import log_exception + + +@shared_task +def issue_task(updated_issue, issue_id, user_id): + try: + current_issue = json.loads(updated_issue) if updated_issue else {} + issue = Issue.objects.get(id=issue_id) + + updated_current_issue = {} + for key, value in current_issue.items(): + if getattr(issue, key) != value: + updated_current_issue[key] = value + + if updated_current_issue: + issue_version = ( + IssueVersion.objects.filter(issue_id=issue_id) + .order_by("-last_saved_at") + .first() + ) + + if ( + issue_version + and str(issue_version.owned_by) == str(user_id) + and (timezone.now() - issue_version.last_saved_at).total_seconds() + <= 600 + ): + for key, value in updated_current_issue.items(): + setattr(issue_version, key, value) + issue_version.last_saved_at = timezone.now() + issue_version.save( + update_fields=list(updated_current_issue.keys()) + ["last_saved_at"] + ) + else: + IssueVersion.log_issue_version(issue, user_id) + + return + except Issue.DoesNotExist: + return + except Exception as e: + log_exception(e) + return + + +def get_owner_id(issue: Issue) -> Optional[int]: + """Get the owner ID of the issue""" + + if issue.updated_by_id: + return issue.updated_by_id + + if issue.created_by_id: + return issue.created_by_id + + # Find project admin as fallback + project_member = ProjectMember.objects.filter( + project_id=issue.project_id, + role=20, # Admin role + ).first() + + return project_member.member_id if project_member else None + + +def get_related_data(issue_ids: List[UUID]) -> Dict: + """Get related data for the given issue IDs""" + + cycle_issues = { + ci.issue_id: ci.cycle_id + for ci in CycleIssue.objects.filter(issue_id__in=issue_ids) + } + + # Get assignees with proper grouping + assignee_records = list( + 
IssueAssignee.objects.filter(issue_id__in=issue_ids) + .values_list("issue_id", "assignee_id") + .order_by("issue_id") + ) + assignees = {} + for issue_id, group in groupby(assignee_records, key=lambda x: x[0]): + assignees[issue_id] = [str(g[1]) for g in group] + + # Get labels with proper grouping + label_records = list( + IssueLabel.objects.filter(issue_id__in=issue_ids) + .values_list("issue_id", "label_id") + .order_by("issue_id") + ) + labels = {} + for issue_id, group in groupby(label_records, key=lambda x: x[0]): + labels[issue_id] = [str(g[1]) for g in group] + + # Get modules with proper grouping + module_records = list( + ModuleIssue.objects.filter(issue_id__in=issue_ids) + .values_list("issue_id", "module_id") + .order_by("issue_id") + ) + modules = {} + for issue_id, group in groupby(module_records, key=lambda x: x[0]): + modules[issue_id] = [str(g[1]) for g in group] + + # Get latest activities + latest_activities = {} + activities = IssueActivity.objects.filter(issue_id__in=issue_ids).order_by( + "issue_id", "-created_at" + ) + for issue_id, activities_group in groupby(activities, key=lambda x: x.issue_id): + first_activity = next(activities_group, None) + if first_activity: + latest_activities[issue_id] = first_activity.id + + return { + "cycle_issues": cycle_issues, + "assignees": assignees, + "labels": labels, + "modules": modules, + "activities": latest_activities, + } + + +def create_issue_version(issue: Issue, related_data: Dict) -> Optional[IssueVersion]: + """Create IssueVersion object from the given issue and related data""" + + try: + if not issue.workspace_id or not issue.project_id: + print(f"Skipping issue {issue.id} - missing workspace_id or project_id") + return None + + owned_by_id = get_owner_id(issue) + if owned_by_id is None: + print(f"Skipping issue {issue.id} - missing owned_by") + return None + + return IssueVersion( + workspace_id=issue.workspace_id, + project_id=issue.project_id, + created_by_id=issue.created_by_id, + 
updated_by_id=issue.updated_by_id, + owned_by_id=owned_by_id, + last_saved_at=timezone.now(), + activity_id=related_data["activities"].get(issue.id), + properties=getattr(issue, "properties", {}), + meta=getattr(issue, "meta", {}), + issue_id=issue.id, + parent=issue.parent_id, + state=issue.state_id, + point=issue.point, + estimate_point=issue.estimate_point_id, + name=issue.name, + priority=issue.priority, + start_date=issue.start_date, + target_date=issue.target_date, + assignees=related_data["assignees"].get(issue.id, []), + sequence_id=issue.sequence_id, + labels=related_data["labels"].get(issue.id, []), + sort_order=issue.sort_order, + completed_at=issue.completed_at, + archived_at=issue.archived_at, + is_draft=issue.is_draft, + external_source=issue.external_source, + external_id=issue.external_id, + type=issue.type_id, + cycle=related_data["cycle_issues"].get(issue.id), + modules=related_data["modules"].get(issue.id, []), + ) + except Exception as e: + log_exception(e) + return None + + +@shared_task +def sync_issue_version(batch_size=5000, offset=0, countdown=300): + """Task to create IssueVersion records for existing Issues in batches""" + + try: + with transaction.atomic(): + base_query = Issue.objects + total_issues_count = base_query.count() + + if total_issues_count == 0: + return + + print(f"Offset: {offset}") + print(f"Total Issues: {total_issues_count}") + + end_offset = min(offset + batch_size, total_issues_count) + + # Get issues batch with optimized queries + issues_batch = list( + base_query.order_by("created_at") + .select_related("workspace", "project") + .all()[offset:end_offset] + ) + + if not issues_batch: + return + + # Get all related data in bulk + issue_ids = [issue.id for issue in issues_batch] + related_data = get_related_data(issue_ids) + + issue_versions = [] + for issue in issues_batch: + version = create_issue_version(issue, related_data) + if version: + issue_versions.append(version) + + # Bulk create versions + if 
issue_versions: + IssueVersion.objects.bulk_create(issue_versions, batch_size=1000) + + # Schedule the next batch if there are more workspaces to process + if end_offset < total_issues_count: + sync_issue_version.apply_async( + args=[batch_size, end_offset], countdown=countdown + ) + + print(f"Processed Issues: {end_offset}") + return + except Exception as e: + log_exception(e) + return + + +@shared_task +def schedule_issue_version(batch_size=5000, countdown=300): + sync_issue_version.delay(batch_size=batch_size, countdown=countdown) diff --git a/apiserver/plane/bgtasks/issue_version_task.py b/apiserver/plane/bgtasks/issue_version_task.py index c910473d7ab..34f3f4e0305 100644 --- a/apiserver/plane/bgtasks/issue_version_task.py +++ b/apiserver/plane/bgtasks/issue_version_task.py @@ -1,6 +1,6 @@ # Python imports import json -from typing import Optional, List, Dict +from typing import Dict, Any, Optional, List # Django imports from django.utils import timezone @@ -10,410 +10,81 @@ from celery import shared_task # Module imports -from plane.db.models import ( - Issue, - IssueVersion, - IssueDescriptionVersion, - ProjectMember, - CycleIssue, - ModuleIssue, - IssueActivity, - IssueAssignee, - IssueLabel, -) +from plane.db.models import Issue, IssueVersion from plane.utils.exception_logger import log_exception -@shared_task -def issue_task(updated_issue, issue_id, user_id): - try: - print("====== Issue detail update ======") - current_issue = json.loads(updated_issue) if updated_issue else {} - issue = Issue.objects.get(id=issue_id) - - updated_current_issue = {} - for key, value in current_issue.items(): - if getattr(issue, key) != value: - updated_current_issue[key] = value - - if updated_current_issue: - issue_version = ( - IssueVersion.objects.filter(issue_id=issue_id) - .order_by("-last_saved_at") - .first() - ) - - if ( - issue_version - and str(issue_version.owned_by) == str(user_id) - and (timezone.now() - issue_version.last_saved_at).total_seconds() - <= 600 - ): 
- for key, value in updated_current_issue.items(): - setattr(issue_version, key, value) - issue_version.last_saved_at = timezone.now() - issue_version.save( - update_fields=list(updated_current_issue.keys()) + ["last_saved_at"] - ) - else: - IssueVersion.log_issue_version(issue, user_id) - - return - except Issue.DoesNotExist: - return - except Exception as e: - log_exception(e) - return - - -@shared_task -def issue_description_task(updated_issue, issue_id, user_id): - try: - print("====== Issue description update ======") - current_issue = json.loads(updated_issue) if updated_issue else {} - issue = Issue.objects.get(id=issue_id) - - current_issue_description_html = current_issue.get("description_html") - issue_description_html = issue.description_html - - if current_issue_description_html != issue_description_html: - issue_description_version = ( - IssueVersion.objects.filter(issue_id=issue_id) - .order_by("-last_saved_at") - .first() - ) - - current_issue_description_json = current_issue.get("description") - current_issue_description_html = current_issue.get("description_html") - current_issue_description_binary = current_issue.get("description_binary") - current_issue_description_stripped = current_issue.get( - "description_stripped" - ) - - if ( - issue_description_version - and str(issue_description_version.owned_by) == str(user_id) - and ( - timezone.now() - issue_description_version.last_saved_at - ).total_seconds() - <= 600 - ): - issue_description_version.description_json = ( - current_issue_description_json - ) - issue_description_version.description_html = ( - current_issue_description_html - ) - issue_description_version.description_binary = ( - current_issue_description_binary - ) - issue_description_version.description_stripped = ( - current_issue_description_stripped - ) - issue_description_version.last_saved_at = timezone.now() - issue_description_version.save( - update_fields=[ - "description_json", - "description_html", - "description_binary", - 
"description_stripped", - "last_saved_at", - ] - ) - else: - IssueDescriptionVersion.log_issue_description_version( - current_issue, user_id - ) - - return - except Issue.DoesNotExist: - return - except Exception as e: - log_exception(e) - return - - -@shared_task -def issue_versioning_task(updated_issue, issue_id, user_id): - try: - issue_task.delay(updated_issue, issue_id, user_id) - issue_description_task.delay(updated_issue, issue_id, user_id) - except Exception as e: - log_exception(e) - return - - -def get_owner_id(issue: Issue) -> Optional[int]: - """Get the owner ID of the issue""" - - if issue.updated_by_id: - return issue.updated_by_id - - if issue.created_by_id: - return issue.created_by_id - - # Find project admin as fallback - project_member = ProjectMember.objects.filter( - project_id=issue.project_id, - role=20, # Admin role - ).first() - - return project_member.member_id if project_member else None - - -# ============ Issue versioning management task starts ============ -def get_related_data(issue_ids: List[int]) -> Dict: - """Get related data for the given issue IDs""" - - cycle_issues = { - ci.issue_id: ci.cycle_id - for ci in CycleIssue.objects.filter(issue_id__in=issue_ids) - } - - assignees = { - issue_id: list(assignee_ids) - for issue_id, assignee_ids in ( - IssueAssignee.objects.filter(issue_id__in=issue_ids) - .values_list("issue_id", "assignee_id") - .order_by("issue_id") - ).groupby("issue_id") - } - - labels = { - issue_id: list(label_ids) - for issue_id, label_ids in ( - IssueLabel.objects.filter(issue_id__in=issue_ids) - .values_list("issue_id", "label_id") - .order_by("issue_id") - ).groupby("issue_id") - } - - modules = { - issue_id: list(module_ids) - for issue_id, module_ids in ( - ModuleIssue.objects.filter(issue_id__in=issue_ids) - .values_list("issue_id", "module_id") - .order_by("issue_id") - ).groupby("issue_id") - } - - latest_activities = { - ia.issue_id: ia.id - for ia in IssueActivity.objects.filter(issue_id__in=issue_ids) 
- .order_by("issue_id", "-created_at") - .distinct("issue_id") - } - +def get_changed_fields(current_issue: Dict[str, Any], issue: Issue) -> Dict[str, Any]: return { - "cycle_issues": cycle_issues, - "assignees": assignees, - "labels": labels, - "modules": modules, - "activities": latest_activities, + key: value + for key, value in current_issue.items() + if getattr(issue, key) != value } -def create_issue_version(issue: Issue, related_data: Dict) -> Optional[IssueVersion]: - """Create IssueVersion object from the given issue and related data""" - - if not issue.workspace_id or not issue.project_id: - print(f"Skipping issue {issue.id} - missing workspace_id or project_id") - return None +def should_update_existing_version( + version: Optional[IssueVersion], user_id: str, max_time_difference: int = 600 +) -> bool: + if not version: + return False - owned_by_id = get_owner_id(issue) - if owned_by_id is None: - print(f"Skipping issue {issue.id} - missing owned_by") - return None - - return IssueVersion( - workspace_id=issue.workspace_id, - project_id=issue.project_id, - created_by_id=issue.created_by_id, - updated_by_id=issue.updated_by_id, - owned_by_id=owned_by_id, - last_saved_at=timezone.now(), - activity_id=related_data["activities"].get(issue.id), - properties=getattr(issue, "properties", {}), - meta=getattr(issue, "meta", {}), - issue_id=issue.id, - parent=issue.parent_id, - state=issue.state_id, - point=issue.point, - estimate_point=issue.estimate_point_id, - name=issue.name, - priority=issue.priority, - start_date=issue.start_date, - target_date=issue.target_date, - assignees=related_data["assignees"].get(issue.id, []), - sequence_id=issue.sequence_id, - labels=related_data["labels"].get(issue.id, []), - sort_order=issue.sort_order, - completed_at=issue.completed_at, - archived_at=issue.archived_at, - is_draft=issue.is_draft, - external_source=issue.external_source, - external_id=issue.external_id, - type=issue.type_id, - 
cycle=related_data["cycle_issues"].get(issue.id), - modules=related_data["modules"].get(issue.id, []), + time_difference = (timezone.now() - version.last_saved_at).total_seconds() + return ( + str(version.owned_by_id) == str(user_id) + and time_difference <= max_time_difference ) -@shared_task -def issue_management_command_task(batch_size=5000, offset=0, countdown=300): - """Task to create IssueVersion records for existing Issues in batches""" +def update_version_fields( + version: IssueVersion, changed_fields: Dict[str, Any] +) -> List[str]: + for key, value in changed_fields.items(): + setattr(version, key, value) - try: - with transaction.atomic(): - # Get processed issues and total count - processed_issue_ids = set( - IssueVersion.objects.values_list("issue_id", flat=True).distinct() - ) - - base_query = Issue.objects.exclude(id__in=processed_issue_ids) - total_issues_count = base_query.count() - - if total_issues_count == 0: - return - - end_offset = min(offset + batch_size, total_issues_count) - - # Get issues batch with optimized queries - issues_batch = list( - base_query.order_by("created_at") - .select_related("workspace", "project") - .all()[offset:end_offset] - ) - - if not issues_batch: - return - - # Get all related data in bulk - issue_ids = [issue.id for issue in issues_batch] - related_data = get_related_data(issue_ids) - - issue_versions = [] - for issue in issues_batch: - version = create_issue_version(issue, related_data) - if version: - issue_versions.append(version) - - # Bulk create versions - if issue_versions: - IssueVersion.objects.bulk_create(issue_versions, batch_size=1000) - - # Schedule the next batch if there are more workspaces to process - if end_offset < total_issues_count: - issue_management_command_task.apply_async( - args=[batch_size, end_offset], countdown=countdown - ) - return - except Exception as e: - log_exception(e) - return + version.last_saved_at = timezone.now() + update_fields = list(changed_fields.keys()) + 
["last_saved_at"] + return update_fields @shared_task -def schedule_issue_management_command_task(batch_size=5000, countdown=300): - issue_management_command_task.delay(batch_size=batch_size, countdown=countdown) - - -# ============ Issue versioning management task ends ============ - - -# ============ Issue description versioning management task starts ============ -@shared_task -def issue_description_management_command_task(batch_size=5000, offset=0, countdown=300): - """Task to create IssueDescriptionVersion records for existing Issues in batches""" - +def issue_version_task( + updated_issue: Optional[str], issue_id: str, user_id: str +) -> Optional[bool]: try: - with transaction.atomic(): - # Get processed issue IDs and total count - processed_ids = set( - IssueDescriptionVersion.objects.values_list( - "issue_id", flat=True - ).distinct() - ) + # Parse updated issue data + current_issue: Dict = json.loads(updated_issue) if updated_issue else {} - base_query = Issue.objects.exclude(id__in=processed_ids) - total_issues_count = base_query.count() + with transaction.atomic(): + # Get current issue + issue = Issue.objects.get(id=issue_id) - if total_issues_count == 0: - return + # Get changed fields + changed_fields = get_changed_fields(current_issue, issue) - # Calculate batch range - end_offset = min(offset + batch_size, total_issues_count) + if not changed_fields: + return True - # Fetch issues with related data - issues_batch = ( - base_query.order_by("created_at") - .select_related("workspace", "project") - .only( - "id", - "workspace_id", - "project_id", - "created_by_id", - "updated_by_id", - "description_binary", - "description_html", - "description_stripped", - "description", - )[offset:end_offset] + # Get latest version + latest_version = ( + IssueVersion.objects.filter(issue_id=issue_id) + .order_by("-last_saved_at") + .first() ) - if not issues_batch: - return - - version_objects = [] - for issue in issues_batch: - # Validate required fields - if not 
issue.workspace_id or not issue.project_id: - print(f"Skipping {issue.id} - missing workspace_id or project_id") - continue - - # Determine owned_by_id - owned_by_id = get_owner_id(issue) - if owned_by_id is None: - print(f"Skipping issue {issue.id} - missing owned_by") - continue - - # Create version object - version_objects.append( - IssueDescriptionVersion( - workspace_id=issue.workspace_id, - project_id=issue.project_id, - created_by_id=issue.created_by_id, - updated_by_id=issue.updated_by_id, - owned_by_id=owned_by_id, - last_saved_at=timezone.now(), - issue_id=issue.id, - description_binary=issue.description_binary, - description_html=issue.description_html, - description_stripped=issue.description_stripped, - description_json=issue.description, - ) - ) + # Update existing or create new version + if should_update_existing_version(latest_version, user_id): + update_fields = update_version_fields(latest_version, changed_fields) + latest_version.save(update_fields=update_fields) + else: + IssueVersion.log_issue_version(issue, user_id) - # Bulk create version objects - if version_objects: - IssueDescriptionVersion.objects.bulk_create(version_objects) + return True - # Schedule next batch if needed - if end_offset < total_issues_count: - issue_description_management_command_task.apply_async( - args=[batch_size, end_offset], countdown=countdown - ) - return + except Issue.DoesNotExist: + return None + except json.JSONDecodeError as e: + log_exception(f"Invalid JSON for updated_issue: {e}") + return False except Exception as e: - log_exception(e) - return - - -@shared_task -def schedule_issue_description_management_command_task(batch_size=5000, countdown=300): - issue_description_management_command_task.delay( - batch_size=batch_size, countdown=countdown - ) - - -# ============ Issue description versioning management task nds ============ + log_exception(f"Error processing issue version: {e}") + return False diff --git 
a/apiserver/plane/db/management/commands/create_issue_description_version.py b/apiserver/plane/db/management/commands/sync_issue_description_version.py similarity index 77% rename from apiserver/plane/db/management/commands/create_issue_description_version.py rename to apiserver/plane/db/management/commands/sync_issue_description_version.py index 0de112195f3..2246cfb16fd 100644 --- a/apiserver/plane/db/management/commands/create_issue_description_version.py +++ b/apiserver/plane/db/management/commands/sync_issue_description_version.py @@ -2,8 +2,8 @@ from django.core.management.base import BaseCommand # Module imports -from plane.bgtasks.issue_version_task import ( - schedule_issue_description_management_command_task, +from plane.bgtasks.issue_description_version_sync import ( + schedule_issue_description_version, ) @@ -14,7 +14,7 @@ def handle(self, *args, **options): batch_size = input("Enter the batch size: ") batch_countdown = input("Enter the batch countdown: ") - schedule_issue_description_management_command_task.delay( + schedule_issue_description_version.delay( batch_size=int(batch_size), countdown=int(batch_countdown) ) diff --git a/apiserver/plane/db/management/commands/create_issue_version.py b/apiserver/plane/db/management/commands/sync_issue_version.py similarity index 78% rename from apiserver/plane/db/management/commands/create_issue_version.py rename to apiserver/plane/db/management/commands/sync_issue_version.py index 6c3a612e077..8907e24ef1f 100644 --- a/apiserver/plane/db/management/commands/create_issue_version.py +++ b/apiserver/plane/db/management/commands/sync_issue_version.py @@ -2,7 +2,7 @@ from django.core.management.base import BaseCommand # Module imports -from plane.bgtasks.issue_version_task import schedule_issue_management_command_task +from plane.bgtasks.issue_version_sync import schedule_issue_version class Command(BaseCommand): @@ -12,7 +12,7 @@ def handle(self, *args, **options): batch_size = input("Enter the batch size: ") 
batch_countdown = input("Enter the batch countdown: ") - schedule_issue_management_command_task.delay( + schedule_issue_version.delay( batch_size=int(batch_size), countdown=int(batch_countdown) ) diff --git a/apiserver/plane/settings/common.py b/apiserver/plane/settings/common.py index ed42dfe19c5..c494d34cc6e 100644 --- a/apiserver/plane/settings/common.py +++ b/apiserver/plane/settings/common.py @@ -262,6 +262,9 @@ "plane.license.bgtasks.tracer", # management tasks "plane.bgtasks.dummy_data_task", + # issue sync tasks + "plane.bgtasks.issue_version_sync", + "plane.bgtasks.issue_description_version_sync", ) # Sentry Settings From 4d12a94b46e3cb55937ea425e6834573ec0ca6c0 Mon Sep 17 00:00:00 2001 From: gurusainath Date: Wed, 11 Dec 2024 18:39:37 +0530 Subject: [PATCH 4/5] chore: removed logs --- apiserver/plane/bgtasks/issue_description_version_sync.py | 7 ++++++- apiserver/plane/bgtasks/issue_version_sync.py | 7 ++++++- apiserver/plane/db/models/issue.py | 1 - 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/apiserver/plane/bgtasks/issue_description_version_sync.py b/apiserver/plane/bgtasks/issue_description_version_sync.py index c259ff736b6..4cd92b2f784 100644 --- a/apiserver/plane/bgtasks/issue_description_version_sync.py +++ b/apiserver/plane/bgtasks/issue_description_version_sync.py @@ -102,7 +102,12 @@ def sync_issue_description_version(batch_size=5000, offset=0, countdown=300): # Schedule next batch if needed if end_offset < total_issues_count: sync_issue_description_version.apply_async( - args=[batch_size, end_offset], countdown=countdown + kwargs={ + "batch_size": batch_size, + "offset": end_offset, + "countdown": countdown, + }, + countdown=countdown, ) return except Exception as e: diff --git a/apiserver/plane/bgtasks/issue_version_sync.py b/apiserver/plane/bgtasks/issue_version_sync.py index 12db17c2036..b701ef1a2dc 100644 --- a/apiserver/plane/bgtasks/issue_version_sync.py +++ b/apiserver/plane/bgtasks/issue_version_sync.py @@ -235,7 
+235,12 @@ def sync_issue_version(batch_size=5000, offset=0, countdown=300): # Schedule the next batch if there are more workspaces to process if end_offset < total_issues_count: sync_issue_version.apply_async( - args=[batch_size, end_offset], countdown=countdown + kwargs={ + "batch_size": batch_size, + "offset": end_offset, + "countdown": countdown, + }, + countdown=countdown, ) print(f"Processed Issues: {end_offset}") diff --git a/apiserver/plane/db/models/issue.py b/apiserver/plane/db/models/issue.py index 44a194863be..db3f35bb21d 100644 --- a/apiserver/plane/db/models/issue.py +++ b/apiserver/plane/db/models/issue.py @@ -756,7 +756,6 @@ def log_issue_version(cls, issue, user): type=issue.type_id, cycle=cycle_issue.cycle if cycle_issue else None, modules=Module.objects.filter(issue=issue).values_list("id", flat=True), - # activity=issue.activity_id, properties={}, meta={}, last_saved_at=timezone.now(), From 954167c5ffc571a80d5246b63c1c778b54c2e80b Mon Sep 17 00:00:00 2001 From: gurusainath Date: Wed, 11 Dec 2024 19:07:58 +0530 Subject: [PATCH 5/5] chore: updated the issue log classmethod --- apiserver/plane/db/models/issue.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/apiserver/plane/db/models/issue.py b/apiserver/plane/db/models/issue.py index db3f35bb21d..ac6da307712 100644 --- a/apiserver/plane/db/models/issue.py +++ b/apiserver/plane/db/models/issue.py @@ -731,6 +731,8 @@ def log_issue_version(cls, issue, user): Module = apps.get_model("db.Module") CycleIssue = apps.get_model("db.CycleIssue") + IssueAssignee = apps.get_model("db.IssueAssignee") + IssueLabel = apps.get_model("db.IssueLabel") cycle_issue = CycleIssue.objects.filter(issue=issue).first() @@ -744,9 +746,17 @@ def log_issue_version(cls, issue, user): priority=issue.priority, start_date=issue.start_date, target_date=issue.target_date, - assignees=issue.assignees, + assignees=list( + IssueAssignee.objects.filter(issue=issue).values_list( + "assignee_id", 
flat=True + ) + ), sequence_id=issue.sequence_id, - labels=issue.labels, + labels=list( + IssueLabel.objects.filter(issue=issue).values_list( + "label_id", flat=True + ) + ), sort_order=issue.sort_order, completed_at=issue.completed_at, archived_at=issue.archived_at, @@ -754,8 +764,10 @@ def log_issue_version(cls, issue, user): external_source=issue.external_source, external_id=issue.external_id, type=issue.type_id, - cycle=cycle_issue.cycle if cycle_issue else None, - modules=Module.objects.filter(issue=issue).values_list("id", flat=True), + cycle=cycle_issue.cycle_id if cycle_issue else None, + modules=list( + Module.objects.filter(issue=issue).values_list("id", flat=True) + ), properties={}, meta={}, last_saved_at=timezone.now(),