feat(composio): incremental sync with per-item persistence for Gmail and Notion#519
Conversation
…mail and Notion providers - Enhanced the Gmail and Notion providers to support incremental synchronization with per-item persistence, improving data handling efficiency. - Introduced a new `SyncState` module to manage persistent sync state, including cursor tracking, synced IDs, and daily request budget management. - Updated sync logic to load state from a KV store, check daily budget limits, and handle paginated API requests, ensuring robust data retrieval and deduplication. - Refactored existing sync methods to utilize the new state management, enhancing overall reliability and performance of the providers. - Improved documentation for the sync process and state management, clarifying the operational flow and usage of the new features.
…otion providers - Enhanced error handling in the Gmail provider to format error messages more clearly during email fetching. - Streamlined debug logging in both Gmail and Notion providers to improve readability by consolidating multiline statements into single lines. - Refactored the `extract_page_title` function in the Notion provider for better clarity in property extraction logic. - Overall, these changes aim to enhance maintainability and improve the clarity of error reporting and logging across the providers.
📝 WalkthroughWalkthroughReplaces snapshot-based syncing with incremental, stateful synchronization for Gmail and Notion providers. Introduces a new Changes
Sequence DiagramsequenceDiagram
participant Provider as Gmail/Notion<br/>Provider
participant Memory as Memory Store<br/>(KV)
participant API as External API<br/>(Gmail/Notion)
participant Persist as Memory Persistence
Provider->>Memory: Load SyncState<br/>(cursor, synced_ids, budget)
Memory-->>Provider: Return persisted state
alt Budget Exhausted
Provider-->>Provider: Early exit with<br/>budget_exhausted
else Budget Available
loop Paginate while budget available
Provider->>API: Fetch items (paginated)<br/>with optional cursor filter
API-->>Provider: Return page of items
loop Process each item
Provider->>Provider: Extract ID, compute sync_key
alt Item already synced
Provider->>Provider: Skip (dedup)
else New/Updated item
Provider->>Persist: Persist single item<br/>as memory document
Persist-->>Provider: Document persisted
Provider->>Provider: Mark in synced_ids,<br/>record request
end
end
Provider->>Provider: Advance cursor to<br/>newest item timestamp
end
end
Provider->>Memory: Save updated SyncState<br/>(cursor, synced_ids, budget)
Memory-->>Provider: State persisted
Provider-->>Provider: Return sync outcome
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/openhuman/composio/providers/sync_state.rs (1)
53-57: Potential unbounded growth ofsynced_idsset.For high-volume providers like Gmail,
synced_idscould grow to tens of thousands of entries over time. Consider adding a pruning mechanism or a size cap with LRU eviction to prevent excessive memory/storage consumption.This isn't blocking for the initial implementation, but should be tracked for follow-up.
Would you like me to open an issue to track implementing a pruning strategy for the synced_ids set?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/openhuman/composio/providers/sync_state.rs` around lines 53 - 57, The synced_ids HashSet<String> can grow unbounded for high-volume providers; replace or wrap it with a bounded, eviction-capable structure (e.g., an LRU cache) and perform insertions via that API instead of direct HashSet inserts so old IDs are pruned; specifically change the synced_ids field (or add a new field in the same struct) to use a capacity-limited container (for example lru::LruCache<String, ()> or a custom RingBuffer+HashSet combo), update any methods that modify/access synced_ids to use the new API (insert/check/evict), and ensure serde (de)serialization is handled (use a serializable wrapper or convert on save/load) so persistence semantics remain intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/openhuman/composio/providers/sync_state.rs`:
- Around line 53-57: The synced_ids HashSet<String> can grow unbounded for
high-volume providers; replace or wrap it with a bounded, eviction-capable
structure (e.g., an LRU cache) and perform insertions via that API instead of
direct HashSet inserts so old IDs are pruned; specifically change the synced_ids
field (or add a new field in the same struct) to use a capacity-limited
container (for example lru::LruCache<String, ()> or a custom RingBuffer+HashSet
combo), update any methods that modify/access synced_ids to use the new API
(insert/check/evict), and ensure serde (de)serialization is handled (use a
serializable wrapper or convert on save/load) so persistence semantics remain
intact.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f2fe14d9-e289-43bd-bc82-c2debb5bff98
📒 Files selected for processing (4)
src/openhuman/composio/providers/gmail.rssrc/openhuman/composio/providers/mod.rssrc/openhuman/composio/providers/notion.rssrc/openhuman/composio/providers/sync_state.rs
…and Notion (tinyhumansai#519) * feat(composio): implement incremental sync and state management for Gmail and Notion providers - Enhanced the Gmail and Notion providers to support incremental synchronization with per-item persistence, improving data handling efficiency. - Introduced a new `SyncState` module to manage persistent sync state, including cursor tracking, synced IDs, and daily request budget management. - Updated sync logic to load state from a KV store, check daily budget limits, and handle paginated API requests, ensuring robust data retrieval and deduplication. - Refactored existing sync methods to utilize the new state management, enhancing overall reliability and performance of the providers. - Improved documentation for the sync process and state management, clarifying the operational flow and usage of the new features. * refactor(composio): improve error handling and logging in Gmail and Notion providers - Enhanced error handling in the Gmail provider to format error messages more clearly during email fetching. - Streamlined debug logging in both Gmail and Notion providers to improve readability by consolidating multiline statements into single lines. - Refactored the `extract_page_title` function in the Notion provider for better clarity in property extraction logic. - Overall, these changes aim to enhance maintainability and improve the clarity of error reporting and logging across the providers.
Summary
composio-gmail-msg-{id},composio-notion-page-{id}) instead of one giant snapshot blob, improving agent recall granularity.execute_toolAPI calls per calendar day (auto-resets), preventing runaway backfills during initial sync.nextPageTokenfor Gmail,next_cursorfor Notion) across multiple pages within budget.{page_id}@{edited_time}keys so pages edited after their last sync are re-persisted.Changes
src/openhuman/composio/providers/sync_state.rsSyncState,DailyBudget, KV-backed load/save, dedup helpers, per-item persist helpersrc/openhuman/composio/providers/gmail.rssrc/openhuman/composio/providers/notion.rssrc/openhuman/composio/providers/mod.rspub mod sync_stateTest plan
cargo checkpassescargo testsuite passes (2464 passed, 1 pre-existing flaky failure unrelated to this PR)Summary by CodeRabbit