Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ type Decoder struct {
pageIndex map[uint32]PageIndexElem
state string

chksum Checksum
hash hash.Hash64
pageN int // pages read
n int64 // bytes read
chksum Checksum
hash hash.Hash64
pageN int // pages read
n int64 // bytes read
lastPgno uint32 // last page number read (for snapshot validation)
}

// NewDecoder returns a new instance of Decoder.
Expand Down Expand Up @@ -102,8 +103,23 @@ func (dec *Decoder) Close() error {
return fmt.Errorf("unmarshal trailer: %w", err)
}

// TODO: Ensure last read page is equal to the commit for snapshot LTX files
// Ensure last read page is equal to the commit for snapshot LTX files
if dec.header.IsSnapshot() && dec.lastPgno != 0 {
// For snapshots, we expect all pages from 1 to commit (excluding lock page)
expectedLastPage := dec.header.Commit
lockPgno := LockPgno(dec.header.PageSize)

// If commit is past the lock page, the last page should be commit
// If commit is the lock page, the last page should be commit-1
// If commit is before the lock page, the last page should be commit
if dec.header.Commit == lockPgno {
expectedLastPage = dec.header.Commit - 1
}

if dec.lastPgno != expectedLastPage {
return fmt.Errorf("snapshot incomplete: expected last page %d, got %d", expectedLastPage, dec.lastPgno)
}
}
// Compare file checksum with checksum in trailer.
if chksum := ChecksumFlag | Checksum(dec.hash.Sum64()); chksum != dec.trailer.FileChecksum {
return ErrChecksumMismatch
Expand Down Expand Up @@ -218,6 +234,9 @@ func (dec *Decoder) DecodePage(hdr *PageHeader, data []byte) error {
dec.writeToHash(data)
dec.pageN++

// Track the last page number read
dec.lastPgno = hdr.Pgno

// Calculate checksum while decoding snapshots if tracking checksums.
if dec.header.IsSnapshot() && !dec.header.NoChecksum() {
if hdr.Pgno != LockPgno(dec.header.PageSize) {
Expand Down
193 changes: 193 additions & 0 deletions decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,196 @@ func TestDecoder_64KBPageSize(t *testing.T) {
}
})
}

// TestDecoder_SnapshotCompleteness validates that the decoder properly checks
// for snapshot completeness when closing. For snapshot LTX files (MinTXID=1),
// the decoder must verify that all pages from 1 to Commit have been read,
// excluding the lock page which is always zero and never written.
func TestDecoder_SnapshotCompleteness(t *testing.T) {
t.Run("CompleteSnapshot", func(t *testing.T) {
spec := &ltx.FileSpec{
Header: ltx.Header{
Version: ltx.Version,
Flags: ltx.HeaderFlagNoChecksum,
PageSize: 512,
Commit: 3,
MinTXID: 1,
MaxTXID: 1,
Timestamp: 1000,
},
Pages: []ltx.PageSpec{
{Header: ltx.PageHeader{Pgno: 1}, Data: bytes.Repeat([]byte("1"), 512)},
{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
{Header: ltx.PageHeader{Pgno: 3}, Data: bytes.Repeat([]byte("3"), 512)},
},
Trailer: ltx.Trailer{},
}

var buf bytes.Buffer
writeFileSpec(t, &buf, spec)
dec := ltx.NewDecoder(&buf)

if err := dec.DecodeHeader(); err != nil {
t.Fatal(err)
}

for i := 0; i < 3; i++ {
var hdr ltx.PageHeader
data := make([]byte, 512)
if err := dec.DecodePage(&hdr, data); err != nil {
t.Fatal(err)
}
}

var hdr ltx.PageHeader
data := make([]byte, 512)
if err := dec.DecodePage(&hdr, data); err != io.EOF {
t.Fatalf("expected EOF, got: %v", err)
}

if err := dec.Close(); err != nil {
t.Fatal(err)
}
})

t.Run("IncompleteSnapshot", func(t *testing.T) {
spec := &ltx.FileSpec{
Header: ltx.Header{
Version: ltx.Version,
Flags: ltx.HeaderFlagNoChecksum,
PageSize: 512,
Commit: 3,
MinTXID: 1,
MaxTXID: 1,
Timestamp: 1000,
},
Pages: []ltx.PageSpec{
{Header: ltx.PageHeader{Pgno: 1}, Data: bytes.Repeat([]byte("1"), 512)},
{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
},
Trailer: ltx.Trailer{},
}

var buf bytes.Buffer
writeFileSpec(t, &buf, spec)
dec := ltx.NewDecoder(&buf)

if err := dec.DecodeHeader(); err != nil {
t.Fatal(err)
}

for i := 0; i < 2; i++ {
var hdr ltx.PageHeader
data := make([]byte, 512)
if err := dec.DecodePage(&hdr, data); err != nil {
t.Fatal(err)
}
}

var hdr ltx.PageHeader
data := make([]byte, 512)
if err := dec.DecodePage(&hdr, data); err != io.EOF {
t.Fatalf("expected EOF, got: %v", err)
}

if err := dec.Close(); err == nil || err.Error() != "snapshot incomplete: expected last page 3, got 2" {
t.Fatalf("expected snapshot incomplete error, got: %v", err)
}
})

t.Run("SnapshotWithLockPage", func(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}

lockPgno := ltx.LockPgno(4096)

spec := &ltx.FileSpec{
Header: ltx.Header{
Version: ltx.Version,
Flags: ltx.HeaderFlagNoChecksum,
PageSize: 4096,
Commit: lockPgno,
MinTXID: 1,
MaxTXID: 1,
Timestamp: 1000,
},
Pages: []ltx.PageSpec{},
Trailer: ltx.Trailer{},
}

for pgno := uint32(1); pgno < lockPgno; pgno++ {
spec.Pages = append(spec.Pages, ltx.PageSpec{
Header: ltx.PageHeader{Pgno: pgno},
Data: bytes.Repeat([]byte{byte(pgno % 256)}, 4096),
})
}

var buf bytes.Buffer
writeFileSpec(t, &buf, spec)
dec := ltx.NewDecoder(&buf)

if err := dec.DecodeHeader(); err != nil {
t.Fatal(err)
}

for i := 0; i < len(spec.Pages); i++ {
var hdr ltx.PageHeader
data := make([]byte, 4096)
if err := dec.DecodePage(&hdr, data); err != nil {
t.Fatal(err)
}
}

var hdr ltx.PageHeader
data := make([]byte, 4096)
if err := dec.DecodePage(&hdr, data); err != io.EOF {
t.Fatalf("expected EOF, got: %v", err)
}

if err := dec.Close(); err != nil {
t.Fatal(err)
}
})

t.Run("NonSnapshot", func(t *testing.T) {
spec := &ltx.FileSpec{
Header: ltx.Header{
Version: ltx.Version,
Flags: 0,
PageSize: 512,
Commit: 3,
MinTXID: 2,
MaxTXID: 2,
Timestamp: 1000,
PreApplyChecksum: ltx.ChecksumFlag | 1,
},
Pages: []ltx.PageSpec{
{Header: ltx.PageHeader{Pgno: 2}, Data: bytes.Repeat([]byte("2"), 512)},
},
Trailer: ltx.Trailer{PostApplyChecksum: ltx.ChecksumFlag | 2},
}

var buf bytes.Buffer
writeFileSpec(t, &buf, spec)
dec := ltx.NewDecoder(&buf)

if err := dec.DecodeHeader(); err != nil {
t.Fatal(err)
}

var hdr ltx.PageHeader
data := make([]byte, 512)
if err := dec.DecodePage(&hdr, data); err != nil {
t.Fatal(err)
}

if err := dec.DecodePage(&hdr, data); err != io.EOF {
t.Fatalf("expected EOF, got: %v", err)
}

if err := dec.Close(); err != nil {
t.Fatal(err)
}
})
}