Async search: list method and a bunch of various improvements and fixes#3
Conversation
📝 WalkthroughWalkthroughThis update introduces a new API for listing asynchronous searches, adds detailed metadata to async search result responses, and improves concurrency and filtering in async search operations. It modifies protobuf definitions, updates core async search logic, enhances JSON marshaling, extends test coverage, and adds or adapts relevant gRPC and mock interfaces. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Note ⚡️ Unit Test Generation is now available in beta!Learn more here, or try it out under "Finishing Touches" below. ✨ Finishing Touches
🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
6e28a16 to
6063da9
Compare
3659894 to
e92926f
Compare
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 6
🔭 Outside diff range comments (1)
fracmanager/encoding.go (1)
390-442: Add versioning toAggregatableSamplesserializationThe delta-encoding change in
marshalAggregatableSamples/unmarshalAggregatableSamplesisn’t gated by a version byte, so existing data will fail to deserialize. Please introduce a version prefix—just likeqprBinVersion—to these methods.• File:
fracmanager/encoding.go
– Define, e.g.:
go const aggSamplesVersion = uint8(1)
– InmarshalAggregatableSamples: prependdst = append(dst, aggSamplesVersion)before writing the length and samples.
– InunmarshalAggregatableSamples: read and validate the version byte beforebe.Uint64(src), returning an error on mismatch.Example diff:
+ const aggSamplesVersion = uint8(1) func marshalAggregatableSamples(s *seq.AggregatableSamples, dst []byte) []byte { + dst = append(dst, aggSamplesVersion) dst = be.AppendUint64(dst, uint64(len(s.SamplesByBin))) prevMID := seq.MID(0) … } func unmarshalAggregatableSamples(q *seq.AggregatableSamples, src []byte) ([]byte, error) { + if len(src) < 1 { + return nil, fmt.Errorf("src too short to read aggSamplesVersion") + } + version := src[0]; src = src[1:] + if version != aggSamplesVersion { + return nil, fmt.Errorf("invalid AggregatableSamples version %d; want %d", version, aggSamplesVersion) + } if len(src) < 8 { return nil, fmt.Errorf("src too short to unmarshal QPRHistogram, want at least 16 bytes, got %d", len(src)) } … }
🧹 Nitpick comments (2)
proxy/search/async.go (1)
314-319: Consider limiting concurrent replica queries.Querying all replicas for all shards simultaneously could create excessive load. Consider implementing a semaphore or worker pool to limit concurrent requests.
+sem := make(chan struct{}, 10) // Limit to 10 concurrent queries for _, replica := range shard { + sem <- struct{}{} + go func(replica string) { + defer func() { <-sem }() storeResp, err := si.clients[replica].GetAsyncSearchesList(storesCtx, &req) if err != nil { if status.Code(err) == codes.NotFound { continue } } respChan <- shardResponse{ data: storeResp, err: err, } + }(replica) }pkg/seqproxyapi/v1/marshaler.go (1)
240-240: Fix the copy-pasted comment.The comment refers to "oldest_time field" but this marshaler handles async search response fields.
-// MarshalJSON overrides oldest_time field with formatted string instead of google.protobuf.Timestamp. +// MarshalJSON overrides timestamp fields and other fields with custom formatting for FetchAsyncSearchResultResponse.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
pkg/seqproxyapi/v1/seq_proxy_api.pb.gois excluded by!**/*.pb.gopkg/seqproxyapi/v1/seq_proxy_api.pb.gw.gois excluded by!**/*.pb.gw.gopkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.gois excluded by!**/*.pb.gopkg/storeapi/store_api.pb.gois excluded by!**/*.pb.gopkg/storeapi/store_api.pb.gw.gois excluded by!**/*.pb.gw.gopkg/storeapi/store_api_vtproto.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (17)
api/seqproxyapi/v1/seq_proxy_api.proto(4 hunks)api/storeapi/store_api.proto(3 hunks)fracmanager/async_searcher.go(16 hunks)fracmanager/encoding.go(5 hunks)fracmanager/encoding_test.go(1 hunks)pkg/seqproxyapi/v1/mappings.go(2 hunks)pkg/seqproxyapi/v1/marshaler.go(1 hunks)pkg/seqproxyapi/v1/marshaler_test.go(1 hunks)proxy/search/async.go(11 hunks)proxy/search/mock/store_api_client_mock.go(1 hunks)proxyapi/grpc_async_search.go(3 hunks)proxyapi/grpc_v1.go(1 hunks)proxyapi/mock/grpc_v1.go(1 hunks)seq/qpr.go(0 hunks)storeapi/client.go(1 hunks)storeapi/grpc_async_search.go(6 hunks)tests/integration_tests/integration_test.go(3 hunks)
💤 Files with no reviewable changes (1)
- seq/qpr.go
🧰 Additional context used
🧬 Code Graph Analysis (5)
proxyapi/mock/grpc_v1.go (2)
proxy/search/async.go (2)
GetAsyncSearchesListRequest(103-108)AsyncSearchesListItem(110-122)pkg/seqproxyapi/v1/seq_proxy_api.pb.go (6)
GetAsyncSearchesListRequest(1506-1517)GetAsyncSearchesListRequest(1530-1530)GetAsyncSearchesListRequest(1545-1547)AsyncSearchesListItem(1621-1635)AsyncSearchesListItem(1648-1648)AsyncSearchesListItem(1663-1665)
pkg/seqproxyapi/v1/mappings.go (3)
fracmanager/async_searcher.go (1)
AsyncSearchStatus(626-626)pkg/seqproxyapi/v1/seq_proxy_api.pb.go (8)
AsyncSearchStatus(189-189)AsyncSearchStatus(230-232)AsyncSearchStatus(234-236)AsyncSearchStatus(243-245)AsyncSearchStatus_AsyncSearchStatusDone(196-196)AsyncSearchStatus_AsyncSearchStatusInProgress(194-194)AsyncSearchStatus_AsyncSearchStatusError(201-201)AsyncSearchStatus_AsyncSearchStatusCanceled(198-198)pkg/storeapi/store_api.pb.go (8)
AsyncSearchStatus(186-186)AsyncSearchStatus(221-223)AsyncSearchStatus(225-227)AsyncSearchStatus(234-236)AsyncSearchStatus_AsyncSearchStatusDone(190-190)AsyncSearchStatus_AsyncSearchStatusInProgress(189-189)AsyncSearchStatus_AsyncSearchStatusError(192-192)AsyncSearchStatus_AsyncSearchStatusCanceled(191-191)
proxy/search/mock/store_api_client_mock.go (3)
proxy/search/async.go (1)
GetAsyncSearchesListRequest(103-108)fracmanager/async_searcher.go (1)
GetAsyncSearchesListRequest(932-935)pkg/storeapi/store_api.pb.go (6)
GetAsyncSearchesListRequest(1236-1242)GetAsyncSearchesListRequest(1255-1255)GetAsyncSearchesListRequest(1270-1272)GetAsyncSearchesListResponse(1288-1293)GetAsyncSearchesListResponse(1306-1306)GetAsyncSearchesListResponse(1321-1323)
fracmanager/encoding_test.go (2)
seq/tokenizer.go (1)
Token(52-55)seq/seq.go (1)
MID(17-17)
fracmanager/encoding.go (2)
seq/seq.go (1)
MID(17-17)seq/qpr.go (1)
AggBin(108-111)
🔇 Additional comments (33)
api/seqproxyapi/v1/seq_proxy_api.proto (2)
107-113: New async search listing endpoint looks good.The API design follows RESTful conventions using POST with body for complex filtering.
316-341: Well-structured message definitions for listing functionality.The request supports proper pagination and filtering, and the response includes comprehensive metadata.
api/storeapi/store_api.proto (2)
24-24: New store API method added correctly.The
GetAsyncSearchesListRPC follows the established pattern.
182-186: Conversion logic forfrom/toandretentionis in place
Proxy code converts the store’sgoogle.protobuf.Timestampwith.AsTime()andgoogle.protobuf.Durationwith.AsDuration()(seeproxyapi/grpc_async_search.goandproxy/search/async.go). No action required.proxyapi/grpc_v1.go (1)
36-36: Interface correctly extended for new listing functionality.The method signature aligns with the protobuf definitions and follows the established pattern.
storeapi/client.go (1)
49-51: Client method implementation follows established pattern.The delegation to the underlying store's gRPC method is consistent with other methods.
proxyapi/mock/grpc_v1.go (1)
103-116: Generated mock correctly implements new interface method.The mock follows the established GoMock pattern and properly handles the method signature.
pkg/seqproxyapi/v1/mappings.go (3)
79-80: Critical bug fix - proper slice initializationThe corrected slice size prevents potential out-of-bounds panics during enum mapping.
87-87: Good defensive programmingBoundary check prevents panics and maintains consistency with other mapping functions.
116-129: Clean string-to-enum conversion implementationProvides necessary functionality for API parameter parsing with appropriate error handling.
pkg/seqproxyapi/v1/marshaler_test.go (1)
103-146: Comprehensive marshaling testThoroughly validates JSON serialization of complex nested structures with realistic test data.
fracmanager/encoding_test.go (1)
57-72: Enhanced aggregation bin testingValidates MID-aware histogram bins supporting more granular aggregation metadata tracking.
proxy/search/mock/store_api_client_mock.go (1)
140-158: Standard mock method implementationProperly follows GoMock patterns and enables testing of the new async search listing functionality.
tests/integration_tests/integration_test.go (4)
947-976: Comprehensive async search test setupProper initialization with realistic aggregation queries and ID tracking for lifecycle testing.
986-990: Robust async condition testingr.Eventually provides better async testing compared to manual polling loops.
997-1027: Thorough response validationComprehensive checks for enriched async search responses including request echo and metadata.
1028-1063: Complete async search lifecycle testingThoroughly validates listing, sorting, deletion, and state management of async searches.
fracmanager/async_searcher.go (1)
1020-1022: LGTM! Clean sorting implementation.The sorting by StartedAt in descending order is appropriate for listing recent searches first.
proxy/search/async.go (1)
139-177: LGTM! Well-structured concurrent fetching.The concurrent fetching implementation with proper context cancellation and goroutine synchronization is clean and efficient.
proxyapi/grpc_async_search.go (7)
18-21: Clean formatting improvement.
60-60: Good addition of explicit Order field.
66-69: Correct handling of zero timestamp.
73-88: Comprehensive request details in response.The enrichment with original request parameters provides valuable context for API consumers.
108-133: Well-implemented list retrieval method.Clean handling of optional status filter and proper request/response conversion.
155-166: Efficient aggregation conversion helper.Good pre-allocation and clean mapping.
168-205: Consistent and complete list conversion.Maintains consistency with single-item response formatting and handles all fields properly.
pkg/seqproxyapi/v1/marshaler.go (2)
223-225: LGTM!Simple and consistent with existing marshaler patterns.
258-260: LGTM!Consistent with established marshaler patterns.
storeapi/grpc_async_search.go (5)
18-21: LGTM!Clean multiline formatting improves readability.
Also applies to: 57-60, 96-99, 104-107
73-73: Good fix!Proper zero-time check instead of existence check.
88-93: LGTM!Additional metadata fields properly formatted for protobuf response.
112-130: LGTM!Well-structured implementation with proper optional parameter handling.
150-179: LGTM!Comprehensive conversion logic consistent with existing patterns.
There was a problem hiding this comment.
Actionable comments posted: 6
♻️ Duplicate comments (1)
fracmanager/async_searcher.go (1)
665-683: Potential lock contention during file I/OHolding
qprsMuwhile performing file operations could cause contention. Consider:
- Find files under read lock, release lock, then load results
- Use file-level locking instead of global mutex
- Implement lock-free file listing with atomic operations
🧹 Nitpick comments (1)
fracmanager/async_searcher.go (1)
965-998: Document lock ordering to prevent deadlocksThe code consistently acquires
requestsMubeforeqprsMu. Consider adding a comment documenting this lock ordering requirement to prevent future deadlocks.+// Lock ordering: always acquire requestsMu before qprsMu to prevent deadlocks as.requestsMu.RLock() defer as.requestsMu.RUnlock()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (6)
pkg/seqproxyapi/v1/seq_proxy_api.pb.gois excluded by!**/*.pb.gopkg/seqproxyapi/v1/seq_proxy_api.pb.gw.gois excluded by!**/*.pb.gw.gopkg/seqproxyapi/v1/seq_proxy_api_vtproto.pb.gois excluded by!**/*.pb.gopkg/storeapi/store_api.pb.gois excluded by!**/*.pb.gopkg/storeapi/store_api.pb.gw.gois excluded by!**/*.pb.gw.gopkg/storeapi/store_api_vtproto.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (17)
api/seqproxyapi/v1/seq_proxy_api.proto(4 hunks)api/storeapi/store_api.proto(3 hunks)fracmanager/async_searcher.go(16 hunks)fracmanager/encoding.go(5 hunks)fracmanager/encoding_test.go(1 hunks)pkg/seqproxyapi/v1/mappings.go(2 hunks)pkg/seqproxyapi/v1/marshaler.go(1 hunks)pkg/seqproxyapi/v1/marshaler_test.go(1 hunks)proxy/search/async.go(11 hunks)proxy/search/mock/store_api_client_mock.go(1 hunks)proxyapi/grpc_async_search.go(3 hunks)proxyapi/grpc_v1.go(1 hunks)proxyapi/mock/grpc_v1.go(1 hunks)seq/qpr.go(0 hunks)storeapi/client.go(1 hunks)storeapi/grpc_async_search.go(6 hunks)tests/integration_tests/integration_test.go(3 hunks)
💤 Files with no reviewable changes (1)
- seq/qpr.go
🧰 Additional context used
🧬 Code Graph Analysis (7)
pkg/seqproxyapi/v1/marshaler_test.go (1)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (21)
FetchAsyncSearchResultResponse(1244-1258)FetchAsyncSearchResultResponse(1271-1271)FetchAsyncSearchResultResponse(1286-1288)StartAsyncSearchRequest(1043-1059)StartAsyncSearchRequest(1072-1072)StartAsyncSearchRequest(1087-1089)SearchQuery(460-468)SearchQuery(481-481)SearchQuery(496-498)AggQuery(529-538)AggQuery(551-551)AggQuery(566-568)ComplexSearchResponse(949-961)ComplexSearchResponse(974-974)ComplexSearchResponse(989-991)Document(301-308)Document(321-321)Document(336-338)Histogram(415-420)Histogram(433-433)Histogram(448-450)
fracmanager/encoding_test.go (1)
seq/seq.go (1)
MID(17-17)
proxyapi/mock/grpc_v1.go (2)
proxy/search/async.go (2)
GetAsyncSearchesListRequest(103-108)AsyncSearchesListItem(110-122)pkg/seqproxyapi/v1/seq_proxy_api.pb.go (6)
GetAsyncSearchesListRequest(1506-1517)GetAsyncSearchesListRequest(1530-1530)GetAsyncSearchesListRequest(1545-1547)AsyncSearchesListItem(1621-1635)AsyncSearchesListItem(1648-1648)AsyncSearchesListItem(1663-1665)
proxy/search/mock/store_api_client_mock.go (3)
proxy/search/async.go (1)
GetAsyncSearchesListRequest(103-108)fracmanager/async_searcher.go (1)
GetAsyncSearchesListRequest(932-935)pkg/storeapi/store_api.pb.go (6)
GetAsyncSearchesListRequest(1236-1242)GetAsyncSearchesListRequest(1255-1255)GetAsyncSearchesListRequest(1270-1272)GetAsyncSearchesListResponse(1288-1293)GetAsyncSearchesListResponse(1306-1306)GetAsyncSearchesListResponse(1321-1323)
fracmanager/encoding.go (2)
seq/seq.go (1)
MID(17-17)seq/qpr.go (1)
AggBin(108-111)
tests/integration_tests/integration_test.go (4)
proxy/search/async.go (2)
AsyncRequest(24-32)GetAsyncSearchesListRequest(103-108)fracmanager/async_searcher.go (2)
AsyncSearchStatusDone(629-629)GetAsyncSearchesListRequest(932-935)seq/qpr.go (2)
QPR(68-74)ErrorSource(61-64)tests/common/util.go (1)
IDs(73-80)
proxyapi/grpc_async_search.go (7)
pkg/seqproxyapi/v1/seq_proxy_api.pb.go (36)
StartAsyncSearchRequest(1043-1059)StartAsyncSearchRequest(1072-1072)StartAsyncSearchRequest(1087-1089)FetchAsyncSearchResultRequest(1171-1184)FetchAsyncSearchResultRequest(1197-1197)FetchAsyncSearchResultRequest(1212-1214)FetchAsyncSearchResultResponse(1244-1258)FetchAsyncSearchResultResponse(1271-1271)FetchAsyncSearchResultResponse(1286-1288)Order(143-143)Order(172-174)Order(176-178)Order(185-187)SearchQuery(460-468)SearchQuery(481-481)SearchQuery(496-498)HistQuery(606-611)HistQuery(624-624)HistQuery(639-641)GetAsyncSearchesListRequest(1506-1517)GetAsyncSearchesListRequest(1530-1530)GetAsyncSearchesListRequest(1545-1547)AsyncSearchStatus(189-189)AsyncSearchStatus(230-232)AsyncSearchStatus(234-236)AsyncSearchStatus(243-245)AggQuery(529-538)AggQuery(551-551)AggQuery(566-568)AggFunc(81-81)AggFunc(125-127)AggFunc(129-131)AggFunc(138-140)AsyncSearchesListItem(1621-1635)AsyncSearchesListItem(1648-1648)AsyncSearchesListItem(1663-1665)proxy/search/async.go (4)
FetchAsyncSearchResultRequest(78-83)FetchAsyncSearchResultResponse(85-101)GetAsyncSearchesListRequest(103-108)AsyncSearchesListItem(110-122)seq/seq.go (2)
ID(12-15)MIDToDuration(111-113)pkg/seqproxyapi/v1/mappings.go (1)
MustProtoAsyncSearchStatus(108-114)pkg/storeapi/mappings.go (1)
MustProtoAsyncSearchStatus(137-143)fracmanager/async_searcher.go (3)
GetAsyncSearchesListRequest(932-935)AsyncSearchStatus(626-626)AsyncSearchesListItem(937-957)proxy/search/search_request.go (1)
AggQuery(9-15)
🔇 Additional comments (30)
proxyapi/grpc_v1.go (1)
36-36: LGTM: Clean interface extension.The new method signature follows Go conventions and integrates well with the existing async search methods.
storeapi/client.go (1)
49-51: LGTM: Consistent client delegation.Follows the established pattern of other client methods in this file.
fracmanager/encoding_test.go (1)
57-72: LGTM: Test data updated for new key structure.The addition of MID fields to AggBin keys correctly reflects the encoding format changes for delta encoding support.
pkg/seqproxyapi/v1/mappings.go (3)
79-80: Critical fix: Corrected mapping initialization.Fixed copy-paste error that was incorrectly using
orderMappingsinstead ofstatusMappings. This would have caused runtime panics.
87-87: Critical fix: Corrected boundary check.Fixed boundary check to use correct slice length for status mappings.
116-129: LGTM: Useful string-to-enum conversion.The new map and function provide clean string-to-enum conversion for async search statuses.
pkg/seqproxyapi/v1/marshaler_test.go (1)
103-146: LGTM: Comprehensive marshaling test.Thorough test coverage for the complex nested FetchAsyncSearchResultResponse structure with proper validation of JSON output format.
proxyapi/mock/grpc_v1.go (1)
103-116: LGTM!The new mock methods follow the established pattern for gomock generated code.
proxy/search/mock/store_api_client_mock.go (1)
140-158: LGTM!Standard gomock generated code for the new gRPC method.
fracmanager/encoding.go (2)
38-38: Good fix!Error message now follows the standard "want X, got Y" format.
390-443: Excellent optimization!Delta encoding for MID values reduces storage size while properly preserving the full {MID, Token} key structure in the aggregation map.
tests/integration_tests/integration_test.go (4)
1948-1956: Good refactoring!Extracting the request to a variable improves readability and enables proper comparison later in the test.
1954-1955: Excellent consistency improvement!Using UTC throughout prevents timezone-related test failures.
1986-1990: Smart async handling!Using
r.Eventuallyis the right approach for polling async operations.
2028-2064: Comprehensive test coverage!The new test sections properly verify:
- Multiple async searches in the list
- Correct ordering (newest first)
- All metadata fields
- Deletion functionality
pkg/seqproxyapi/v1/marshaler.go (1)
223-260: Well-structured JSON marshalers!The custom marshalers ensure consistent formatting:
- Enums as quoted strings
- Numeric fields as quoted decimals
- Timestamps in RFC3339Nano format
- Proper handling of optional fields
api/seqproxyapi/v1/seq_proxy_api.proto (2)
107-113: Well-designed async search listing APIThe new
GetAsyncSearchesListRPC method provides comprehensive filtering and pagination capabilities.
292-302: Good enhancement: Response includes original request contextIncluding the original
StartAsyncSearchRequestin the response provides valuable context. Field numbering properly preserves backward compatibility.api/storeapi/store_api.proto (1)
181-187: Consistent response enrichmentThe additional fields match the proxy API design, maintaining consistency across layers.
fracmanager/async_searcher.go (3)
43-63: Clear metric naming conventionThe
asyncSearchprefix improves metric organization and discoverability.
203-216: Clean status determination logicCentralizing status logic in a dedicated method improves maintainability.
909-920: Proper state validationPreventing cancellation of completed searches avoids unnecessary state changes.
storeapi/grpc_async_search.go (2)
72-75: Important bug fix: Corrected timestamp assignment logicGood catch - the condition was inverted. Now correctly assigns timestamp only when
CanceledAtis not zero.
112-130: Clean implementation of async search listingProper handling of optional status filter and ID list.
proxyapi/grpc_async_search.go (1)
73-87: Comprehensive request reconstructionProperly reconstructs the original request with all fields including optional histogram.
proxy/search/async.go (5)
59-61: Good load balancing improvementShuffling replica order helps distribute load more evenly across replicas.
438-449: Clean helper functionWell-structured conversion from protobuf to internal representation.
458-470: Appropriate error handling for distributed operationsGood approach to attempt cancellation on all replicas and only fail if all attempts fail.
495-502: Clean abstractionSimple and focused helper for replica iteration.
515-515: Important bug fixCorrectly assigns
HotStoresinstead of reassigningHotReadStores.
6063da9 to
1ab8df9
Compare
af86645 to
2a5a2a5
Compare
| google.protobuf.Timestamp started_at = 3; | ||
| google.protobuf.Timestamp expires_at = 4; | ||
| optional google.protobuf.Timestamp canceled_at = 5; | ||
| StartAsyncSearchRequest request = 2; |
There was a problem hiding this comment.
nit: I am a little triggered by the fact that we have something that is prefixed with Start in Fetch. Maybe StartAsyncSearchRequest should have inner message with all parameters that can be returned in FetchAsyncSearchResultResponse?
| } | ||
| if now.Add(r.Retention).After(now.AddDate(5, 0, 0)) { | ||
| // Just check Retention is correct. Retention more than 5 years is not expected. | ||
| if now.Add(r.Retention).After(now.Add(maxRetention)) { |
There was a problem hiding this comment.
I think this is enough:
if r.Retention > maxRetention {
...
}There was a problem hiding this comment.
Leaving this here since it's outside of PR diff.
I think 0o777 mode is redundant, we should stick with 0o666 or even 0o600.
| as.doSearch(asyncSearchID, fracs) | ||
| activeSearches.Add(-1) | ||
| asyncSearchActiveSearches.Add(-1) | ||
| as.processWg.Done() |
There was a problem hiding this comment.
Missing defer here. Method doSearch can panic.
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Documentation
Refactor