feat: logs S3 offloading#60
📝 Walkthrough
Adds S3 offloading: new offloading config, S3 client wiring, Remote fraction type, Sealed/Loader refactor to support offload, fracmanager split into local/remote with offload/retention logic, metrics and API signature changes across store, loader, and tests.
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
50ec78d to 657dba4
@coderabbitai review
✅ Actions performed
Review triggered.
Actionable comments posted: 16
🔭 Outside diff range comments (5)
fracmanager/async_searcher.go (1)
216-231: Guard against fraction disappearing; avoid nil deref panic.
Between StartSearch and doSearch, a frac can be offloaded/GC'ed and not present in fracsByName. Indexing fracsByName[...] without an ok-check risks passing a nil frac.Fraction to processFrac, causing a panic on method call.
Apply this diff to handle it safely:
- f := fracsByName[fracInfo.Name]
-
- if err := as.processFrac(f, state.Request); err != nil {
+ f, ok := fracsByName[fracInfo.Name]
+ if !ok || f == nil {
+     logger.Warn("fraction disappeared from manager, skipping",
+         zap.String("name", fracInfo.Name))
+     continue
+ }
+ if err := as.processFrac(f, state.Request); err != nil {
      return fmt.Errorf("processing fraction %s: %s", fracInfo.Name, err)
  }
storeapi/store.go (1)
49-58: Fail fast when offloading is enabled but S3 client is nil; wrap errors with %w.
Guard against nil s3cli when offloading is enabled to avoid deep NPEs. Also, wrap the load error with %w.
  func NewStore(ctx context.Context, c StoreConfig, s3cli *s3.Client, mappingProvider MappingProvider) (*Store, error) {
      if err := c.setDefaults(); err != nil {
          return nil, err
      }
-     fracManager := fracmanager.NewFracManager(ctx, &c.FracManager, s3cli)
+     if c.FracManager.OffloadingEnabled && s3cli == nil {
+         return nil, fmt.Errorf("offloading is enabled but s3 client is nil")
+     }
+     fracManager := fracmanager.NewFracManager(ctx, &c.FracManager, s3cli)
      err := fracManager.Load(ctx)
      if err != nil {
-         return nil, fmt.Errorf("loading time list: %s", err)
+         return nil, fmt.Errorf("loading time list: %w", err)
      }
      fracManager.Start()
fracmanager/sealed_frac_cache_test.go (1)
274-280: Good: context-aware initialization. Add defer fm.Stop() to avoid goroutine leaks.
Ensures background workers are stopped even on test failure.
- fm, err := newFracManagerWithBackgroundStart(t.Context(), &Config{
+ fm, err := newFracManagerWithBackgroundStart(t.Context(), &Config{
      FracSize: 100,
      TotalSize: maxSize * 2,
      ShouldReplay: false,
      DataDir: dataDir,
  })
+ defer fm.Stop()
frac/sealed_loader.go (1)
24-56: Add nil-argument guards to prevent panics and improve diagnosability
Load assumes non-nil state/info/indexReader; a nil at call sites will panic later with less useful stack traces.
Apply this diff to fail fast with context:
  func (l *Loader) Load(state *State, info *Info, indexReader *storage.IndexReader) {
      t := time.Now()
-     l.reader = indexReader
+     if state == nil || info == nil || indexReader == nil {
+         logger.Fatal("sealed loader: nil argument",
+             zap.Bool("nil_state", state == nil),
+             zap.Bool("nil_info", info == nil),
+             zap.Bool("nil_indexReader", indexReader == nil),
+         )
+     }
+     l.reader = indexReader
      l.blockIndex = 1 // skipping info block that's already read
frac/sealed.go (1)
318-321: close() skips closing files when isLoaded=false → resource leak on Suicide.
Offload/openIndex/openDocs can open files without flipping isLoaded. close() returns early and never closes them. Remove the early return and rely on nil checks below.
- if !f.isLoaded {
-     return
- }
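A minimal sketch of the "rely on nil checks" approach, assuming a simplified fraction with two lazily opened handles. The names handles, fakeFile and closeAll are hypothetical and are not taken from frac/sealed.go; the point is only that cleanup keys off which handles exist, not off a separate loaded flag.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fakeFile is a hypothetical stand-in for an opened docs or index file.
type fakeFile struct{ closed bool }

func (f *fakeFile) Close() error { f.closed = true; return nil }

// handles mimics a fraction whose files may be opened before isLoaded is set.
type handles struct {
	isLoaded  bool
	docsFile  io.Closer
	indexFile io.Closer
}

// closeAll closes every handle that was actually opened, regardless of
// isLoaded, and clears it so that a second call is a no-op.
func (h *handles) closeAll() error {
	var errs []error
	if h.docsFile != nil {
		errs = append(errs, h.docsFile.Close())
		h.docsFile = nil
	}
	if h.indexFile != nil {
		errs = append(errs, h.indexFile.Close())
		h.indexFile = nil
	}
	// errors.Join drops nil entries and returns nil if nothing failed.
	return errors.Join(errs...)
}

func main() {
	docs := &fakeFile{}
	h := &handles{docsFile: docs} // opened for offload, never marked loaded
	err := h.closeAll()
	fmt.Println("docs closed:", docs.closed, "err:", err)
}
```

Clearing each handle after closing keeps the method idempotent, which matters if both Offload and Suicide paths can reach it.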
♻️ Duplicate comments (3)
frac/remote.go (1)
211-221: Validate remote object existence and surface errors (previously raised)
You're ignoring errors from Exists() and never verify the index exists. This can yield late failures with confusing errors.
Suggested adjustments:
  func (f *Remote) openIndex() {
      if f.indexFile == nil {
-         name := path.Base(f.BaseFileName) + consts.IndexFileSuffix
-         f.indexFile = s3.NewReader(f.ctx, f.s3cli, name)
+         name := f.BaseFileName + consts.IndexFileSuffix
+         if ok, err := f.s3cli.Exists(f.ctx, name); err != nil {
+             logger.Warn("failed to check index existence", zap.String("key", name), zap.Error(err))
+         } else if !ok {
+             logger.Warn("remote index not found", zap.String("key", name))
+         }
+         f.indexFile = s3.NewReader(f.ctx, f.s3cli, name)
          f.indexReader = storage.NewIndexReader(f.readLimiter, f.indexFile.Name(), f.indexFile, f.indexCache.Registry)
      }
  }
  func (f *Remote) openDocs() {
      if f.docsFile == nil {
-         pickedName := path.Base(f.BaseFileName) + consts.DocsFileSuffix
-         sortedName := path.Base(f.BaseFileName) + consts.SdocsFileSuffix
+         pickedName := f.BaseFileName + consts.DocsFileSuffix
+         sortedName := f.BaseFileName + consts.SdocsFileSuffix
-         if ok, _ := f.s3cli.Exists(f.ctx, pickedName); !ok {
+         if ok, err := f.s3cli.Exists(f.ctx, pickedName); err != nil {
+             logger.Warn("failed to check docs existence", zap.String("key", pickedName), zap.Error(err))
+         } else if !ok {
              pickedName = sortedName
+             if ok2, err2 := f.s3cli.Exists(f.ctx, pickedName); err2 != nil {
+                 logger.Warn("failed to check sdocs existence", zap.String("key", pickedName), zap.Error(err2))
+             } else if !ok2 {
+                 logger.Warn("remote docs not found (neither .docs nor .sdocs)", zap.Strings("tried", []string{f.BaseFileName + consts.DocsFileSuffix, f.BaseFileName + consts.SdocsFileSuffix}))
+             }
          }
          f.docsFile = s3.NewReader(f.ctx, f.s3cli, pickedName)
          f.docsReader = storage.NewDocsReader(f.readLimiter, f.docsFile, f.docsCache)
      }
  }
Also applies to: 203-209
frac/sealed.go (2)
56-60: Unexport State (follow-up to prior review).
This was previously flagged and acknowledged. The state container is internal to sealed/remote fractions; exporting it expands the public surface without a compelling reason. Consider renaming to sealedState and keeping fields unexported (BlocksOffsets as well), with accessors if truly needed.
98-100: Remove unused baseFileName parameter from loadHeader (follow-up to prior review).
baseFileName is not used inside loadHeader. This was noted before; please drop it and adjust call sites.
Apply in this file:
- f.info = loadHeader(f.BaseFileName, f.indexFile, f.indexReader)
+ f.info = loadHeader(f.indexFile, f.indexReader)

-func loadHeader(
-    baseFileName string,
-    indexFile storage.ImmutableFile,
-    indexReader storage.IndexReader,
-) *Info {
+func loadHeader(
+    indexFile storage.ImmutableFile,
+    indexReader storage.IndexReader,
+) *Info {
Also update other call sites (e.g., frac/remote.go):
// before
f.info = loadHeader(f.BaseFileName, f.indexFile, f.indexReader)
// after
f.info = loadHeader(f.indexFile, f.indexReader)
Also applies to: 407-443
🧹 Nitpick comments (12)
Makefile (1)
46-46: Make test LOG_LEVEL overridable (don't hardcode ERROR).
Hardcoding LOG_LEVEL=ERROR prevents overriding via make test LOG_LEVEL=INFO. Make it configurable while keeping ERROR as default.
Apply this diff within the changed target:
- LOG_LEVEL=ERROR go test ./... -count 1
+ LOG_LEVEL=${LOG_LEVEL} go test ./... -count 1
Add this near the top to set a default (outside the changed hunk):
# At top-level (e.g., after TIME definition)
LOG_LEVEL ?= ERROR
fracmanager/config.go (1)
30-31: Decide: require OffloadingRetention > 0 when OffloadingEnabled
Short: I inspected usages. fracmanager.removeStaleFractions treats retention <= 0 as "keep remote fractions alive" (no deletion). Enforcing retention > 0 will change behavior and may break existing configs. Please confirm intended semantics.
Files to check:
- fracmanager/config.go — OffloadingEnabled / OffloadingRetention and FillConfigWithDefault (proposed check location).
- fracmanager/fracmanager.go — removeStaleFractions: early return on retention <= 0 (keeps remote fractions forever).
- fracmanager/loader.go — respects OffloadingEnabled when loading remote fractions.
- cmd/seq-db/seq-db.go — maps cfg.Offloading.Retention → OffloadingRetention and creates S3 client when enabled.
If you want to require positive retention, apply this (inside FillConfigWithDefault before return):
  func FillConfigWithDefault(config *Config) *Config {
      ...
-     return config
+     if config.OffloadingEnabled && config.OffloadingRetention <= 0 {
+         logger.Fatal(
+             "offloading misconfiguration: retention must be > 0 when offloading is enabled",
+             zap.Bool("offloading_enabled", config.OffloadingEnabled),
+             zap.Duration("offloading_retention", config.OffloadingRetention),
+         )
+     }
+     return config
  }
Recommendation: either enforce the check above (breaking change) or keep current behavior but document that retention <= 0 means "never delete remote fractions." Which do you prefer?
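For comparison, a hedged sketch of the "enforce" option that returns an error instead of calling logger.Fatal, so the caller decides how to react. The type offloadConfig, the method validate and the sentinel errBadRetention are hypothetical names for illustration; the real fracmanager.Config has more fields and FillConfigWithDefault currently returns only *Config.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errBadRetention is a hypothetical sentinel that callers can match with errors.Is.
var errBadRetention = errors.New("offloading retention must be > 0 when offloading is enabled")

// offloadConfig is a simplified stand-in for the offloading part of the config.
type offloadConfig struct {
	Enabled   bool
	Retention time.Duration
}

// validate rejects the combination "enabled with non-positive retention".
// A disabled config is always accepted, whatever the retention value is.
func (c offloadConfig) validate() error {
	if c.Enabled && c.Retention <= 0 {
		return fmt.Errorf("%w (got %s)", errBadRetention, c.Retention)
	}
	return nil
}

func main() {
	for _, c := range []offloadConfig{
		{Enabled: false},
		{Enabled: true},
		{Enabled: true, Retention: 5 * time.Minute},
	} {
		fmt.Printf("%+v -> %v\n", c, c.validate())
	}
}
```

If the "retention <= 0 means never delete" semantics is kept instead, the same function could simply be dropped and the behavior documented next to the field.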
config/config.go (1)
218-224: Optional: consider session token and path-style flags
Many deploys use temporary credentials and MinIO-compatible endpoints. Consider adding:
- SessionToken string `config:"session_token"`
- UsePathStyle bool `config:"use_path_style"`
metric/store.go (1)
290-303: Offloading metrics look good; consider adding bytes and standardizing status values
- Add OffloadingBytesTotal (counter) to observe transferred size.
- Standardize status values to {"success","failure"} for consistency.
config.example.yaml (2)
10-11: Tiny sizes are fine for demos; make it explicit in comments.
1MiB/10MiB is sensible for quick local demos. Consider adding a short comment here noting these are intentionally tiny demo defaults to avoid confusion.
13-24: Offloading example is helpful; add region for AWS and note endpoint formatting.
- For AWS users, including region in the example cuts friction.
- Minor: some S3 SDKs are sensitive to trailing slashes in endpoints; if your client trims them, fine, otherwise consider removing it in the example.
Apply this minimal addition to improve AWS-compat guidance:
  offloading:
    enabled: true
    retention: 5m
    endpoint: http://localhost:9000/
+   region: us-east-1 # Required for AWS S3; ignored by local MinIO
    bucket: remote-storage
    access_key: minioadmin
    secret_key: minioadmin
fracmanager/fraction_provider.go (1)
72-85: Guard against nil S3 client when creating Remote fracs.
NewRemote will eventually rely on s3cli; if it's nil, failures will be late and opaque. Prefer failing fast.
Apply a defensive check:
  func (fp *fractionProvider) NewRemote(
      ctx context.Context, name string, cachedInfo *frac.Info, s3cli *s3.Client,
  ) *frac.Remote {
+     if s3cli == nil {
+         panic("fracmanager.NewRemote: nil s3 client")
+     }
      return frac.NewRemote(
          ctx,
          name,
          fp.readLimiter,
          fp.cacheProvider.CreateIndexCache(),
          fp.cacheProvider.CreateDocBlockCache(),
          cachedInfo,
          fp.config,
          s3cli,
      )
  }
If panicking is undesirable in your codebase, alternatively return a sealed/local frac error path upstream or log-and-error from the caller before reaching here.
frac/fraction.go (1)
19-26: Clarify Offload contract (return bool semantics, idempotency) and consider narrower interface.
Adding Offload to Fraction is fine, but please document what the bool indicates (performed vs no-op) and guarantee idempotency. Optionally, consider an Offloadable sub-interface implemented only by sealed/eligible fractions to prevent no-op exposure on Active.
Apply this doc tweak within the interface for clarity:
  type Fraction interface {
      Info() *Info
      IsIntersecting(from seq.MID, to seq.MID) bool
      Contains(mid seq.MID) bool
      DataProvider(context.Context) (DataProvider, func())
-     Offload(ctx context.Context, u storage.Uploader) (bool, error)
+     // Offload uploads the fraction's persisted data to remote storage.
+     // Return values:
+     // - bool: true if an offload was performed; false if not applicable (e.g., active) or already offloaded.
+     // - error: non-nil on failure. Implementations should be idempotent and safe to retry.
+     Offload(ctx context.Context, u storage.Uploader) (bool, error)
      Suicide()
  }
fracmanager/fetcher_test.go (1)
63-64: Prefer t.Context() over context.TODO() for consistency and cancellation.
Keeps both fetch calls bound to the test lifecycle.
-docs, err = fetcher.FetchDocs(context.TODO(), fm.GetFracs(FracTypeLocal), ids)
+docs, err = fetcher.FetchDocs(t.Context(), fm.GetFracs(FracTypeLocal), ids)
cmd/seq-db/seq-db.go (1)
303-321: Validate offloading configuration early (S3 client presence/connectivity)
If Offloading.Enabled is true but the client can't reach the bucket (or s3cli ends up nil elsewhere), downstream code may fail at use time. Consider performing a lightweight bucket check (e.g., HeadBucket) on startup and fail fast, or at least warn when Offloading is enabled but validation fails.
I can add a small validation step after NewClient (using HeadBucket/ListBuckets) if the s3 client exposes it. Want me to draft that change?
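One possible shape for such a startup check, as a sketch only. It assumes a hypothetical bucketChecker interface with a single CheckBucket method; whether the project's s3 client exposes anything like it (for instance a thin wrapper over HeadBucket) is not confirmed by this PR. The 5-second timeout is likewise an arbitrary illustrative value.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// bucketChecker is a hypothetical interface; a real client would implement
// CheckBucket with a cheap request such as a bucket HEAD.
type bucketChecker interface {
	CheckBucket(ctx context.Context) error
}

// fakeChecker lets the sketch run without any network access.
type fakeChecker struct{ err error }

func (f fakeChecker) CheckBucket(context.Context) error { return f.err }

var errUnreachable = errors.New("bucket unreachable")

// checkTimeout bounds the startup probe (illustrative value).
const checkTimeout = 5 * time.Second

// validateOffloadingAtStartup returns nil when offloading is disabled,
// and otherwise requires a client that can reach the bucket in time.
func validateOffloadingAtStartup(enabled bool, c bucketChecker) error {
	if !enabled {
		return nil
	}
	if c == nil {
		return errors.New("offloading is enabled but no S3 client is configured")
	}
	ctx, cancel := context.WithTimeout(context.Background(), checkTimeout)
	defer cancel()
	if err := c.CheckBucket(ctx); err != nil {
		return fmt.Errorf("checking offloading bucket: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(validateOffloadingAtStartup(false, nil))
	fmt.Println(validateOffloadingAtStartup(true, nil))
	fmt.Println(validateOffloadingAtStartup(true, fakeChecker{err: errUnreachable}))
	fmt.Println(validateOffloadingAtStartup(true, fakeChecker{}))
}
```

Whether a failed probe should abort startup or only log a warning is a policy choice; returning an error leaves that decision to the caller in cmd/seq-db.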
fracmanager/fracmanager_test.go (1)
90-91: Avoid reaching into fm.localFracs directly to assert counts
Accessing internal fields makes tests brittle against refactors. Prefer asserting via the public API (e.g., GetFracs(FracTypeLocal) or a helper) for size/count checks.
Do you want a small test helper that derives sealed-local count from GetFracs to decouple from internals?
fracmanager/fracmanager.go (1)
282-286: Typo in log message: "successully" → "successfully".
Minor, but shows up in user-facing logs.
- logger.Info(
-     "successully offloaded fraction",
+ logger.Info(
+     "successfully offloaded fraction",
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- go.sum is excluded by !**/*.sum
📒 Files selected for processing (27)
- Makefile (1 hunks)
- cmd/seq-db/seq-db.go (3 hunks)
- config.example.yaml (1 hunks)
- config/config.go (1 hunks)
- consts/consts.go (1 hunks)
- frac/active.go (1 hunks)
- frac/fraction.go (2 hunks)
- frac/remote.go (1 hunks)
- frac/sealed.go (9 hunks)
- frac/sealed_loader.go (1 hunks)
- fracmanager/async_searcher.go (2 hunks)
- fracmanager/config.go (1 hunks)
- fracmanager/fetcher_test.go (3 hunks)
- fracmanager/fracmanager.go (12 hunks)
- fracmanager/fracmanager_test.go (8 hunks)
- fracmanager/fraction_provider.go (2 hunks)
- fracmanager/loader.go (7 hunks)
- fracmanager/proxy_frac.go (2 hunks)
- fracmanager/sealed_frac_cache_test.go (7 hunks)
- go.mod (3 hunks)
- metric/store.go (2 hunks)
- storeapi/grpc_fetch.go (2 hunks)
- storeapi/grpc_search.go (2 hunks)
- storeapi/grpc_v1_test.go (1 hunks)
- storeapi/store.go (2 hunks)
- tests/setup/env.go (1 hunks)
- util/fs.go (2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (20)
fracmanager/fraction_provider.go (3)
- frac/remote.go (2): NewRemote (60-97), Remote (32-58)
- frac/info.go (1): Info (22-45)
- storage/s3/client.go (1): Client (16-19)
util/fs.go (1)
- logger/logger.go (1): Panic (78-80)
storeapi/store.go (2)
- storage/s3/client.go (1): Client (16-19)
- fracmanager/fracmanager.go (2): NewFracManager (71-106), FracManager (34-60)
frac/active.go (1)
- storage/io.go (1): Uploader (27-29)
storeapi/grpc_fetch.go (1)
- fracmanager/fracmanager.go (2): FracTypeLocal (304-304), FracTypeRemote (305-305)
frac/fraction.go (1)
- storage/io.go (1): Uploader (27-29)
fracmanager/async_searcher.go (1)
- fracmanager/fracmanager.go (2): FracTypeLocal (304-304), FracTypeRemote (305-305)
frac/sealed_loader.go (6)
- frac/sealed/lids/loader.go (1): Loader (16-21)
- frac/sealed/seqids/loader.go (1): Loader (28-35)
- frac/sealed.go (1): State (56-60)
- frac/info.go (1): Info (22-45)
- storage/index_reader.go (1): IndexReader (13-20)
- logger/logger.go (1): Fatal (82-84)
tests/setup/env.go (1)
- storeapi/store.go (1): NewStore (49-69)
frac/remote.go (14)
- frac/fraction.go (2): Fraction (19-26), DataProvider (14-17)
- storage/io.go (2): ImmutableFile (16-25), Uploader (27-29)
- storage/docs_reader.go (2): DocsReader (11-14), NewDocsReader (16-21)
- frac/index_cache.go (1): IndexCache (10-18)
- storage/index_reader.go (2): IndexReader (13-20), NewIndexReader (22-32)
- frac/sealed.go (1): State (56-60)
- storage/s3/client.go (1): Client (16-19)
- storage/read_limiter.go (1): ReadLimiter (9-12)
- seq/seq.go (1): MID (17-17)
- util/fs.go (1): MustRemoveFileByPath (30-38)
- consts/consts.go (4): RemoteFractionSuffix (64-64), DocsFileSuffix (53-53), SdocsFileSuffix (56-56), IndexFileSuffix (60-60)
- frac/sealed/token/block_loader.go (1): NewBlockLoader (72-78)
- frac/sealed/token/table_loader.go (1): NewTableLoader (24-30)
- storage/s3/reader.go (1): NewReader (34-36)
fracmanager/sealed_frac_cache_test.go (2)
- fracmanager/config.go (1): Config (14-32)
- frac/config.go (1): Config (3-8)
storeapi/grpc_search.go (1)
- fracmanager/fracmanager.go (2): FracTypeLocal (304-304), FracTypeRemote (305-305)
cmd/seq-db/seq-db.go (3)
- storage/s3/client.go (1): Client (16-19)
- logger/logger.go (2): Fatal (82-84), Error (74-76)
- storeapi/store.go (1): NewStore (49-69)
storeapi/grpc_v1_test.go (1)
- fracmanager/fracmanager.go (1): NewFracManager (71-106)
fracmanager/loader.go (7)
- storage/s3/client.go (1): Client (16-19)
- config/config.go (1): Config (39-241)
- fracmanager/sealed_frac_cache.go (1): NewFracCacheFromDisk (45-49)
- consts/consts.go (2): FracCacheFileSuffix (66-66), RemoteFractionSuffix (64-64)
- frac/info.go (1): Info (22-45)
- logger/logger.go (1): Info (66-68)
- frac/remote.go (2): Remote (32-58), NewRemote (60-97)
fracmanager/proxy_frac.go (1)
- storage/io.go (1): Uploader (27-29)
fracmanager/fracmanager.go (8)
- fracmanager/config.go (1): Config (14-32)
- frac/info.go (1): Info (22-45)
- frac/fraction.go (1): Fraction (19-26)
- storage/s3/uploader.go (1): NewUploader (24-26)
- storage/io.go (1): TypeRemote (13-13)
- frac/remote.go (1): NewRemote (60-97)
- fracmanager/list.go (1): List (10-10)
- fracmanager/loader.go (1): NewLoader (43-53)
fracmanager/fracmanager_test.go (2)
- fracmanager/fracmanager.go (3): FracManager (34-60), NewFracManager (71-106), FracTypeLocal (304-304)
- frac/sealed.go (2): Sealed (28-54), HalfRename (66-66)
fracmanager/fetcher_test.go (1)
- fracmanager/fracmanager.go (1): FracTypeLocal (304-304)
frac/sealed.go (8)
- frac/sealed/seqids/loader.go (4): Table (13-18), Table (20-22), Table (24-26), Loader (28-35)
- storage/io.go (3): TypeLocal (12-12), Uploader (27-29), ImmutableFile (16-25)
- frac/sealed_loader.go (1): Loader (18-22)
- consts/consts.go (1): RemoteFractionSuffix (64-64)
- frac/sealed/seqids/provider.go (1): NewProvider (18-40)
- storage/index_reader.go (1): IndexReader (13-20)
- frac/info.go (1): Info (22-45)
- frac/block_info.go (1): BlockInfo (14-16)
🔇 Additional comments (14)
util/fs.go (1)
30-38: Good: safe file removal helper (ignores not-exist).
The helper's semantics align with the Must* pattern and avoid noisy panics for missing files. Looks solid.
frac/active.go (1)
343-349: Explicit no-op Offload on Active is appropriate.
Clear docs + benign return. This keeps callers uniform while constraining offloading to sealed fracs.
fracmanager/async_searcher.go (1)
119-124: LGTM: search considers Local|Remote fracs.
Using GetFracs(FracTypeLocal|FracTypeRemote) before range filtering matches the offloading model.
go.mod (2)
3-3: Confirm CI/tooling supports Go 1.24
Short check: go.mod specifies go 1.24; local verification shows go version go1.24.1 linux/amd64. CI/tooling may still lag; please confirm your CI images/build agents or Dockerfiles support Go 1.24, otherwise pin go.mod to a supported version.
Files/locations to check:
- go.mod, line 3: go 1.24
- CI configs/build images to verify: .github/workflows/*, .circleci/config.yml, .gitlab-ci.yml, Dockerfile(s)
Verified output (from sandbox):
go version go1.24.1 linux/amd64
9-13: AWS SDK v2 mixed versions: no presign/advanced S3 usage found; please confirm build
Quick findings:
- go.mod shows: aws-sdk-go-v2 v1.36.5, config v1.29.17, credentials v1.17.70, feature/s3/manager v1.17.84, service/s3 v1.84.0 (indirects include internal/presigned-url v1.12.17, internal/s3shared v1.18.17).
- S3 call sites found:
- storage/s3/uploader.go — manager.NewUploader + manager.Upload (PutObject)
- storage/s3/reader.go — GetObject, HeadObject, range reads
- storage/s3/client.go — s3.NewFromConfig, HeadObject, DeleteObjects (sets DisableLogOutputChecksumValidationSkipped)
- No presign/Presigned/Presign/v4a/checksum API usage was found in the repo.
Action: this mix looks harmless for the current usage, but I can't run a build here — please run CI/go build to confirm the set compiles and behaves as expected (and pin compatible versions if you plan to use presigned URLs, v4a or checksum features).
frac/remote.go (1)
147-154: Confirm cache ownership before releasing on Suicide
f.docsCache.Release() and f.indexCache.Release() will clear caches that look store-wide/shared. If shared, this could impact other fractions using the same caches.
If caches are store-scoped and shared, prefer evicting fraction-specific entries (by key prefix) rather than releasing the entire cache.
consts/consts.go (1)
64-65: LGTM: RemoteFractionSuffix
Clear and consistent with other suffix constants.
storeapi/grpc_search.go (1)
21-21: Search mask change is consistent; consider centralizing the mask and confirm active frac inclusion.
Passing Local|Remote to SearchDocs keeps behavior consistent with Fetch. Please verify that active fracs are represented by Local; if not, search could miss hot data. To reduce duplication, extract this combined mask to a shared constant or helper to keep fetch/search aligned.
Use the same script shared in grpc_fetch.go to confirm FracType coverage and GetFracs semantics.
Also applies to: 139-143
storeapi/grpc_v1_test.go (1)
70-76: Constructor update OK: t.Context() supported (Go 1.24).
go.mod sets "go 1.24" and testing.T.Context was introduced in Go 1.24, so no fallback to context.Background() is needed.
- Verified usage at: storeapi/grpc_v1_test.go:70 (also used across fracmanager tests and integration_tests).
frac/fraction.go (1)
11-11: All Fraction implementations define Offload (verified)
Found implementations for the interface method and no missing wrappers:
- frac/fraction.go — interface includes Offload (line ~24)
- frac/active.go — func (f *Active) Offload(...) (line 347)
- frac/sealed.go — func (f *Sealed) Offload(...) (line 212)
- frac/remote.go — func (f *Remote) Offload(...) (line 136)
- fracmanager/proxy_frac.go — func (f *proxyFrac) Offload(...) (line 162) — forwards to f.cur().Offload
No further action required.
storeapi/store.go (1)
49-49: No remaining 3-arg NewStore calls; all call sites use the 4-arg signature.
Confirmed via repo search; discovered instances:
- storeapi/store.go:49 — func NewStore(ctx context.Context, c StoreConfig, s3cli *s3.Client, mappingProvider MappingProvider)
- tests/setup/env.go:225 — store, err := storeapi.NewStore(context.Background(), confs[i], nil, mappingProvider)
- cmd/seq-db/seq-db.go:323 — store, err := storeapi.NewStore(ctx, sconfig, s3cli, mp)
No 3-argument invocations found.
tests/setup/env.go (1)
225-235: OK to pass nil S3 client in tests: offloading not enabled
Verified: tests/setup/env.go calls storeapi.NewStore(..., nil, ...). fracmanager.Config has OffloadingEnabled (defaults false) and a search found no tests that set OffloadingEnabled = true. No action required now.
Files checked:
- tests/setup/env.go — MakeStores passes nil S3 client to NewStore
- fracmanager/config.go — OffloadingEnabled bool (defaults to false)
- storeapi/store.go — NewStore accepts s3 client and forwards frac manager config
- cmd/seq-db/seq-db.go — real binary wires OffloadingEnabled (not used in tests)
If you plan to enable offloading in tests later, either set OffloadingEnabled=false in the test config or inject a test s3.Client when calling NewStore.
fracmanager/fetcher_test.go (1)
41-41: LGTM: use of t.Context() and filtered fracs improves determinism and clarity.
Using GetFracs(FracTypeLocal) makes intent explicit and future-proofs against remote fracs.
fracmanager/loader.go (1)
76-91: Verify intended duplication: remote + local entries for the same fraction
When hasRemote and hasSdocs+hasIndex are both true, you append both a remote and a local sealed frac. If consumers ever union these lists, you may double-process the same logical fraction.
Would you like me to scan usages of loader.load() results and confirm that local/remote lists are never combined without dedup?
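If the two lists ever do need to be combined, a name-keyed dedup is one way to avoid double-processing. The sketch below is illustrative only: fracEntry and unionByName are hypothetical names, and letting the local copy win over the remote one is an assumption, not a rule stated in this PR.

```go
package main

import "fmt"

// fracEntry is a hypothetical, simplified view of a loaded fraction.
type fracEntry struct {
	Name   string
	Remote bool
}

// unionByName concatenates both lists but keeps only the first entry seen
// for each name, so a local entry wins over a remote one with the same name.
func unionByName(local, remote []fracEntry) []fracEntry {
	seen := make(map[string]struct{}, len(local)+len(remote))
	out := make([]fracEntry, 0, len(local)+len(remote))
	for _, list := range [][]fracEntry{local, remote} {
		for _, e := range list {
			if _, dup := seen[e.Name]; dup {
				continue // same logical fraction already present
			}
			seen[e.Name] = struct{}{}
			out = append(out, e)
		}
	}
	return out
}

func main() {
	local := []fracEntry{{Name: "frac-1"}, {Name: "frac-2"}}
	remote := []fracEntry{{Name: "frac-2", Remote: true}, {Name: "frac-3", Remote: true}}
	for _, e := range unionByName(local, remote) {
		fmt.Println(e.Name, e.Remote)
	}
}
```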
910664d to c797db4
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #60 +/- ##
==========================================
- Coverage 71.45% 70.02% -1.43%
==========================================
Files 199 200 +1
Lines 17559 17986 +427
==========================================
+ Hits 12547 12595 +48
- Misses 4322 4698 +376
- Partials 690 693 +3
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
fracmanager/sealed_frac_cache_test.go (1)
274-281: Add cleanup to avoid leaking background goroutines in this test.
newFracManagerWithBackgroundStart likely spins up background workers; this test never stops fm. Register a cleanup so goroutines don't leak across tests.
  fm, err := newFracManagerWithBackgroundStart(t.Context(), &Config{
      FracSize: 100,
      TotalSize: maxSize * 2,
      ShouldReplay: false,
      DataDir: dataDir,
  })
  assert.NoError(t, err)
+ // Ensure background workers are stopped even if the test fails midway.
+ t.Cleanup(func() { if fm != nil { fm.Stop() } })
♻️ Duplicate comments (10)
fracmanager/sealed_frac_cache_test.go (3)
390-394: Wait for maintenance goroutines before Stop to avoid races and flakiness.
You trigger one-off maintenance and immediately Stop. If Stop doesn't guarantee completion of those maintenance tasks, assertions below can see partially-applied shrink/cleanup. Prior review already flagged this exact race.
  sealWG := sync.WaitGroup{}
  suicideWG := sync.WaitGroup{}
  fm.maintenance(&sealWG, &suicideWG) // shrinkSizes should be called
- fm.Stop()
+ sealWG.Wait()
+ suicideWG.Wait()
+ fm.Stop()
Run this to confirm Stop's semantics relative to maintenance:
#!/bin/bash
set -euo pipefail
# Locate Stop() implementation (the receiver name before the type is optional)
rg -nC3 -g '!**/vendor/**' -P 'func\s+\(\s*(\w+\s+)?\*?FracManager\s*\)\s+Stop\s*\('
# Show maintenance implementation to see how WaitGroups are used
rg -nC5 -g '!**/vendor/**' -P 'func\s+\(\s*(\w+\s+)?\*?FracManager\s*\)\s+maintenance\s*\('
# Check if Stop or related shutdown code calls Wait() on any maintenance WGs
rg -nC2 -g '!**/vendor/**' --type go -P '\bwg\.Wait\(\)|Wait\(\)\s*$'
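The ordering being asked for can be shown in isolation. This is a sketch under stated assumptions: the manager type below is hypothetical, its maintenance takes one WaitGroup where the real method takes two, and its Stop deliberately does not wait for tasks, which is exactly the case the comment guards against.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// manager is a hypothetical stand-in for a component with background tasks.
type manager struct {
	completed atomic.Int32
	stopOnce  sync.Once
	stopCh    chan struct{}
}

func newManager() *manager { return &manager{stopCh: make(chan struct{})} }

// maintenance starts n background tasks and registers each one on wg.
func (m *manager) maintenance(wg *sync.WaitGroup, n int) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.completed.Add(1)
		}()
	}
}

// Stop only signals shutdown; it does not wait for maintenance tasks.
func (m *manager) Stop() { m.stopOnce.Do(func() { close(m.stopCh) }) }

// runThenStop waits for all maintenance tasks before stopping, so the
// returned count is deterministic.
func runThenStop(n int) int32 {
	m := newManager()
	var wg sync.WaitGroup
	m.maintenance(&wg, n)
	wg.Wait() // without this, the count read below could be anything in [0, n]
	m.Stop()
	return m.completed.Load()
}

func main() {
	fmt.Println(runThenStop(8))
}
```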
436-440: Same here: wait for maintenance completion before stopping.
Avoids non-determinism when syncing in-memory cache to disk.
  sealWG := sync.WaitGroup{}
  suicideWG := sync.WaitGroup{}
  fm.maintenance(&sealWG, &suicideWG) // shrinkSizes should be called
- fm.Stop()
+ sealWG.Wait()
+ suicideWG.Wait()
+ fm.Stop()
461-465: Also wait on fm2's maintenance WGs before Stop.
This was previously noted (and echoed by forshev); still needed to make file cache verification deterministic.
  sealWG2 := sync.WaitGroup{}
  suicideWG2 := sync.WaitGroup{}
  fm2.maintenance(&sealWG2, &suicideWG2) // shrinkSizes should be called
- fm2.Stop()
+ sealWG2.Wait()
+ suicideWG2.Wait()
+ fm2.Stop()
fracmanager/proxy_frac.go (1)
162-173: Fix race and nil-target panic in Offload (select cur once; guard suicided).
Between unlocking and calling f.cur(), state can flip (active/sealed cleared), returning nil and panicking on Offload. Also handle suicided explicitly.
Apply this diff:
  func (f *proxyFrac) Offload(ctx context.Context, u storage.Uploader) (bool, error) {
-     f.useMu.RLock()
-
-     if f.isSealingState() {
-         f.useMu.RUnlock()
-         f.sealWg.Wait()
-         return f.cur().Offload(ctx, u)
-     }
-
-     f.useMu.RUnlock()
-     return f.cur().Offload(ctx, u)
+     f.useMu.RLock()
+     sealing := f.isSealingState()
+     f.useMu.RUnlock()
+
+     if sealing {
+         f.sealWg.Wait()
+     }
+
+     cur := f.cur()
+     if cur == nil { // suicided or torn down
+         return false, errors.New("fraction is not available (suicided)")
+     }
+     return cur.Offload(ctx, u)
  }
fracmanager/fetcher_test.go (1)
25-26: Defer fm.Stop() to avoid leaking background goroutines on early assertion failures.
Ensures cleanup runs even if a later assert fails.
- fm, err := newFracManagerWithBackgroundStart(t.Context(), config)
- assert.NoError(t, err)
+ fm, err := newFracManagerWithBackgroundStart(t.Context(), config)
+ assert.NoError(t, err)
+ defer fm.Stop()
fracmanager/loader.go (2)
43-53: Fail fast when offloading enabled but S3 client is nil.
Avoids nil deref later in loadRemoteFrac/NewRemote.
  func NewLoader(
      config *Config,
      s3cli *s3.Client,
      fracProvider *fractionProvider,
      fracCache *sealedFracCache,
  ) *loader {
+     if config != nil && config.OffloadingEnabled && s3cli == nil {
+         logger.Fatal("offloading is enabled but S3 client is not provided")
+     }
      return &loader{
          s3cli: s3cli,
          config: config,
          fracProvider: fracProvider,
          fracCache: fracCache,
      }
  }
184-187: Don't skip cleanup of partially deleted files when .remote exists; warn on inconsistent config.
Keep disk tidy and surface misconfigurations.
- if info.hasRemote {
-     infoList = append(infoList, info)
-     continue
- }
+ if info.hasRemote {
+     // still clean up partially deleted local files, if any
+     if info.hasDocsDel || info.hasIndexDel || info.hasSdocsDel {
+         logger.Info("cleaning up partially deleted fraction files", zap.String("file", info.base))
+         removeFractionFiles(info.base)
+     }
+     if !l.config.OffloadingEnabled {
+         logger.Warn("found remote fraction while offloading is disabled; ignoring remote part",
+             zap.String("file", info.base))
+     }
+     infoList = append(infoList, info)
+     continue
+ }
frac/sealed.go (2)
98-101: Remove unused parameter baseFileName from loadHeader; update call site.
The argument isn't used; simplify the API.
- f.info = loadHeader(f.BaseFileName, f.indexFile, f.indexReader)
+ f.info = loadHeader(f.indexFile, f.indexReader)

-func loadHeader(
-    baseFileName string,
-    indexFile storage.ImmutableFile,
-    indexReader storage.IndexReader,
-) *Info {
+func loadHeader(
+    indexFile storage.ImmutableFile,
+    indexReader storage.IndexReader,
+) *Info {
Follow-up: adjust other call sites (e.g., frac/remote.go: NewRemote) to drop the first argument as well.
Also applies to: 407-444
210-239: Fix FD leak and confusing bool semantics in Offload.
Close the marker file (use WriteFile) and return false on error.
  func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) {
      f.useMu.Lock()
      defer f.useMu.Unlock()
      g, gctx := errgroup.WithContext(ctx)
      g.Go(func() error {
          f.openDocs()
          return u.Upload(gctx, f.docsFile)
      })
      g.Go(func() error {
          f.openIndex()
          return u.Upload(gctx, f.indexFile)
      })
      if err := g.Wait(); err != nil {
-         return true, err
+         return false, err
      }
      remoteFracName := f.BaseFileName + consts.RemoteFractionSuffix
-     if _, err := os.Create(remoteFracName); err != nil {
-         return true, err
-     }
+     // Create a zero-byte marker file without leaking an FD.
+     if err := os.WriteFile(remoteFracName, nil, 0o644); err != nil {
+         return false, err
+     }
      return true, nil
  }
fracmanager/fracmanager.go (1)
246-306: Fix goroutine loop variable capture (outsider): offloads may target the wrong fraction.
The closure captures outsider by reference. By the time goroutines run, they may all see the last element. Rebind or pass as a parameter. This was flagged earlier and is still unresolved.
Apply this diff:
- for _, outsider := range outsiders {
-     cleanupWg.Add(1)
-     go func() {
+ for _, outsider := range outsiders {
+     o := outsider
+     cleanupWg.Add(1)
+     go func(o frac.Fraction) {
          defer cleanupWg.Done()
-         info := outsider.Info()
+         info := o.Info()
          // Client for offloading is not configured.
          // So just kill fraction.
          if fm.s3cli == nil {
              fm.fracCache.RemoveFraction(info.Name())
-             outsider.Suicide()
+             o.Suicide()
              return
          }
          offloadStart := time.Now()
-         mustBeOffloaded, err := outsider.Offload(fm.ctx, s3.NewUploader(fm.s3cli))
+         mustBeOffloaded, err := o.Offload(fm.ctx, s3.NewUploader(fm.s3cli))
          if err != nil {
              // While searching for outsiders we removed this fraction from list of local fractions.
              // Now we need to return it back and try again to offload it later.
              fm.fracMu.Lock()
-             fm.localFracs = append(fm.localFracs, &fracRef{outsider})
+             fm.localFracs = append(fm.localFracs, &fracRef{o})
              fm.fracMu.Unlock()
              metric.OffloadingTotal.WithLabelValues("failure").Inc()
              metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds()))
              logger.Error(
                  "will skip fraction suicide: failed to offload fraction",
                  zap.String("fraction", info.Name()),
                  zap.Error(err),
              )
              return
          }
          if !mustBeOffloaded {
              fm.fracCache.RemoveFraction(info.Name())
-             outsider.Suicide()
+             o.Suicide()
              return
          }
          metric.OffloadingTotal.WithLabelValues("success").Inc()
          metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds()))
          logger.Info(
-             "successully offloaded fraction",
+             "successfully offloaded fraction",
              zap.String("fraction", info.Name()),
              zap.String("took", time.Since(offloadStart).String()),
          )
          // Update storage type for offloaded fraction.
          info.StorageType = storage.TypeRemote
          remote := fm.fracProvider.NewRemote(fm.ctx, info.Path, info, fm.s3cli)
          fm.fracMu.Lock()
          fm.remoteFracs = append(fm.remoteFracs, &fracRef{remote})
          fm.fracMu.Unlock()
-         outsider.Suicide()
-     }()
+         o.Suicide()
+     }(o)
  }
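A caveat on the premise: since Go 1.22, a module whose go.mod declares go 1.22 or later gets a fresh loop variable per iteration, so closures no longer share one variable across iterations. This review elsewhere reports go 1.24 in go.mod, so the hazard described above would apply only if the code were built under an older language version. Passing the value as a goroutine argument is still harmless and makes the intent explicit. The sketch below shows that pattern in isolation; collectNames is a hypothetical helper, not code from fracmanager.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectNames starts one goroutine per name and passes the name as an
// argument, so each goroutine works on its own copy under any Go version.
func collectNames(names []string) []string {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out []string
	)
	for _, name := range names {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			mu.Lock()
			out = append(out, n)
			mu.Unlock()
		}(name)
	}
	wg.Wait()
	sort.Strings(out) // goroutine completion order is not deterministic
	return out
}

func main() {
	fmt.Println(collectNames([]string{"frac-c", "frac-a", "frac-b"}))
}
```

With the argument form, the extra `o := outsider` rebind in the suggested diff is redundant; either one alone is enough.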
🧹 Nitpick comments (9)
storeapi/grpc_async_search.go (1)
49-49: Typed frac selection is correct; consider centralizing the mask
Local|Remote is repeated across call sites. Introduce a constant (e.g., FracTypeQueryable) in fracmanager so the call sites cannot drift apart.
In fracmanager (outside this file):
+// In fracmanager/fracmanager.go
+const FracTypeQueryable = FracTypeLocal | FracTypeRemote
Then here:
- fracs := g.fracManager.GetFracs(fracmanager.FracTypeLocal|fracmanager.FracTypeRemote).FilterInRange(seq.MID(r.From), seq.MID(r.To))
+ fracs := g.fracManager.GetFracs(fracmanager.FracTypeQueryable).
+     FilterInRange(seq.MID(r.From), seq.MID(r.To))
storeapi/store.go (1)
15-16: Minor: alias storage/s3 import to avoid confusion with AWS s3 client
Helps readers differentiate our wrapper from github.com/aws/aws-sdk-go-v2/service/s3.
- "github.com/ozontech/seq-db/storage/s3"
+ storageS3 "github.com/ozontech/seq-db/storage/s3"
@@
-func NewStore(ctx context.Context, c StoreConfig, s3cli *s3.Client, mappingProvider MappingProvider) (*Store, error) {
+func NewStore(ctx context.Context, c StoreConfig, s3cli *storageS3.Client, mappingProvider MappingProvider) (*Store, error) {
@@
- fracManager := fracmanager.NewFracManager(ctx, &c.FracManager, s3cli)
+ fracManager := fracmanager.NewFracManager(ctx, &c.FracManager, s3cli)
Also applies to: 49-49, 54-55
fracmanager/fracmanager_test.go (2)
53-61: Use defer fm.Stop() immediately after creation in TestCleanUp.
Prevents leaks if an earlier assert fails; also drop the later explicit Stop().
- fm, err := newFracManagerWithBackgroundStart(t.Context(), &Config{
+ fm, err := newFracManagerWithBackgroundStart(t.Context(), &Config{
      FracSize: 1000,
      TotalSize: 100000,
      ShouldReplay: false,
      DataDir: dataDir,
  })
  assert.NoError(t, err)
+ defer fm.Stop()
@@
- fm.Stop()
+ // fm stopped by defer above
Also applies to: 74-76
98-110: Defer provider stop in launchAndCheck to ensure cleanup on failures.
If checkFn asserts, cleanup is skipped; defer avoids that.
  launchAndCheck := func(checkFn func(fm *FracManager)) {
      fm := NewFracManager(context.Background(), &Config{
          FracSize: 500,
          TotalSize: 5000,
          ShouldReplay: false,
          DataDir: dataDir,
      }, nil)
      assert.NoError(t, fm.Load(context.Background()))
+     defer fm.fracProvider.Stop()
      checkFn(fm)
-     fm.fracProvider.Stop()
  }
frac/sealed.go (1)
213-229: Consider narrowing lock scope to reduce write-lock contention during uploads.
Holding useMu.Lock for the entire upload blocks readers for the duration of the S3 I/O. If safe, switch to RLock, or lock only around the lazy-open and state checks and release before starting the errgroup.
fracmanager/fracmanager.go (4)
290-294: Typo in log message.
“successully offloaded fraction” → “successfully offloaded fraction”.
- logger.Info(
-     "successully offloaded fraction",
+ logger.Info(
+     "successfully offloaded fraction",
183-190: Retention clock source: CreationTime vs SealingTime.
TTL for remote fractions is often measured from when data became immutable. Consider using SealingTime instead of CreationTime, or document the intent.
If you decide to switch:
- ct := time.UnixMilli(int64(f.instance.Info().CreationTime))
+ ct := time.UnixMilli(int64(f.instance.Info().SealingTime))
Optionally expose a config knob if both semantics are desired.
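To make the difference between the two clock sources concrete, here is a small sketch of a TTL check over Unix-millisecond timestamps. The function isExpired and all timestamp values are hypothetical; only the use of time.UnixMilli mirrors the diff above.

```go
package main

import (
	"fmt"
	"time"
)

// isExpired reports whether the reference timestamp (Unix milliseconds)
// lies more than ttl before the given "now" timestamp.
func isExpired(refMillis, nowMillis int64, ttl time.Duration) bool {
	ref := time.UnixMilli(refMillis)
	now := time.UnixMilli(nowMillis)
	return now.Sub(ref) > ttl
}

func main() {
	const (
		created = int64(1_000_000)  // hypothetical CreationTime
		sealed  = int64(9_000_000)  // hypothetical SealingTime
		now     = int64(10_000_000) // hypothetical current time
	)
	ttl := time.Hour

	// Measured from creation, the fraction is 2.5 hours old and expired.
	fmt.Println(isExpired(created, now, ttl))
	// Measured from sealing, it is under 17 minutes old and still retained.
	fmt.Println(isExpired(sealed, now, ttl))
}
```

The same fraction can therefore fall on either side of the retention boundary depending on which timestamp is used, which is why the intent is worth documenting.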
219-221: Remove unused slice reslicing in determineOutsiders.
localFracs = localFracs[1:] is not used later. Simplify to avoid confusion.
- localFracs := fm.GetFracs(FracTypeLocal)
- occupiedSize := localFracs.GetTotalSize()
+ occupiedSize := fm.GetFracs(FracTypeLocal).GetTotalSize()
  ...
- localFracs = localFracs[1:]
Also applies to: 228-231
79-79: Defend against a nil ctx in NewFracManager.
If a caller passes nil, Start()’s context.WithCancel(fm.ctx) will panic. Default to context.Background().
  func NewFracManager(ctx context.Context, cfg *Config, s3cli *s3.Client) *FracManager {
      FillConfigWithDefault(cfg)
+     if ctx == nil {
+         ctx = context.Background()
+     }
Also applies to: 102-106
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (28)
- Makefile (1 hunks)
- cmd/seq-db/seq-db.go (3 hunks)
- config.example.yaml (1 hunks)
- config/config.go (1 hunks)
- consts/consts.go (1 hunks)
- frac/active.go (1 hunks)
- frac/fraction.go (2 hunks)
- frac/remote.go (1 hunks)
- frac/sealed.go (9 hunks)
- frac/sealed_loader.go (1 hunks)
- fracmanager/config.go (1 hunks)
- fracmanager/fetcher_test.go (3 hunks)
- fracmanager/fracmanager.go (12 hunks)
- fracmanager/fracmanager_test.go (8 hunks)
- fracmanager/fraction_provider.go (2 hunks)
- fracmanager/loader.go (8 hunks)
- fracmanager/proxy_frac.go (2 hunks)
- fracmanager/sealed_frac_cache_test.go (7 hunks)
- go.mod (3 hunks)
- metric/store.go (2 hunks)
- storeapi/grpc_async_search.go (1 hunks)
- storeapi/grpc_fetch.go (2 hunks)
- storeapi/grpc_search.go (2 hunks)
- storeapi/grpc_v1.go (1 hunks)
- storeapi/grpc_v1_test.go (1 hunks)
- storeapi/store.go (2 hunks)
- tests/setup/env.go (1 hunks)
- util/fs.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (16)
- consts/consts.go
- fracmanager/config.go
- Makefile
- storeapi/grpc_fetch.go
- storeapi/grpc_search.go
- frac/fraction.go
- tests/setup/env.go
- frac/sealed_loader.go
- metric/store.go
- frac/active.go
- frac/remote.go
- fracmanager/fraction_provider.go
- storeapi/grpc_v1_test.go
- config.example.yaml
- config/config.go
- cmd/seq-db/seq-db.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-05T10:52:04.737Z
Learnt from: forshev
PR: ozontech/seq-db#3
File: api/seqproxyapi/v1/seq_proxy_api.proto:88-90
Timestamp: 2025-08-05T10:52:04.737Z
Learning: In the seq-db project, the FetchAsyncSearchResult RPC method in api/seqproxyapi/v1/seq_proxy_api.proto was intentionally changed from HTTP GET to POST during development. This breaking change is acceptable because the async search feature is under active development and not yet in production use.
Applied to files:
storeapi/grpc_v1.go
🧬 Code graph analysis (11)
storeapi/grpc_async_search.go (2)
fracmanager/fracmanager.go (2)
FracTypeLocal(312-312)FracTypeRemote(313-313)seq/seq.go (1)
MID(17-17)
storeapi/grpc_v1.go (2)
fracmanager/async_searcher.go (1)
MustStartAsync(95-130)fracmanager/fracmanager.go (2)
FracTypeLocal(312-312)FracTypeRemote(313-313)
storeapi/store.go (2)
storage/s3/client.go (1)
Client(16-19)fracmanager/fracmanager.go (2)
NewFracManager(79-114)FracManager(34-60)
fracmanager/fracmanager_test.go (3)
fracmanager/config.go (1)
Config(14-32)fracmanager/fracmanager.go (3)
FracManager(34-60)NewFracManager(79-114)FracTypeLocal(312-312)frac/sealed.go (1)
Sealed(28-54)
util/fs.go (1)
logger/logger.go (1)
Panic(78-80)
frac/sealed.go (6)
storage/io.go (3)
TypeLocal(12-12)Uploader(27-29)ImmutableFile(16-25)consts/consts.go (1)
RemoteFractionSuffix(64-64)storage/index_reader.go (1)
IndexReader(13-20)frac/info.go (1)
Info(22-45)logger/logger.go (2)
Info(66-68)Fatal(82-84)frac/block_info.go (1)
BlockInfo(14-16)
fracmanager/proxy_frac.go (1)
storage/io.go (1)
Uploader(27-29)
fracmanager/sealed_frac_cache_test.go (1)
fracmanager/config.go (1)
Config(14-32)
fracmanager/loader.go (6)
storage/s3/client.go (1)
Client(16-19)fracmanager/config.go (1)
Config(14-32)frac/remote.go (2)
Remote(32-58)NewRemote(60-97)fracmanager/sealed_frac_cache.go (1)
NewFracCacheFromDisk(45-49)consts/consts.go (4)
FracCacheFileSuffix(66-66)MetaFileSuffix(51-51)DocsFileSuffix(53-53)RemoteFractionSuffix(64-64)frac/info.go (1)
Info(22-45)
fracmanager/fracmanager.go (10)
config/config.go (1)
Config(48-252)storage/s3/client.go (1)
Client(16-19)frac/info.go (1)
Info(22-45)frac/fraction.go (1)
Fraction(19-26)storage/s3/uploader.go (1)
NewUploader(24-26)metric/store.go (3)
OffloadingTotal(290-295)OffloadingDurationSeconds(296-302)DataSizeTotal(115-120)storage/io.go (1)
TypeRemote(13-13)frac/remote.go (1)
NewRemote(60-97)fracmanager/list.go (1)
List(10-10)fracmanager/loader.go (1)
NewLoader(43-53)
fracmanager/fetcher_test.go (1)
fracmanager/fracmanager.go (1)
FracTypeLocal(312-312)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (5)
fracmanager/sealed_frac_cache_test.go (1)
274-280: LGTM: passing t.Context() is the right move.
Hooking the frac manager’s lifecycle to the test context will respect test timeouts and cancellations.
Also applies to: 358-364, 417-423, 453-459
util/fs.go (1)
30-38: Safer remove helper for marker files: LGTM
Ignoring os.ErrNotExist while panicking on other errors is exactly what we want for idempotent marker cleanup in offloading flows.
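A minimal sketch of the idempotent-removal pattern described here. This is not the project's MustRemoveFileByPath: the helper name removeIfExists is hypothetical, and it returns the unexpected error instead of panicking so the behavior is easy to test.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// removeIfExists deletes path and treats "file does not exist" as success,
// which makes repeated calls idempotent. Any other error is returned.
func removeIfExists(path string) error {
	err := os.Remove(path)
	if err != nil && !errors.Is(err, os.ErrNotExist) {
		return err
	}
	return nil
}

func main() {
	// Removing a file that was never created is not an error.
	fmt.Println(removeIfExists("no-such-marker.remote"))
}
```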
go.mod (1)
9-13: AWS SDK v2 versions and Go 1.24 verified
- CI is pinned to Go 1.24 via actions/setup-go@v5 (go-version: '1.24' in .github/workflows/ci.yml and release.yml).
- All aws-sdk-go-v2 submodules resolve to the intended versions (v1.36.5, config v1.29.17, credentials v1.17.70, feature/s3/manager v1.17.84, service/s3 v1.84.0).
- No mixed minor versions detected in the module graph.
- go mod tidy is a no-op (go.mod/go.sum unchanged).
No further action needed.
fracmanager/fracmanager.go (2)
309-340: Nice API split for local vs remote fractions.
Bitmask-based FracType and GetFracs keep the call sites clean and extensible (e.g., future archive tiers).
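As an illustration of why a bitmask keeps call sites compact, here is a standalone sketch. The constant names follow the review; their values, the fraction struct, and selectFracs are assumptions made for the example and may differ from the PR's definitions.

```go
package main

import "fmt"

// FracType is a bitmask. The names mirror the review; the values are assumed.
type FracType uint8

const (
	FracTypeLocal FracType = 1 << iota
	FracTypeRemote
)

// fraction is a hypothetical stand-in for a managed fraction.
type fraction struct {
	name string
	kind FracType
}

// selectFracs returns the names of fractions whose kind matches any bit in mask.
func selectFracs(all []fraction, mask FracType) []string {
	var out []string
	for _, f := range all {
		if f.kind&mask != 0 {
			out = append(out, f.name)
		}
	}
	return out
}

func main() {
	all := []fraction{{"a", FracTypeLocal}, {"b", FracTypeRemote}}
	fmt.Println(selectFracs(all, FracTypeLocal))
	fmt.Println(selectFracs(all, FracTypeLocal|FracTypeRemote))
}
```

Adding a further tier later would only need one more constant in the block; existing masks keep working unchanged.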
296-303: No need to re-add the entry: mutating the pointer is enough
sealedFracCache’s internal map is map[string]*frac.Info, so it holds pointers to your info object. Updating info.StorageType mutates the same pointer in the cache, and when you call SyncWithDisk it will JSON-marshal the current map, including your updated StorageType. You don’t need to invoke AddFraction again.
Likely an incorrect or invalid review comment.
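The pointer-sharing argument can be checked with a small standalone sketch. The info type, its JSON tag, and marshalAfterMutation are hypothetical stand-ins for frac.Info and the cache's sync step, not the project's code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// info is a hypothetical stand-in for frac.Info.
type info struct {
	StorageType string `json:"storage_type"`
}

// marshalAfterMutation stores a pointer in a map, mutates the pointee
// afterwards, and marshals the map. The mutation is visible because the
// map holds the pointer, not a copy of the struct.
func marshalAfterMutation() string {
	cache := map[string]*info{}
	i := &info{StorageType: "local"}
	cache["frac-1"] = i

	i.StorageType = "remote" // no second insert into the map

	b, err := json.Marshal(cache)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(marshalAfterMutation())
}
```

This only holds while the map stores pointers; a map[string]info would keep a stale copy.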
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
fracmanager/fracmanager.go (1)
79-114: Fail fast if offloading is enabled without an S3 client
This will panic at runtime in cleanup/offload paths (nil client to uploader). Validate at construction time.
func NewFracManager(ctx context.Context, cfg *Config, s3cli *s3.Client) *FracManager { FillConfigWithDefault(cfg) + if cfg.OffloadingEnabled && s3cli == nil { + logger.Fatal("offloading is enabled but S3 client is not provided") + } + cacheMaintainer := NewCacheMaintainer(cfg.CacheSize, cfg.SortCacheSize, &CacheMaintainerMetrics{
♻️ Duplicate comments (8)
frac/sealed.go (1)
210-242: Fix Offload’s error semantics: return false on any error
Returning true when g.Wait or os.Create fails misleads callers into treating failed offloads as successes.
Apply:
  func (f *Sealed) Offload(ctx context.Context, u storage.Uploader) (bool, error) {
@@
- if err := g.Wait(); err != nil {
-     return true, err
- }
+ if err := g.Wait(); err != nil {
+     return false, err
+ }
@@
- file, err := os.Create(remoteFracName)
+ file, err := os.Create(remoteFracName)
  if err != nil {
-     return true, err
+     return false, err
  }
  defer file.Close()
@@
  util.MustSyncPath(filepath.Dir(remoteFracName))
  return true, nil
  }
fracmanager/fetcher_test.go (1)
25-26: Add defer fm.Stop() to avoid background goroutine leaks
Ensure cleanup even on assertion failures.
  fm, err := newFracManagerWithBackgroundStart(t.Context(), config)
  assert.NoError(t, err)
+defer fm.Stop()
frac/remote.go (3)
86-89: Set StorageType on the fast path
When cached info is provided, StorageType remains unset; remote fracs may be misclassified.
- if info != nil && info.IndexOnDisk > 0 {
-     return f
- }
+ if info != nil && info.IndexOnDisk > 0 {
+     f.info.StorageType = storage.TypeRemote
+     return f
+ }
135-137: Don’t panic in Offload; return an error
A panic can take the whole process down.
-import (
+import (
  "context"
+ "errors"
  "path"
  "sync"
@@
-func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error) {
-    panic("BUG: remote fraction cannot be offloaded")
-}
+func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error) {
+    return false, errors.New("remote fraction cannot be offloaded")
+}
149-153: Avoid S3 key collisions: stop truncating to path.Base
Using only the basename risks overwriting different fracs that share a filename under different dirs. Use full keys (or an explicit prefix) consistently across read/delete.
Apply:
@@ - files := []string{ - path.Base(f.BaseFileName) + consts.DocsFileSuffix, - path.Base(f.BaseFileName) + consts.SdocsFileSuffix, - path.Base(f.BaseFileName) + consts.IndexFileSuffix, - } + files := []string{ + f.BaseFileName + consts.DocsFileSuffix, + f.BaseFileName + consts.SdocsFileSuffix, + f.BaseFileName + consts.IndexFileSuffix, + } @@ - name := path.Base(f.BaseFileName) + consts.IndexFileSuffix + name := f.BaseFileName + consts.IndexFileSuffix @@ - pickedName := path.Base(f.BaseFileName) + consts.DocsFileSuffix - sortedName := path.Base(f.BaseFileName) + consts.SdocsFileSuffix + pickedName := f.BaseFileName + consts.DocsFileSuffix + sortedName := f.BaseFileName + consts.SdocsFileSuffixFollow-up: verify the uploader uses the same key scheme to avoid read/write mismatches.
#!/bin/bash # Inspect S3 interactions and key formation to ensure consistency with Remote. rg -nP -C3 --type=go 'Upload\s*\(' rg -nP -C2 --type=go 'NewReader\(\s*context\.|Exists\(\s*.*?,\s*[^)]*\)' rg -nP -C2 --type=go 'path\.Base\('Also applies to: 202-208, 210-221
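The collision described in this comment is easy to reproduce in isolation. In the sketch below, baseKey mirrors the path.Base-based key formation shown in the diff, while fullKey is one hypothetical alternative; the directory names are made up, and the project's uploader may use a different key scheme.

```go
package main

import (
	"fmt"
	"path"
)

// baseKey builds an object key from the basename only, as in the code under review.
func baseKey(baseFileName, suffix string) string {
	return path.Base(baseFileName) + suffix
}

// fullKey keeps the whole base file name, so different directories
// produce different keys. This is an assumed alternative, not the PR's code.
func fullKey(baseFileName, suffix string) string {
	return baseFileName + suffix
}

func main() {
	a, b := "/data/store-a/frac-1", "/data/store-b/frac-1"
	fmt.Println(baseKey(a, ".docs") == baseKey(b, ".docs")) // same key: collision
	fmt.Println(fullKey(a, ".docs") == fullKey(b, ".docs")) // distinct keys
}
```

Whichever scheme is chosen, the upload, read, and delete paths need to agree on it.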
fracmanager/loader.go (2)
180-184: Don’t skip cleanup for partially deleted files when .remote exists
Early continue for hasRemote bypasses the .del cleanup path, leaving trash on disk. Clean first, and optionally warn if offloading is disabled.
- if info.hasRemote { - infoList = append(infoList, info) - continue - } + if info.hasRemote { + // still clean up partially deleted local files, if any + if info.hasDocsDel || info.hasIndexDel || info.hasSdocsDel { + logger.Info("cleaning up partially deleted fraction files", zap.String("file", info.base)) + removeFractionFiles(info.base) + } + if !l.config.OffloadingEnabled { + logger.Warn("found remote fraction while offloading is disabled; ignoring remote part", + zap.String("file", info.base)) + } + infoList = append(infoList, info) + continue + }
72-76: Prevent fall-through for remote fractions; explicitly handle OffloadingEnabled=false
Right now remote entries can be appended to both remote and sealed slices (no continue), and when OffloadingEnabled=false they silently drop out. Short-circuit after handling remote and warn when offloading is disabled to make behavior explicit and avoid duplicates.
- if l.config.OffloadingEnabled && info.hasRemote { - remote = append(remote, l.loadRemoteFrac(ctx, diskFracCache, info)) - } + if info.hasRemote { + if l.config.OffloadingEnabled { + remote = append(remote, l.loadRemoteFrac(ctx, diskFracCache, info)) + } else { + logger.Warn("remote fraction found but offloading is disabled; skipping", + zap.String("file", info.base)) + } + continue + } if info.hasSdocs && info.hasIndex { if info.hasMeta { removeFile(info.base + consts.MetaFileSuffix) } if info.hasDocs { removeFile(info.base + consts.DocsFileSuffix) } - sealed = append(sealed, l.loadSealedFrac(diskFracCache, info)) - } else if !info.hasRemote { + sealed = append(sealed, l.loadSealedFrac(diskFracCache, info)) + } else { if info.hasMeta { actives = append(actives, l.fracProvider.NewActive(info.base)) } else { sealed = append(sealed, l.loadSealedFrac(diskFracCache, info)) } }Also applies to: 84-86
fracmanager/fracmanager.go (1)
246-304: Fix goroutine loop-variable capture; each goroutine must process its own outsider
Classic capture bug: the closure captures outsider by reference. All goroutines may act on the last element.
- for _, outsider := range outsiders { - cleanupWg.Add(1) - go func() { + for _, outsider := range outsiders { + o := outsider + cleanupWg.Add(1) + go func(o frac.Fraction) { defer cleanupWg.Done() - info := outsider.Info() + info := o.Info() if !fm.config.OffloadingEnabled { fm.fracCache.RemoveFraction(info.Name()) - outsider.Suicide() + o.Suicide() return } offloadStart := time.Now() - mustBeOffloaded, err := outsider.Offload(fm.ctx, s3.NewUploader(fm.s3cli)) + mustBeOffloaded, err := o.Offload(fm.ctx, s3.NewUploader(fm.s3cli)) if err != nil { // While searching for outsiders we removed this fraction from list of local fractions. // Now we need to return it back and try again to offload it later. fm.fracMu.Lock() - fm.localFracs = append(fm.localFracs, &fracRef{outsider}) + fm.localFracs = append(fm.localFracs, &fracRef{o}) fm.fracMu.Unlock() metric.OffloadingTotal.WithLabelValues("failure").Inc() metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds())) logger.Error( "will skip fraction suicide: failed to offload fraction", zap.String("fraction", info.Name()), zap.Error(err), ) return } if !mustBeOffloaded { fm.fracCache.RemoveFraction(info.Name()) - outsider.Suicide() + o.Suicide() return } metric.OffloadingTotal.WithLabelValues("success").Inc() metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds())) logger.Info( - "successully offloaded fraction", + "successfully offloaded fraction", zap.String("fraction", info.Name()), zap.String("took", time.Since(offloadStart).String()), ) // Update storage type for offloaded fraction. info.StorageType = storage.TypeRemote - remote := fm.fracProvider.NewRemote(fm.ctx, info.Path, info) + remote := fm.fracProvider.NewRemote(fm.ctx, info.Path, info) fm.fracMu.Lock() fm.remoteFracs = append(fm.remoteFracs, remote) fm.fracMu.Unlock() - outsider.Suicide() - }() + o.Suicide() + }(o) }
🧹 Nitpick comments (7)
frac/sealed.go (1)
210-242: Reduce blocking during Offload (optional)
Holding useMu exclusively for the entire upload stalls readers. Consider RLock for uploads, then a short WLock just to drop the marker.
If acceptable, I can draft a refactor that:
- Acquires useMu.RLock() for open+upload
- Releases RLock, acquires useMu.Lock() only to create+sync the marker
- Preserves Suicide/DataProvider safety
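The three steps above can be sketched roughly as follows. This is a simplified model, not the Sealed implementation: sealedFrac, errSuicided, and the upload callback are hypothetical, and the marker is reduced to a boolean field.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errSuicided = errors.New("fraction already removed")

// sealedFrac is a hypothetical, stripped-down model of a sealed fraction.
type sealedFrac struct {
	mu        sync.RWMutex
	suicided  bool
	offloaded bool // stands in for the on-disk marker
}

// offload holds only a read lock during the slow upload, so concurrent
// readers are not blocked, and takes the write lock briefly to record
// the result.
func (s *sealedFrac) offload(upload func() error) error {
	s.mu.RLock()
	if s.suicided {
		s.mu.RUnlock()
		return errSuicided
	}
	err := upload()
	s.mu.RUnlock()
	if err != nil {
		return err
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	// State may have changed between the two critical sections, so recheck.
	if s.suicided {
		return errSuicided
	}
	s.offloaded = true
	return nil
}

func main() {
	s := &sealedFrac{}
	fmt.Println(s.offload(func() error { return nil }), s.offloaded)
}
```

The recheck after re-acquiring the lock is the price of releasing it in between; whether that window is acceptable depends on invariants the real type has to preserve.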
fracmanager/fetcher_test.go (1)
63-64: Use t.Context() consistently
Use the test’s context for cancellation and deadlines rather than context.TODO().
-docs, err = fetcher.FetchDocs(context.TODO(), fm.GetAllFracs(), ids)
+docs, err = fetcher.FetchDocs(t.Context(), fm.GetAllFracs(), ids)
fracmanager/sealer_test.go (1)
35-38: Typo: muliplier → multiplier
Minor readability nit in a benchmark helper.
-const muliplier = 10
+const multiplier = 10
@@
- for i := 0; i < muliplier; i++ {
+ for i := 0; i < multiplier; i++ {
frac/remote.go (1)
215-217: Handle Exists error (optional)
Ignoring the error may mask permission/network issues; fail fast with context.
- if ok, _ := f.s3cli.Exists(f.ctx, pickedName); !ok {
+ if ok, err := f.s3cli.Exists(f.ctx, pickedName); err != nil {
+     logger.Info("failed to check docs existence", zap.String("key", pickedName), zap.Error(err))
+ } else if !ok {
      pickedName = sortedName
  }
fracmanager/fracmanager.go (2)
288-292: Typo in log message: “successully” → “successfully”
Minor polish.
- logger.Info(
-     "successully offloaded fraction",
+ logger.Info(
+     "successfully offloaded fraction",
314-323: Doc comment nit: s/factions/fractions
Tighten comments; small readability win.
-// While working with this list, it may become irrelevant (factions may, for example, be deleted).
+// While working with this list, it may become irrelevant (fractions may, for example, be deleted).
cmd/seq-db/seq-db.go (1)
322-344: Harden initS3Client: pre-validate config and emit a success log (keep credentials out of logs).
Early validation yields clearer failures than a generic “cannot create S3 client”, and a one-time info log helps operability.
Apply this diff:
func initS3Client(cfg config.Config) *s3.Client { if !cfg.Offloading.Enabled { return nil } + if cfg.Offloading.Bucket == "" { + logger.Fatal("offloading is enabled but S3 bucket is empty") + } + if cfg.Offloading.Region == "" { + logger.Fatal("offloading is enabled but S3 region is empty") + } + cli, err := s3.NewClient( cfg.Offloading.Endpoint, cfg.Offloading.AccessKey, cfg.Offloading.SecretKey, cfg.Offloading.Region, cfg.Offloading.Bucket, ) if err != nil { logger.Fatal( "cannot create S3 client", zap.Error(err), ) } + logger.Info( + "S3 offloading enabled", + zap.String("bucket", cfg.Offloading.Bucket), + zap.String("region", cfg.Offloading.Region), + zap.String("endpoint", cfg.Offloading.Endpoint), + ) + return cli }Optional: consider returning (*s3.Client, error) and doing the logger.Fatal at the call site to improve testability and keep termination decisions at the top level.
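A sketch of the optional variant mentioned above, where validation returns an error and the caller decides whether to terminate. The offloadingConfig type and validateOffloading are hypothetical; only the Enabled, Bucket, and Region field names are taken from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// offloadingConfig is a hypothetical subset of the offloading settings.
type offloadingConfig struct {
	Enabled bool
	Bucket  string
	Region  string
}

// validateOffloading collects every configuration problem instead of
// stopping at the first one. It returns nil when offloading is disabled
// or the configuration is complete.
func validateOffloading(c offloadingConfig) error {
	if !c.Enabled {
		return nil
	}
	var errs []error
	if c.Bucket == "" {
		errs = append(errs, errors.New("offloading is enabled but S3 bucket is empty"))
	}
	if c.Region == "" {
		errs = append(errs, errors.New("offloading is enabled but S3 region is empty"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	fmt.Println(validateOffloading(offloadingConfig{Enabled: true}))
}
```

Returning the error keeps the helper testable without intercepting a fatal log call; errors.Join requires Go 1.20 or later.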
📒 Files selected for processing (13)
- cmd/seq-db/seq-db.go (4 hunks)
- frac/remote.go (1 hunks)
- frac/sealed.go (8 hunks)
- frac/sealed_loader.go (1 hunks)
- fracmanager/fetcher_test.go (2 hunks)
- fracmanager/fracmanager.go (13 hunks)
- fracmanager/fracmanager_test.go (7 hunks)
- fracmanager/fraction_provider.go (3 hunks)
- fracmanager/loader.go (7 hunks)
- fracmanager/sealed_frac_cache_test.go (7 hunks)
- fracmanager/sealer_test.go (1 hunks)
- storeapi/grpc_search.go (1 hunks)
- storeapi/grpc_v1.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- storeapi/grpc_search.go
- storeapi/grpc_v1.go
- frac/sealed_loader.go
- fracmanager/sealed_frac_cache_test.go
- fracmanager/fracmanager_test.go
🧰 Additional context used
🧬 Code graph analysis (6)
cmd/seq-db/seq-db.go (4)
storeapi/store.go (1)
NewStore(49-69)fracmanager/config.go (1)
Config(14-32)storage/s3/client.go (1)
Client(16-19)logger/logger.go (2)
Fatal(82-84)Error(74-76)
fracmanager/fraction_provider.go (3)
storage/s3/client.go (1)
Client(16-19)storage/read_limiter.go (1)
ReadLimiter(9-12)frac/remote.go (2)
NewRemote(60-96)Remote(32-58)
frac/sealed.go (6)
frac/sealed/seqids/loader.go (3)
Table(13-18)Table(20-22)Table(24-26)frac/sealed/lids/table.go (1)
Table(11-19)storage/io.go (3)
TypeLocal(12-12)Uploader(27-29)ImmutableFile(16-25)consts/consts.go (1)
RemoteFractionSuffix(64-64)util/fs.go (1)
MustSyncPath(15-28)storage/index_reader.go (1)
IndexReader(13-20)
frac/remote.go (12)
frac/fraction.go (2)
Fraction(19-26)DataProvider(14-17)storage/io.go (3)
ImmutableFile(16-25)TypeRemote(13-13)Uploader(27-29)cache/cache.go (1)
Cache(60-68)storage/docs_reader.go (2)
DocsReader(11-14)NewDocsReader(16-21)frac/index_cache.go (1)
IndexCache(10-18)storage/index_reader.go (2)
IndexReader(13-20)NewIndexReader(22-32)storage/s3/client.go (1)
Client(16-19)storage/read_limiter.go (1)
ReadLimiter(9-12)seq/seq.go (1)
MID(17-17)util/fs.go (1)
MustRemoveFileByPath(30-38)consts/consts.go (4)
RemoteFractionSuffix(64-64)DocsFileSuffix(53-53)SdocsFileSuffix(56-56)IndexFileSuffix(60-60)storage/s3/reader.go (1)
NewReader(34-36)
fracmanager/fracmanager.go (10)
config/config.go (1)
Config(48-252)fracmanager/config.go (1)
Config(14-32)frac/remote.go (2)
Remote(32-58)NewRemote(60-96)storage/s3/client.go (1)
Client(16-19)frac/fraction.go (1)
Fraction(19-26)frac/info.go (1)
Info(22-45)storage/s3/uploader.go (1)
NewUploader(24-26)metric/store.go (3)
OffloadingTotal(290-295)OffloadingDurationSeconds(296-302)DataSizeTotal(115-120)storage/io.go (1)
TypeRemote(13-13)fracmanager/list.go (1)
List(10-10)
fracmanager/loader.go (6)
fracmanager/config.go (1)
Config(14-32)frac/sealed.go (2)
Sealed(29-55)NewSealed(71-102)frac/remote.go (2)
Remote(32-58)NewRemote(60-96)fracmanager/sealed_frac_cache.go (1)
NewFracCacheFromDisk(45-49)consts/consts.go (1)
RemoteFractionSuffix(64-64)frac/info.go (1)
Info(22-45)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (6)
frac/sealed.go (1)
411-446: loadHeader extraction looks good
Shared helper consolidates header I/O, consistent error handling, and sets IndexOnDisk. No issues spotted.
fracmanager/sealer_test.go (1)
105-110: Constructor wiring LGTM
Passing nil for the new s3 client parameter in bench context is appropriate; Stop() is already deferred.
fracmanager/fraction_provider.go (1)
77-90: NewRemote wiring is correct and cohesive
Propagates read limiter, caches, config, and the stored s3 client into frac.NewRemote as intended.
fracmanager/loader.go (1)
40-43: Constructor doesn’t validate offloading preconditions (leave check to caller)
NewLoader no longer accepts an S3 client, so it cannot fail fast when OffloadingEnabled=true but no S3 client is configured. Ensure the caller (NewFracManager) validates this before constructing/using the loader.
I’ll add a concrete fail-fast check in NewFracManager below.
fracmanager/fracmanager.go (1)
581-586: Good: syncing frac-cache on Stop()
Ensures on-disk cache reflects runtime updates (e.g., storage_type changes after offload).
cmd/seq-db/seq-db.go (1)
35-35: S3 storage import wiring looks correct.
This cleanly scopes S3 usage to the store path.
Actionable comments posted: 2
♻️ Duplicate comments (3)
frac/sealed.go (1)
227-241: Fix Offload(): boolean semantics on error and ensure marker durability.
Returning true with a non-nil error is misleading (this was noted earlier). Also, fsync the marker file and handle Close() errors to avoid losing the marker after a crash. This change keeps the directory fsync you already added.
- if err := g.Wait(); err != nil { - return true, err - } + if err := g.Wait(); err != nil { + return false, err + } remoteFracName := f.BaseFileName + consts.RemoteFractionSuffix - file, err := os.Create(remoteFracName) - if err != nil { - return true, err - } - defer file.Close() + file, err := os.Create(remoteFracName) + if err != nil { + return false, err + } + // Ensure the marker is durable before syncing the parent directory. + if err := file.Sync(); err != nil { + _ = file.Close() + return false, err + } + if err := file.Close(); err != nil { + return false, err + } util.MustSyncPath(filepath.Dir(remoteFracName)) - return true, nil + return true, nilOptional hardening (no diff): use os.OpenFile(..., os.O_CREATE|os.O_EXCL, 0o644) to make creation idempotent and avoid truncating an existing marker on repeated Offload calls.
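The durability sequence discussed here (sync the file, close it, then sync the parent directory) can be sketched on its own. createMarker and demo are hypothetical helpers, not the project's code. The sketch opens the file with O_CREATE but without O_TRUNC or O_EXCL, so a repeated call neither truncates nor fails on an existing marker; that is a design choice of this example rather than what the review proposes. Syncing a directory handle is assumed to work as on Linux and may behave differently on other platforms.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// createMarker creates an empty marker file and makes both the file and
// its directory entry durable before returning.
func createMarker(path string) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	if err := f.Sync(); err != nil {
		_ = f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}

	// Sync the parent directory so the new directory entry survives a crash.
	dir, err := os.Open(filepath.Dir(path))
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}

// demo creates the marker twice in a temporary directory and checks that it exists.
func demo() error {
	tmp, err := os.MkdirTemp("", "offload-marker")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmp)

	marker := filepath.Join(tmp, "frac.remote")
	for i := 0; i < 2; i++ {
		if err := createMarker(marker); err != nil {
			return err
		}
	}
	_, err = os.Stat(marker)
	return err
}

func main() {
	fmt.Println(demo())
}
```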
fracmanager/fracmanager.go (2)
288-292: Fix log message typo.
“successully” → “successfully”.
- logger.Info(
-     "successully offloaded fraction",
+ logger.Info(
+     "successfully offloaded fraction",
246-306: Fix goroutine loop-variable capture; also make offloading robust to nil S3 client.
The closure captures outsider by reference; all goroutines can end up handling the last element. Additionally, if OffloadingEnabled is true but fm.s3cli is nil, s3.NewUploader(fm.s3cli) risks a nil deref. Rebind and pass the value into the goroutine, and guard the uploader creation.
Apply:
- for _, outsider := range outsiders { - cleanupWg.Add(1) - go func() { + for _, outsider := range outsiders { + o := outsider + cleanupWg.Add(1) + go func(o frac.Fraction) { defer cleanupWg.Done() - info := outsider.Info() + info := o.Info() if !fm.config.OffloadingEnabled { fm.fracCache.RemoveFraction(info.Name()) - outsider.Suicide() + o.Suicide() return } + // Offloading is enabled but client is not configured; re-queue and try later. + if fm.s3cli == nil { + fm.fracMu.Lock() + fm.localFracs = append(fm.localFracs, &fracRef{o}) + fm.fracMu.Unlock() + metric.OffloadingTotal.WithLabelValues("failure").Inc() + logger.Error("offloading enabled but S3 client is nil; re-queuing fraction", zap.String("fraction", info.Name())) + return + } + offloadStart := time.Now() - mustBeOffloaded, err := outsider.Offload(fm.ctx, s3.NewUploader(fm.s3cli)) + mustBeOffloaded, err := o.Offload(fm.ctx, s3.NewUploader(fm.s3cli)) if err != nil { // While searching for outsiders we removed this fraction from list of local fractions. // Now we need to return it back and try again to offload it later. 
fm.fracMu.Lock() - fm.localFracs = append(fm.localFracs, &fracRef{outsider}) + fm.localFracs = append(fm.localFracs, &fracRef{o}) fm.fracMu.Unlock() metric.OffloadingTotal.WithLabelValues("failure").Inc() metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds())) logger.Error( "will skip fraction suicide: failed to offload fraction", zap.String("fraction", info.Name()), zap.Error(err), ) return } if !mustBeOffloaded { fm.fracCache.RemoveFraction(info.Name()) - outsider.Suicide() + o.Suicide() return } metric.OffloadingTotal.WithLabelValues("success").Inc() metric.OffloadingDurationSeconds.Observe(float64(time.Since(offloadStart).Seconds())) logger.Info( - "successully offloaded fraction", + "successfully offloaded fraction", zap.String("fraction", info.Name()), zap.String("took", time.Since(offloadStart).String()), ) - remote := fm.fracProvider.NewRemote(fm.ctx, info.Path, info) + remote := fm.fracProvider.NewRemote(fm.ctx, info.Path, info) fm.fracMu.Lock() // FIXME(dkharms): We had previously shifted fraction from local fracs list (in [fm.determineOutsiders] via [fm.shiftFirstFrac]) // and therefore excluded it from search queries. // But now we return that fraction back (well now it's a [frac.Remote] fraction but it still points to the same data) // so user can face incosistent search results. fm.remoteFracs = append(fm.remoteFracs, remote) fm.fracMu.Unlock() - outsider.Suicide() - }() + o.Suicide() + }(o) }
🧹 Nitpick comments (8)
frac/sealed.go (3)
212-226: Avoid holding the write lock across long remote uploads.
The write lock blocks readers (DataProvider) for the entire upload duration. Consider:
- Grab the lock only to snapshot/open docs/index into local vars and check suicided, then unlock and upload using the local handles; or
- Downgrade to RLock if write access isn’t required during Upload.
This reduces latency impact under load while keeping safety.
Would you like me to draft a safe, lock-minimizing variant?
209-214: Document the boolean return value.
Please state what the bool indicates (e.g., “markerCreated”) to avoid ambiguity for callers.
-// Offload saves `.docs` (or `.sdocs`) and `.index` files into remote storage. -// It does not free any of the occupied memory (nor on disk nor in memory). +// Offload saves `.docs` (or `.sdocs`) and `.index` files into remote storage. +// It does not free any occupied resources (neither on disk nor in memory). +// Returns: +// - bool: markerCreated — true on success, false on any error.
99-100: Use pointer IndexReader to avoid copying and align call sites.
Minor consistency/perf nit: loadHeader takes IndexReader by value while other paths pass pointers. Suggest switching to pointer to avoid copying and to align with Loader.Load usage.
- f.info = loadHeader(f.indexFile, f.indexReader)
+ f.info = loadHeader(f.indexFile, &f.indexReader)
And update the helper:
-func loadHeader(
-    indexFile storage.ImmutableFile,
-    indexReader storage.IndexReader,
-) *Info {
+func loadHeader(
+    indexFile storage.ImmutableFile,
+    indexReader *storage.IndexReader,
+) *Info {
Also applies to: 410-445
fracmanager/fracmanager.go (5)
79-114: Warn early if offloading is enabled but S3 client is nil.
This configuration will cause every outsider to be re-queued (per the guard above) and disk usage can grow unexpectedly. Emit a startup warning.
fracManager := &FracManager{ config: cfg, ctx: ctx, s3cli: s3cli, mature: atomic.Bool{}, cacheMaintainer: cacheMaintainer, fracProvider: newFractionProvider(&cfg.Fraction, s3cli, cacheMaintainer, config.ReaderWorkers, config.IndexWorkers), ulidEntropy: ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0), fracCache: NewSealedFracCache(filepath.Join(cfg.DataDir, consts.FracCacheFileSuffix)), } + if cfg.OffloadingEnabled && s3cli == nil { + logger.Warn("offloading is enabled but S3 client is nil; outsiders will be re-queued and disk may fill up") + }
121-128: Potential nil dereference if Start runs before Load.
`fm.Active().Info()` assumes `fm.active.frac` is non-nil. If `Start()` is called before `Load()`, this can panic. If call order isn’t guaranteed externally, add a guard.
Option:
 func (fm *FracManager) maintenance(sealWg, cleanupWg *sync.WaitGroup) {
 	logger.Debug("maintenance started")
 	n := time.Now()
-	if fm.Active().Info().DocsOnDisk > fm.config.FracSize {
+	if fm.Active() != nil && fm.Active().Info().DocsOnDisk > fm.config.FracSize {
 		active := fm.rotate()
Do we guarantee Start is always after Load across the codebase?
575-589: Make Stop idempotent-safe.
`fm.stopFn()` and `fm.cacheWG.Wait()` can panic if `Start()` wasn’t called. Guard them.
 func (fm *FracManager) Stop() {
 	fm.fracProvider.Stop()
-	fm.stopFn()
+	if fm.stopFn != nil {
+		fm.stopFn()
+	}
 	fm.statWG.Wait()
 	fm.mntcWG.Wait()
-	fm.cacheWG.Wait()
+	if fm.cacheWG != nil {
+		fm.cacheWG.Wait()
+	}
 	if err := fm.fracCache.SyncWithDisk(); err != nil {
 		logger.Error(
 			"failed to sync frac-cache on disk",
 			zap.Error(err),
 		)
 	}
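The nil guards cover Stop being called before Start, but not Stop being called twice. A minimal sketch of one way to get both properties, assuming a hypothetical `manager` type (not the real `FracManager`) and using `sync.Once` to run the shutdown logic at most once:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for a component with background workers.
type manager struct {
	stopOnce sync.Once
	stopFn   context.CancelFunc // stays nil until Start is called
	wg       sync.WaitGroup
	stops    int // how many times the shutdown logic actually ran
}

// Start launches one background worker that exits when the context is cancelled.
func (m *manager) Start() {
	ctx, cancel := context.WithCancel(context.Background())
	m.stopFn = cancel
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		<-ctx.Done()
	}()
}

// Stop is safe to call before Start and safe to call repeatedly.
func (m *manager) Stop() {
	m.stopOnce.Do(func() {
		if m.stopFn != nil {
			m.stopFn()
		}
		m.wg.Wait() // a zero-value WaitGroup returns immediately
		m.stops++
	})
}

func main() {
	var idle manager
	idle.Stop() // never started: must not panic

	var m manager
	m.Start()
	m.Stop()
	m.Stop() // second call is a no-op

	fmt.Println(idle.stops, m.stops)
}
```

This sketch does not handle Start and Stop racing on different goroutines; that would need a mutex around `stopFn`.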
351-393: Metrics: avoid transient nil Info() panics.
If any fraction returns a transient nil from `Info()` (e.g., remote opened without metadata yet), the stats loop would panic. Consider a defensive check.
-	for _, f := range fracs {
-		info := f.Info()
+	for _, f := range fracs {
+		info := f.Info()
+		if info == nil {
+			continue
+		}
240-306: Tests needed for offloading paths and retention.
Given the Codecov report, add focused tests for:
- Offloading success path: moves local → remote, updates caches, calls `Suicide()` on the old fraction.
- Offloading failure path: re-queues local and records metrics.
- Offloading enabled with nil S3 client: re-queues (no panic).
- Retention: stale `frac.Remote` are removed, and S3 deletion is invoked.
I can draft table-driven tests with a fake `storage.Uploader` and a fake S3 client to simulate success/failure and assert side effects. Want me to push a spec?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
- frac/info.go (0 hunks)
- frac/remote.go (1 hunks)
- frac/sealed.go (7 hunks)
- fracmanager/fracmanager.go (13 hunks)
- storage/io.go (0 hunks)
💤 Files with no reviewable changes (2)
- storage/io.go
- frac/info.go
🚧 Files skipped from review as they are similar to previous changes (1)
- frac/remote.go
🧰 Additional context used
🧬 Code graph analysis (2)
fracmanager/fracmanager.go (8)
- config/config.go (1): Config(48-252)
- fracmanager/config.go (1): Config(14-32)
- frac/remote.go (2): Remote(33-59), NewRemote(61-102)
- storage/s3/client.go (1): Client(16-19)
- frac/fraction.go (1): Fraction(19-26)
- storage/s3/uploader.go (1): NewUploader(24-26)
- metric/store.go (3): OffloadingTotal(290-295), OffloadingDurationSeconds(296-302), DataSizeTotal(115-120)
- fracmanager/list.go (1): List(10-10)
frac/sealed.go (8)
- frac/sealed/seqids/loader.go (4): Table(13-18), Table(20-22), Table(24-26), Loader(28-35)
- frac/sealed/lids/table.go (1): Table(11-19)
- storage/io.go (2): Uploader(20-22), ImmutableFile(9-18)
- consts/consts.go (1): RemoteFractionSuffix(64-64)
- util/fs.go (1): MustSyncPath(15-28)
- storage/index_reader.go (1): IndexReader(13-20)
- frac/info.go (1): Info(21-40)
- frac/block_info.go (1): BlockInfo(14-16)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test
🔇 Additional comments (1)
frac/sealed.go (1)
49-61: State encapsulation LGTM.
Consolidating ids/lids/blocksOffsets under sealedState simplifies loading paths and improves invariants. The wiring in NewSealedPreloaded and createDataProvider looks consistent.
Also applies to: 152-157, 380-387
Actionable comments posted: 0
🧹 Nitpick comments (3)
config.example.yaml (3)
10-12: Document the size constraint between total_size and frac_size.
Avoid user misconfiguration. Add a short comment that total_size must be a multiple of frac_size (it is here: 1GiB/16MiB = 64).
 storage:
-  frac_size: 16MiB
-  total_size: 1GiB
+  # tip: 'total_size' should be a multiple of 'frac_size' (example: 1GiB / 16MiB = 64 fractions)
+  frac_size: 16MiB
+  total_size: 1GiB
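Beyond a comment in the example config, the constraint could also be checked at startup. A minimal sketch, assuming a hypothetical `fractionCount` helper and sizes already parsed into bytes (the real config loader may represent them differently):

```go
package main

import (
	"errors"
	"fmt"
)

const (
	MiB uint64 = 1 << 20
	GiB uint64 = 1 << 30
)

// fractionCount is a hypothetical validation helper: it returns how many
// fractions fit into totalSize, or an error if the sizes do not divide evenly.
func fractionCount(totalSize, fracSize uint64) (uint64, error) {
	if fracSize == 0 {
		return 0, errors.New("frac_size must be positive")
	}
	if totalSize%fracSize != 0 {
		return 0, fmt.Errorf(
			"total_size (%d) is not a multiple of frac_size (%d)",
			totalSize, fracSize,
		)
	}
	return totalSize / fracSize, nil
}

func main() {
	n, err := fractionCount(1*GiB, 16*MiB)
	fmt.Println(n, err) // prints: 64 <nil>
}
```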
13-17: Add a note on creating the bucket used below.
The MinIO run snippet is great; users still need to create the “remote-storage” bucket.
 # docker run -p 9000:9000 -p 9001:9001 \
 #   quay.io/minio/minio server /data --console-address ":9001"
+#
+# Then create the bucket (via Console http://localhost:9001 or with mc):
+# docker run --rm --network host minio/mc \
+#   mc alias set local http://localhost:9000 minioadmin minioadmin && \
+#   mc mb local/remote-storage
18-24: Endpoint trailing slash, portability hint, and secrets guidance.
- Prefer endpoint without trailing slash to avoid accidental “//” joins in clients.
- Add a commented region hint for AWS portability.
- Nudge users to avoid committing real keys in configs.
 offloading:
   enabled: true
   retention: 5m
-  endpoint: http://localhost:9000/
+  endpoint: http://localhost:9000
   bucket: remote-storage
   access_key: minioadmin
   secret_key: minioadmin
+  # region: us-east-1 # uncomment when using AWS S3
+  # NOTE: for production, prefer environment-based secrets or a secret manager.
📒 Files selected for processing (2)
- config.example.yaml (1 hunks)
- config/config.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- config/config.go
🔇 Additional comments (1)
config.example.yaml (1)
18-24: Offloading config parsing and S3 client behavior need implementation and verification
I wasn’t able to find any code that wires up the `offloading` block from config.example.yaml into an `Offloading` struct, nor any parsing of `retention`, endpoint normalization, or S3 path-style setup. Please verify (or implement if missing) in your config loader and client initialization:
- Define an `Offloading` struct (e.g. in `config/config.go`) with a `Retention` field parsed via `util.ParseDuration` (supports “5m”, “1h”, etc.).
- Normalize the `endpoint` value by trimming trailing slashes before use.
- Configure your S3/MinIO client with `S3ForcePathStyle` (or `WithPathStyle`) when targeting non-AWS endpoints.
- Add unit tests covering duration parsing, endpoint trimming, and path-style behavior.
Actionable comments posted: 1
♻️ Duplicate comments (2)
frac/remote.go (2)
163-171: S3 key collisions: stop trimming to basename; use full (normalized) key
Building object keys with filepath.Base risks overwriting fractions that share filenames across dirs/shards. This was raised earlier and still applies.
 files := []string{
-	filepath.Base(f.BaseFileName) + consts.DocsFileSuffix,
-	filepath.Base(f.BaseFileName) + consts.SdocsFileSuffix,
-	filepath.Base(f.BaseFileName) + consts.IndexFileSuffix,
+	f.BaseFileName + consts.DocsFileSuffix,
+	f.BaseFileName + consts.SdocsFileSuffix,
+	f.BaseFileName + consts.IndexFileSuffix,
 }

-	name := filepath.Base(f.BaseFileName) + consts.IndexFileSuffix
+	name := f.BaseFileName + consts.IndexFileSuffix

-	sortedName := filepath.Base(f.BaseFileName) + consts.SdocsFileSuffix
-	unsortedName := filepath.Base(f.BaseFileName) + consts.DocsFileSuffix
+	sortedName := f.BaseFileName + consts.SdocsFileSuffix
+	unsortedName := f.BaseFileName + consts.DocsFileSuffix
Optional hardening (OS-agnostic keys): centralize key construction and normalize separators to forward slashes.
// add near other helpers
func (f *Remote) s3Key(suffix string) string {
	return filepath.ToSlash(f.BaseFileName + suffix)
}
Then use f.s3Key(consts.IndexFileSuffix) etc. across openDocs/openIndex/Suicide.
Also applies to: 230-230, 254-256
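To make the collision concrete, here is a small sketch comparing basename-only keys with full keys. The `objectKey` helper, the `store-1` prefix, and the `.index` suffix value are assumptions for illustration only. Backslashes are replaced explicitly because `filepath.ToSlash` only converts the separator of the OS the binary runs on.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// objectKey is a hypothetical helper that builds an S3 key from an optional
// prefix, a fraction's base file name, and a file suffix.
func objectKey(prefix, baseFileName, suffix string) string {
	// Normalize Windows-style separators regardless of the host OS.
	p := strings.ReplaceAll(baseFileName, `\`, "/")
	// path.Join cleans the result; the leading slash of absolute paths is dropped.
	return strings.TrimPrefix(path.Join(prefix, p)+suffix, "/")
}

func main() {
	a := "/data/shard-a/frac-1"
	b := "/data/shard-b/frac-1"

	// Basename-only keys collide across shards.
	fmt.Println(path.Base(a) == path.Base(b))

	// Full keys stay distinct.
	fmt.Println(objectKey("", a, ".index"))
	fmt.Println(objectKey("", b, ".index"))
	fmt.Println(objectKey("store-1", `data\shard-b\frac-1`, ".index"))
}
```

Whether to key by full path or by an explicit per-store prefix depends on the bucket layout; either way the key derivation should live in one place so upload, open, and delete agree.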
149-151: Return an explicit error instead of panicking in Offload
Panics here can take the whole process down if Offload is called polymorphically on a Fraction. Return a typed error.
 func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error) {
-	panic("BUG: remote fraction cannot be offloaded")
+	return false, fmt.Errorf("remote fraction cannot be offloaded")
 }
🧹 Nitpick comments (2)
storage/s3/uploader.go (1)
6-6: Avoid basename-only S3 keys; normalize path separators for OS-agnostic behavior
Using filepath.Base makes key derivation OS-dependent, and stripping directories entirely risks cross-shard collisions if identical filenames exist in different dirs. Normalize to forward slashes first, then take the base, or better, pass a full, unique key. Minimal tweak below keeps current shape while being OS-agnostic; follow-up in frac/remote.go recommends eliminating basename-only keys entirely.
 import (
 	"context"
 	"fmt"
-	"path/filepath"
+	"path"
+	"path/filepath"

 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	"github.com/ozontech/seq-db/storage"
 )

 func (u *uploader) Upload(ctx context.Context, r storage.ImmutableFile) error {
 	_, err := u.manager.Upload(ctx, &s3.PutObjectInput{
 		Bucket: aws.String(u.c.bucket),
-		Key: aws.String(filepath.Base(r.Name())),
+		// Normalize then take base to avoid OS-specific separators in keys
+		Key: aws.String(path.Base(filepath.ToSlash(r.Name()))),
 		Body: r,
 	})
Follow-up: confirm your bucket layout guarantees uniqueness even after basename stripping; otherwise switch to explicit keys (prefix + relative path) to prevent overwrites.
Also applies to: 31-33
frac/remote.go (1)
233-237: Improve error messages with full object key for easier debugging
Current messages only mention the suffix, which is ambiguous in multi-fraction buckets.
-	return fmt.Errorf(
-		"cannot check existence of %q file: %w",
-		consts.IndexFileSuffix, err,
-	)
+	return fmt.Errorf("cannot check existence of %q: %w", name, err)

-	return fmt.Errorf(
-		"cannot check existence of %q file: %w",
-		consts.DocsFileSuffix, err,
-	)
+	return fmt.Errorf("cannot check existence of %q: %w", unsortedName, err)

-	return fmt.Errorf(
-		"cannot check existence of %q file: %w",
-		consts.SdocsFileSuffix, err,
-	)
+	return fmt.Errorf("cannot check existence of %q: %w", sortedName, err)
Also applies to: 259-262, 273-276
📒 Files selected for processing (2)
- frac/remote.go (1 hunks)
- storage/s3/uploader.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
frac/remote.go (12)
- frac/fraction.go (2): Fraction(19-26), DataProvider(14-17)
- frac/info.go (1): Info(21-40)
- storage/io.go (2): ImmutableFile(9-18), Uploader(20-22)
- cache/cache.go (1): Cache(60-68)
- storage/docs_reader.go (2): DocsReader(11-14), NewDocsReader(16-21)
- frac/index_cache.go (1): IndexCache(10-18)
- storage/index_reader.go (2): IndexReader(13-20), NewIndexReader(22-32)
- storage/s3/client.go (1): Client(16-19)
- storage/read_limiter.go (1): ReadLimiter(9-12)
- util/fs.go (1): MustRemoveFileByPath(30-38)
- consts/consts.go (4): RemoteFractionSuffix(64-64), DocsFileSuffix(53-53), SdocsFileSuffix(56-56), IndexFileSuffix(60-60)
- storage/s3/reader.go (1): NewReader(34-36)
@coderabbitai pause
Actionable comments posted: 2
♻️ Duplicate comments (6)
frac/remote.go (6)
91-93: Fast-path must set Info.StorageType=remote
Returning without correcting the storage type misclassifies the fraction downstream.
-	if info != nil && info.IndexOnDisk > 0 {
-		return f
-	}
+	if info != nil && info.IndexOnDisk > 0 {
+		// Ensure downstream sees this fraction as remote
+		f.info.StorageType = storage.TypeRemote
+		return f
+	}
95-109: Don’t call loadHeader after openIndex failure; remove FIXME and degrade safely
Currently you log the failure but still invoke loadHeader, which can panic with nil readers. Also, the lingering FIXME contradicts the PR checklist.
-	// FIXME(dkharms): For now almost any availability issues with S3 will cause seq-db to panic during initialisation phase.
-	// I wrote a small proposal on how we can reduce impact of such events.
-	// https://github.com/ozontech/seq-db/issues/92
-
-	if err := f.openIndex(); err != nil {
-		logger.Error(
-			"cannot open index file: any subsequent operation will fail",
-			zap.String("fraction", filepath.Base(f.BaseFileName)),
-			zap.Error(err),
-		)
-	}
-
-	f.info = loadHeader(f.indexFile, f.indexReader)
-	return f
+	if err := f.openIndex(); err != nil {
+		logger.Warn(
+			"remote: cannot open index during init; will lazily retry",
+			zap.String("fraction", filepath.Base(f.BaseFileName)),
+			zap.Error(err),
+		)
+		// Keep a non-nil Info with correct storage type to avoid nil derefs in logs/metrics.
+		if f.info == nil {
+			f.info = &Info{StorageType: storage.TypeRemote}
+		} else {
+			f.info.StorageType = storage.TypeRemote
+		}
+		return f
+	}
+	f.info = loadHeader(f.indexFile, f.indexReader)
+	return f
156-158: Replace panic in Offload with an explicit error
Panicking can take the whole process down. Return a clear error instead.
-func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error) {
-	panic("BUG: remote fraction cannot be offloaded")
-}
+func (f *Remote) Offload(context.Context, storage.Uploader) (bool, error) {
+	return false, fmt.Errorf("remote fraction cannot be offloaded")
+}
170-174: Use full key, not basename, to avoid S3 key collisions
Basename-only keys will collide across shards/stores. Use the full relative key (or a configured prefix) consistently.
 files := []string{
-	filepath.Base(f.BaseFileName) + consts.DocsFileSuffix,
-	filepath.Base(f.BaseFileName) + consts.SdocsFileSuffix,
-	filepath.Base(f.BaseFileName) + consts.IndexFileSuffix,
+	f.BaseFileName + consts.DocsFileSuffix,
+	f.BaseFileName + consts.SdocsFileSuffix,
+	f.BaseFileName + consts.IndexFileSuffix,
 }
Optional: consider normalizing to POSIX-style separators when mapping filesystem paths to S3 keys to avoid backslash keys on Windows.
237-239: Avoid basename when forming index key
Prevent collisions by using the full key.
-	name := filepath.Base(f.BaseFileName) + consts.IndexFileSuffix
+	name := f.BaseFileName + consts.IndexFileSuffix
261-263: Avoid basename when forming docs keys
Same collision risk as above.
-	sortedName := filepath.Base(f.BaseFileName) + consts.SdocsFileSuffix
-	unsortedName := filepath.Base(f.BaseFileName) + consts.DocsFileSuffix
+	sortedName := f.BaseFileName + consts.SdocsFileSuffix
+	unsortedName := f.BaseFileName + consts.DocsFileSuffix
🧹 Nitpick comments (2)
frac/remote.go (2)
167-169: Close remote files on Suicide to free resources
Release caches and also close underlying readers.
 f.docsCache.Release()
 f.indexCache.Release()
+if f.docsFile != nil {
+	_ = f.docsFile.Close()
+	f.docsFile = nil
+}
+if f.indexFile != nil {
+	_ = f.indexFile.Close()
+	f.indexFile = nil
+}
210-230: Add coverage for lazy-load and failure paths
Codecov shows remote.go under-covered. Please add tests for:
- NewRemote: fast-path with injected info, degraded init when openIndex fails.
- DataProvider: suicided path returns EmptyDataProvider and unlocks.
- openDocs: unsorted preferred over sorted, both missing returns error.
- Offload: returns explicit error, not panic (after fix).
I can draft table-driven tests with a fake s3.Client to simulate Exists/Remove behaviors and S3 flakiness. Want me to open a follow-up PR with those?
📒 Files selected for processing (2)
- frac/remote.go (1 hunks)
- storage/s3/uploader.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- storage/s3/uploader.go
🧰 Additional context used
🧬 Code graph analysis (1)
frac/remote.go (11)
- frac/fraction.go (2): Fraction(19-26), DataProvider(14-17)
- storage/io.go (2): ImmutableFile(9-18), Uploader(20-22)
- cache/cache.go (1): Cache(60-68)
- storage/docs_reader.go (2): DocsReader(11-14), NewDocsReader(16-21)
- frac/index_cache.go (1): IndexCache(10-18)
- storage/index_reader.go (2): IndexReader(13-20), NewIndexReader(22-32)
- storage/s3/client.go (1): Client(16-19)
- storage/read_limiter.go (1): ReadLimiter(9-12)
- util/fs.go (1): MustRemoveFileByPath(30-38)
- consts/consts.go (4): RemoteFractionSuffix(64-64), DocsFileSuffix(53-53), SdocsFileSuffix(56-56), IndexFileSuffix(60-60)
- storage/s3/reader.go (1): NewReader(34-36)
✅ Actions performed
Reviews paused.
This commit introduces support for fraction offloading and the ability to search and build aggregations over remote (offloaded) fractions.
Things to polish:
- `FIXME` comments;
Summary by CodeRabbit
New Features
Configuration
Observability
Other