Skip to content

[ECO-5426][LiveObjects] Implement object sync / operation #1137

Merged
sacOO7 merged 20 commits intomainfrom
feature/object-sync-refactored
Aug 8, 2025
Merged

[ECO-5426][LiveObjects] Implement object sync / operation #1137
sacOO7 merged 20 commits intomainfrom
feature/object-sync-refactored

Conversation

@sacOO7
Copy link
Collaborator

@sacOO7 sacOO7 commented Aug 6, 2025

Summary by CodeRabbit

  • New Features

    • Introduced comprehensive live object management, including live maps and live counters, with support for synchronization, state updates, garbage collection, and error handling.
    • Added new error codes for invalid objects and map-specific errors.
    • Enhanced handling of channel state changes and object synchronization for real-time updates.
  • Bug Fixes

    • Improved robustness in deserialization and error reporting for unknown enum values and invalid object data.
  • Tests

    • Added extensive unit tests for live objects, counters, maps, synchronization, and error handling to ensure reliability and correctness.
  • Documentation

    • Improved inline documentation and comments for clarity on state transitions, error handling, and object lifecycle management.
  • Style

    • Refactored code for better type safety and readability, including sealed class hierarchies and explicit type usage.

sacOO7 added 20 commits July 2, 2025 14:30
…erfaces

- Enhanced LiveCounter interface with increment/decrement operations and comprehensive JavaDoc
- Extended LiveObjectsPlugin interface for modular plugin architecture
- Updated ErrorCodes enum for standardized live objects error handling
- Established blocking/non-blocking operation annotations for API consistency
- Created ObjectId class with comprehensive validation and parsing logic
- Enhanced Utils helper functions for common object operations
- Updated Helpers class for shared live objects functionality
- Established consistent logging patterns and error handling across components
- Enhanced DefaultLiveObjectsPlugin with seamless channel integration
- Updated DefaultLiveObjects with coroutine-based sequential message processing
- Added objects pool initialization and comprehensive lifecycle management
- Implemented channel state change handling for live objects synchronization
…sting

- Added BaseLiveObject abstract class as foundation for LiveMap/LiveCounter
- Implemented site timeserials tracking and tombstone lifecycle management
- Created BaseLiveObjectTest with spec-compliant validation and edge cases
- Established thread-safe object state management and operation handling
…system

- Enhanced ObjectMessage data class with complete serialization field mapping
- Created comprehensive ObjectId validation with extensive test coverage
- Added message size calculation and validation logic for protocol compliance
- Established edge case testing for object ID formats and parsing scenarios
- Enhanced TestUtils with advanced mock object generation capabilities
- Updated TestHelpers with channel and adapter mocking for integration tests
- Added parameterized test base classes for systematic integration testing
- Established consistent test patterns and mock factories for all live object types
…e management

- Added ObjectsPool with ConcurrentHashMap for thread-safe object storage
- Implemented automatic garbage collection for tombstoned objects
- Created pool initialization with root object and comprehensive lifecycle management
- Added extensive ObjectsPoolTest with concurrent access validation and edge cases
…ocessing

- Implemented ObjectsManager for OBJECT and OBJECT_SYNC message processing
- Added sync objects data pool for collecting and managing sync sequences
- Created buffered object operations during sync state with proper ordering
- Established comprehensive ObjectsManagerTest with sync sequence validation scenarios
…tions

- Added ObjectsSyncTracker for managing sync sequences and cursor tracking
- Implemented sync state transitions and channel serial management
- Created comprehensive sync tracking tests with edge case coverage
- Established sync completion detection and buffered operation processing logic
- Created LiveMapEntry with timestamp and value management for conflict resolution
- Enhanced ObjectMessageFixtures for consistent test data generation across test suites
- Added entry lifecycle management with tombstone support and GC integration
- Established fixture patterns for various object message scenarios and edge cases
- Added DefaultLiveMap with ConcurrentHashMap for thread-safe map operations
- Implemented map semantics support (LWW) and comprehensive entry management
- Created essential LiveMap operations (get, set, remove, size) with validation
- Established DefaultLiveMapTest with fundamental operation validation and concurrency tests
…ocessing

