From 8c65988acdc94c90469f53a29333a80cbdd5e1c4 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 5 Aug 2025 16:54:24 -0500 Subject: [PATCH] Add snapshot completeness validation - Implement validation to ensure snapshots contain all expected pages - Track lastPgno during page decoding - Add comprehensive tests for snapshot completeness validation - Handle special case where commit equals lock page number - Resolves TODO: Ensure last read page is equal to the commit for snapshot LTX files --- decoder.go | 29 ++++++-- decoder_test.go | 193 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 217 insertions(+), 5 deletions(-) diff --git a/decoder.go b/decoder.go index 8419ba7..f1ca3b1 100644 --- a/decoder.go +++ b/decoder.go @@ -28,10 +28,11 @@ type Decoder struct { pageIndex map[uint32]PageIndexElem state string - chksum Checksum - hash hash.Hash64 - pageN int // pages read - n int64 // bytes read + chksum Checksum + hash hash.Hash64 + pageN int // pages read + n int64 // bytes read + lastPgno uint32 // last page number read (for snapshot validation) } // NewDecoder returns a new instance of Decoder. 
@@ -102,8 +103,23 @@ func (dec *Decoder) Close() error { return fmt.Errorf("unmarshal trailer: %w", err) } - // TODO: Ensure last read page is equal to the commit for snapshot LTX files + // Ensure last read page is equal to the commit for snapshot LTX files + if dec.header.IsSnapshot() && dec.lastPgno != 0 { + // For snapshots, we expect all pages from 1 to commit (excluding lock page) + expectedLastPage := dec.header.Commit + lockPgno := LockPgno(dec.header.PageSize) + // If commit is past the lock page, the last page should be commit + // If commit is the lock page, the last page should be commit-1 + // If commit is before the lock page, the last page should be commit + if dec.header.Commit == lockPgno { + expectedLastPage = dec.header.Commit - 1 + } + + if dec.lastPgno != expectedLastPage { + return fmt.Errorf("snapshot incomplete: expected last page %d, got %d", expectedLastPage, dec.lastPgno) + } + } // Compare file checksum with checksum in trailer. if chksum := ChecksumFlag | Checksum(dec.hash.Sum64()); chksum != dec.trailer.FileChecksum { return ErrChecksumMismatch @@ -218,6 +234,9 @@ func (dec *Decoder) DecodePage(hdr *PageHeader, data []byte) error { dec.writeToHash(data) dec.pageN++ + // Track the last page number read + dec.lastPgno = hdr.Pgno + // Calculate checksum while decoding snapshots if tracking checksums. if dec.header.IsSnapshot() && !dec.header.NoChecksum() { if hdr.Pgno != LockPgno(dec.header.PageSize) { diff --git a/decoder_test.go b/decoder_test.go index cc0c894..12a56c9 100644 --- a/decoder_test.go +++ b/decoder_test.go @@ -384,3 +384,196 @@ func TestDecoder_64KBPageSize(t *testing.T) { } }) } + +// TestDecoder_SnapshotCompleteness validates that the decoder properly checks +// for snapshot completeness when closing. For snapshot LTX files (MinTXID=1), +// the decoder must verify that all pages from 1 to Commit have been read, +// excluding the lock page which is always zero and never written. 
+func TestDecoder_SnapshotCompleteness(t *testing.T) {
+	t.Run("CompleteSnapshot", func(t *testing.T) {
+		spec := &ltx.FileSpec{
+			Header: ltx.Header{
+				Version:   ltx.Version,
+				Flags:     ltx.HeaderFlagNoChecksum,
+				PageSize:  512,
+				Commit:    3, // pages 1..3 are all written below
+				MinTXID:   1,
+				MaxTXID:   1,
+				Timestamp: 1000,
+			},
+			Pages: []ltx.PageSpec{
+				{Header: ltx.PageHeader{Pgno: 1}, Data: bytes.Repeat([]byte("1"), 512)},
+				{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
+				{Header: ltx.PageHeader{Pgno: 3}, Data: bytes.Repeat([]byte("3"), 512)},
+			},
+			Trailer: ltx.Trailer{},
+		}
+
+		var buf bytes.Buffer
+		writeFileSpec(t, &buf, spec)
+		dec := ltx.NewDecoder(&buf)
+
+		if err := dec.DecodeHeader(); err != nil {
+			t.Fatal(err)
+		}
+
+		for i := 0; i < 3; i++ {
+			var hdr ltx.PageHeader
+			data := make([]byte, 512)
+			if err := dec.DecodePage(&hdr, data); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		var hdr ltx.PageHeader
+		data := make([]byte, 512)
+		if err := dec.DecodePage(&hdr, data); err != io.EOF {
+			t.Fatalf("expected EOF, got: %v", err)
+		}
+
+		if err := dec.Close(); err != nil {
+			t.Fatal(err)
+		}
+	})
+
+	t.Run("IncompleteSnapshot", func(t *testing.T) {
+		spec := &ltx.FileSpec{
+			Header: ltx.Header{
+				Version:   ltx.Version,
+				Flags:     ltx.HeaderFlagNoChecksum,
+				PageSize:  512,
+				Commit:    3, // header promises 3 pages, but only 2 are written below
+				MinTXID:   1,
+				MaxTXID:   1,
+				Timestamp: 1000,
+			},
+			Pages: []ltx.PageSpec{
+				{Header: ltx.PageHeader{Pgno: 1}, Data: bytes.Repeat([]byte("1"), 512)},
+				{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
+			},
+			Trailer: ltx.Trailer{},
+		}
+
+		var buf bytes.Buffer
+		writeFileSpec(t, &buf, spec)
+		dec := ltx.NewDecoder(&buf)
+
+		if err := dec.DecodeHeader(); err != nil {
+			t.Fatal(err)
+		}
+
+		for i := 0; i < 2; i++ {
+			var hdr ltx.PageHeader
+			data := make([]byte, 512)
+			if err := dec.DecodePage(&hdr, data); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		var hdr ltx.PageHeader
+		data := make([]byte, 512)
+		if err := dec.DecodePage(&hdr, data); err != io.EOF {
+			t.Fatalf("expected EOF, got: %v", err)
+		}
+
+		if err := dec.Close(); err == nil || err.Error() != "snapshot incomplete: expected last page 3, got 2" {
+			t.Fatalf("expected snapshot incomplete error, got: %v", err)
+		}
+	})
+
+	t.Run("SnapshotWithLockPage", func(t *testing.T) {
+		if testing.Short() {
+			t.Skip("skipping in short mode")
+		}
+
+		lockPgno := ltx.LockPgno(4096)
+
+		spec := &ltx.FileSpec{
+			Header: ltx.Header{
+				Version:   ltx.Version,
+				Flags:     ltx.HeaderFlagNoChecksum,
+				PageSize:  4096,
+				Commit:    lockPgno, // commit falls exactly on the lock page
+				MinTXID:   1,
+				MaxTXID:   1,
+				Timestamp: 1000,
+			},
+			Pages:   []ltx.PageSpec{},
+			Trailer: ltx.Trailer{},
+		}
+
+		for pgno := uint32(1); pgno < lockPgno; pgno++ { // every page before the lock page; the lock page itself is not written
+			spec.Pages = append(spec.Pages, ltx.PageSpec{
+				Header: ltx.PageHeader{Pgno: pgno},
+				Data:   bytes.Repeat([]byte{byte(pgno % 256)}, 4096),
+			})
+		}
+
+		var buf bytes.Buffer
+		writeFileSpec(t, &buf, spec)
+		dec := ltx.NewDecoder(&buf)
+
+		if err := dec.DecodeHeader(); err != nil {
+			t.Fatal(err)
+		}
+
+		for i := 0; i < len(spec.Pages); i++ {
+			var hdr ltx.PageHeader
+			data := make([]byte, 4096)
+			if err := dec.DecodePage(&hdr, data); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		var hdr ltx.PageHeader
+		data := make([]byte, 4096)
+		if err := dec.DecodePage(&hdr, data); err != io.EOF {
+			t.Fatalf("expected EOF, got: %v", err)
+		}
+
+		if err := dec.Close(); err != nil {
+			t.Fatal(err)
+		}
+	})
+
+	t.Run("NonSnapshot", func(t *testing.T) {
+		spec := &ltx.FileSpec{
+			Header: ltx.Header{
+				Version:          ltx.Version,
+				Flags:            0,
+				PageSize:         512,
+				Commit:           3,
+				MinTXID:          2, // MinTXID != 1: not a snapshot, so Close must accept the single page below
+				MaxTXID:          2,
+				Timestamp:        1000,
+				PreApplyChecksum: ltx.ChecksumFlag | 1,
+			},
+			Pages: []ltx.PageSpec{
+				{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
+			},
+			Trailer: ltx.Trailer{PostApplyChecksum: ltx.ChecksumFlag | 2},
+		}
+
+		var buf bytes.Buffer
+		writeFileSpec(t, &buf, spec)
+		dec := ltx.NewDecoder(&buf)
+
+		if err := dec.DecodeHeader(); err != nil {
+			t.Fatal(err)
+		}
+
+		var hdr ltx.PageHeader
+		data := make([]byte, 512)
+		if err := dec.DecodePage(&hdr, data); err != nil {
+			t.Fatal(err)
+		}
+
+		if err := dec.DecodePage(&hdr, data); err != io.EOF {
+			t.Fatalf("expected EOF, got: %v", err)
+		}
+
+		if err := dec.Close(); err != nil {
+			t.Fatal(err)
+		}
+	})
+}