- Implemented LiveMapManager for handling complex map operations and state management
- Added operation validation and conflict resolution logic with LWW semantics
- Created comprehensive LiveMapManagerTest with operation scenarios and edge cases
- Established map update calculation and change notification system for subscribers
- Added DefaultLiveCounter with AtomicReference for thread-safe counting operations
- Implemented counter operations (increment, decrement, value) with proper synchronization
- Created DefaultLiveCounterTest with concurrent operation validation and stress testing
- Established counter data management and operation result handling with notifications
- Implemented LiveCounterManager for counter-specific operations and state management
- Added counter operation validation and comprehensive state management logic
- Created extensive LiveCounterManagerTest with edge cases and error scenarios
- Established counter update calculation and notification system for real-time updates
…ting

- Created comprehensive DefaultLiveObjectsTest with integration scenarios
- Added complete live objects lifecycle and state management validation
- Implemented channel state integration testing with various state transitions
- Established end-to-end workflow testing for live objects synchronization
- Enhanced JsonSerialization with custom type adapters for enum serialization
- Improved ObjectMessage JSON serialization and deserialization with proper formatting
- Updated enum code type adapters for ObjectOperationAction and MapSemantics
- Established JSON object conversion utilities with comprehensive error handling
- Enhanced MsgpackSerialization for efficient binary message encoding
- Improved ObjectMessage MessagePack write and read operations with field optimization
- Updated DefaultLiveObjectSerializer supporting both JSON and MessagePack formats
- Established efficient binary serialization with minimal memory allocation
- Enhanced ObjectMessageSerializationTest with comprehensive format validation
- Updated ObjectMessageSizeTest with size calculation validation and limits enforcement
- Improved message size calculation methods for all object message components
- Established size validation utilities with comprehensive test coverage and edge cases
- Enhanced ChannelBase to support live objects functionality with seamless integration
- Updated live objects protocol message handling in channel pipeline
- Improved channel state integration for live objects synchronization
- Established robust integration between channels and live objects with proper lifecycle
@coderabbitai
Copy link

coderabbitai bot commented Aug 6, 2025

Walkthrough

This pull request introduces a comprehensive implementation of live object synchronization and real-time operation handling for Ably's live objects system. It adds core classes for object pooling, synchronization tracking, object management, and type-specific logic for live maps and counters. Extensive changes include new interfaces, state machines, serialization/deserialization updates, and a large suite of unit tests covering the new functionality.

Changes

Cohort / File(s) Change Summary
Live Objects Core Implementation
live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt, .../ObjectsManager.kt, .../ObjectsPool.kt, .../ObjectsSyncTracker.kt, .../Helpers.kt, .../Utils.kt, .../ObjectId.kt, .../ErrorCodes.kt, .../ObjectMessage.kt
Introduces and implements the main classes and helpers for live objects: state machine, event bus, object pooling, sync tracking, error codes, sealed value types, and object ID parsing.
Live Map & Counter Types
.../type/BaseLiveObject.kt, .../type/livemap/DefaultLiveMap.kt, .../type/livemap/LiveMapEntry.kt, .../type/livemap/LiveMapManager.kt, .../type/livecounter/DefaultLiveCounter.kt, .../type/livecounter/LiveCounterManager.kt
Adds abstract base class for live objects, concrete implementations for live map and counter, entry types, and managers for state and operation handling.
Serialization/Deserialization
.../serialization/JsonSerialization.kt, .../serialization/MsgpackSerialization.kt
Updates serialization and deserialization logic to support new sealed ObjectValue hierarchy, robust enum handling, and improved error reporting.
Plugin and Channel Integration
lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java, lib/src/main/java/io/ably/lib/realtime/ChannelBase.java, live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt
Adds handleStateChange method to plugin interfaces and integrates state change notifications into channel state transitions.
Counter Interface Update
lib/src/main/java/io/ably/lib/objects/LiveCounter.java
Changes value() method return type from Long to Double.
Test Infrastructure and Coverage
live-objects/src/test/kotlin/io/ably/lib/objects/unit/*, .../TestHelpers.kt, .../fixtures/ObjectMessageFixtures.kt, .../TestUtils.kt
Adds comprehensive unit tests for all new classes and logic, including object pool, manager, sync tracker, type-specific logic, and serialization. Test helpers and fixtures are added for easier mocking and assertions.
Minor Test Refactoring
.../unit/ObjectMessageSerializationTest.kt, .../unit/ObjectMessageSizeTest.kt, .../unit/RealtimeObjectsTest.kt
Updates test imports, class names, and usage of new sealed ObjectValue subclasses for consistency.

Sequence Diagram(s)

sequenceDiagram
    participant ChannelBase
    participant LiveObjectsPlugin
    participant DefaultLiveObjects
    participant ObjectsManager
    participant ObjectsPool

    ChannelBase->>LiveObjectsPlugin: handleStateChange(channelName, state, hasObjects)
    LiveObjectsPlugin->>DefaultLiveObjects: handleStateChange(state, hasObjects)
    DefaultLiveObjects->>ObjectsManager: startNewSync / endSync (based on state)
    DefaultLiveObjects->>ObjectsManager: handle(protocolMessage)
    ObjectsManager->>ObjectsPool: applyObjectMessages / applyObjectSyncMessages
    ObjectsPool->>ObjectsManager: get/set objects as needed
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~90+ minutes

Assessment against linked issues

Objective Addressed Explanation
Implement object sync (RTO5, RTO6, RTO7, RTO8, RTO9) [#1107, ECO-5426]
Implement incoming object operations for all actions [#1114]
Ensure correct handling of protocol messages and state transitions [#1107, #1114]
Provide robust error handling and test coverage for sync and operations [#1107, #1114, ECO-5426]

Assessment against linked issues: Out-of-scope changes

Code Change Explanation
No out-of-scope changes detected

Possibly related PRs

  • Setup : LiveObject plugin #1085: Introduced the LiveObjectsPlugin interface and its integration, which is further extended and utilized in this PR for live object state change handling.

Suggested reviewers

  • VeskeR
  • ttypic

Poem

In a warren of code, the rabbits did strive,
To sync all the objects and keep them alive.
With counters and maps, and pools oh so neat,
They hop through the states—no bug can defeat!
Now every object, in sync, can reside—
Thanks to these bunnies, who code with great pride! 🐇✨

Note

⚡️ Unit Test Generation is now available in beta!

Learn more here, or try it out under "Finishing Touches" below.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/object-sync-refactored

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot temporarily deployed to staging/pull/1137/features August 6, 2025 10:55 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1137/javadoc August 6, 2025 10:56 Inactive
@sacOO7 sacOO7 force-pushed the feature/object-sync-refactored branch from b0c8984 to c1d1d90 Compare August 6, 2025 11:04
@github-actions github-actions bot temporarily deployed to staging/pull/1137/features August 6, 2025 11:05 Inactive
@sacOO7 sacOO7 changed the title Feature/object sync refactored [ECO-5426][LiveObjects] Implement object sync / operation Aug 6, 2025
@github-actions github-actions bot temporarily deployed to staging/pull/1137/javadoc August 6, 2025 11:07 Inactive
@sacOO7 sacOO7 force-pushed the feature/object-sync-refactored branch from c1d1d90 to 0476f23 Compare August 6, 2025 12:41
@github-actions github-actions bot temporarily deployed to staging/pull/1137/features August 6, 2025 12:42 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🔭 Outside diff range comments (1)
lib/src/main/java/io/ably/lib/objects/LiveCounter.java (1)

57-57: Update outdated Javadoc comment.

The comment still references "as a Long" but the return type has been changed to Double.

-     * @return the current value of the counter as a Long.
+     * @return the current value of the counter as a Double.
🧹 Nitpick comments (13)
live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjectsPlugin.kt (1)

25-25: Fix typo in dispose reason message.

"ben" should be "been" in the dispose reason string.

-    liveObjects[channelName]?.dispose("Channel has ben released using channels.release()")
+    liveObjects[channelName]?.dispose("Channel has been released using channels.release()")
live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt (1)

37-55: Reminder: Implement core counter operations.

Several core methods are marked as TODO. These are essential for the LiveCounter functionality.

Would you like me to help implement these counter operations or create an issue to track this work?

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt (2)

4-4: Remove redundant import.

MapSemantics is already imported via the wildcard import on line 3.

-import io.ably.lib.objects.MapSemantics

40-74: Reminder: Implement core map operations.

Several essential LiveMap methods are marked as TODO and need implementation.

Would you like me to help implement these map operations or create an issue to track this work?

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt (1)

445-447: Consider simplifying the null-safety logic.

The function correctly identifies invalid ObjectData, but the logic could be more readable:

 internal fun ObjectData?.isInvalid(): Boolean {
-  return this?.objectId.isNullOrEmpty() && this?.value == null
+  return this == null || (objectId.isNullOrEmpty() && value == null)
 }

This makes it explicit that null ObjectData is also invalid, and uses smart casting for cleaner property access.

live-objects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt (2)

229-231: Redundant Unknown enum lookup.

The second search for code == -1 is unnecessary since Unknown is already defined with code -1. Simplify to:

-        action = ObjectOperationAction.entries.firstOrNull { it.code == actionCode }
-          ?: ObjectOperationAction.entries.firstOrNull { it.code == -1 }
-          ?: throw objectError("Unknown ObjectOperationAction code: $actionCode and no Unknown fallback found")
+        action = ObjectOperationAction.entries.firstOrNull { it.code == actionCode }
+          ?: ObjectOperationAction.Unknown

489-491: Redundant Unknown enum lookup.

Similar to the ObjectOperationAction case, simplify the MapSemantics lookup:

-        semantics = MapSemantics.entries.firstOrNull { it.code == semanticsCode }
-          ?: MapSemantics.entries.firstOrNull { it.code == -1 }
-          ?: throw objectError("Unknown MapSemantics code: $semanticsCode and no UNKNOWN fallback found")
+        semantics = MapSemantics.entries.firstOrNull { it.code == semanticsCode }
+          ?: MapSemantics.Unknown
live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt (1)

66-91: Consider improving null-safety handling.

While the non-null assertions on line 84 are safe (since canApplyOperation throws for null values), consider returning the validated values from canApplyOperation to avoid assertions:

// Return a data class or Pair from canApplyOperation
internal fun validateAndCheckOperation(siteCode: String?, timeSerial: String?): Pair<String, String>? {
    // validation logic...
    return if (canApply) Pair(siteCode, timeSerial) else null
}

This would make the null-safety guarantees explicit in the type system.

live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt (2)

10-10: Test class naming inconsistency.

The class is named DefaultLiveCounterManagerTest but it tests LiveCounterManager. Consider renaming to match the class under test:

-class DefaultLiveCounterManagerTest {
+class LiveCounterManagerTest {

217-255: Consider removing duplicate increment tests.

Tests at lines 217-235 and 237-255 duplicate the increment functionality already tested at lines 175-192. Consider either removing duplicates or differentiating them with unique scenarios.

live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt (1)

50-56: Use lateinit for deferred initialization of gcJob

The gcJob variable is declared without initialization but assigned in the init block. Consider using lateinit to make the deferred initialization pattern explicit.

-  private var gcJob: Job // Job for the garbage collection coroutine
+  private lateinit var gcJob: Job // Job for the garbage collection coroutine
live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt (1)

41-47: Consider documenting the buffer overflow strategy

The MutableSharedFlow uses UNLIMITED buffer capacity which could lead to memory issues if messages accumulate faster than processing. While this ensures no messages are dropped, consider documenting the expected message throughput and any backpressure strategies.

live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt (1)

115-117: Validate data before serial comparison

The data validation happens after the serial comparison check. Consider moving the validation before the serial check to fail fast on invalid data, avoiding unnecessary serial comparisons.

   ): Map<String, String> {
     val existingEntry = liveMap.data[mapOp.key]
 
+    if (mapOp.data.isInvalid()) {
+      throw objectError("Invalid object data for MAP_SET op on objectId=${objectId} on key=${mapOp.key}")
+    }
+
     // RTLM7a
     if (existingEntry != null && !canApplyMapOperation(existingEntry.timeserial, timeSerial)) {
       // RTLM7a1 - the operation's serial <= the entry's serial, ignore the operation
       Log.v(tag,
         "Skipping update for key=\"${mapOp.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial};" +
           " objectId=${objectId}"
       )
       return mapOf()
     }
 
-    if (mapOp.data.isInvalid()) {
-      throw objectError("Invalid object data for MAP_SET op on objectId=${objectId} on key=${mapOp.key}")
-    }

@sacOO7 sacOO7 merged commit 3e0ca95 into main Aug 8, 2025
13 checks passed
@sacOO7 sacOO7 deleted the feature/object-sync-refactored branch August 8, 2025 14:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

LiveObject: Implement incoming object operations LiveObject: Implement object sync

2 participants