From a4d19d45db35c422c17112fbde8fe2545c70753c Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 18 Apr 2023 16:17:13 +0100 Subject: [PATCH 1/6] Fix memory leak in writeAll --- plugins/destination/plugin.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/destination/plugin.go b/plugins/destination/plugin.go index d776db1a5d..a94659f320 100644 --- a/plugins/destination/plugin.go +++ b/plugins/destination/plugin.go @@ -220,6 +220,7 @@ func (p *Plugin) writeOne(ctx context.Context, sourceSpec specs.Source, syncTime func (p *Plugin) writeAll(ctx context.Context, sourceSpec specs.Source, syncTime time.Time, resources []arrow.Record) error { ch := make(chan arrow.Record, len(resources)) for _, resource := range resources { + resource.Retain() ch <- resource } close(ch) From bfb6f638c5c90564fa7bfebb3ffa0ae037ba1d87 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 18 Apr 2023 16:57:02 +0100 Subject: [PATCH 2/6] Retain and release fixes --- internal/servers/destination/v0/destinations.go | 2 ++ plugins/destination/managed_writer.go | 12 +++++++++--- plugins/destination/plugin_testing_migrate.go | 2 -- plugins/destination/plugin_testing_overwrite.go | 4 ---- .../plugin_testing_overwrite_delete_stale.go | 4 ---- plugins/destination/plugin_testing_write_append.go | 2 -- 6 files changed, 11 insertions(+), 15 deletions(-) diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 4266b6ba55..fe7f184a89 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -138,9 +138,11 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error { origResource.Data = append([]schema.CQType{sourceColumn, syncTimeColumn}, origResource.Data...) } convertedResource := schema.CQTypesToRecord(memory.DefaultAllocator, []schema.CQTypes{origResource.Data}, schema.CQSchemaToArrow(tables.Get(origResource.TableName))) + convertedResource.Retain() select { case resources <- convertedResource: case <-ctx.Done(): + convertedResource.Release() close(resources) if err := eg.Wait(); err != nil { return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err) diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index c08ecd25c3..55b9483980 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -70,6 +70,9 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *arrow.Schem p.logger.Info().Str("table", tableName).Int("len", batchSize).Dur("duration", time.Since(start)).Msg("batch written successfully") atomic.AddUint64(&metrics.Writes, uint64(batchSize)) } + for _, r := range resources { + r.Release() + } } func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Record) []arrow.Record { @@ -81,8 +84,10 @@ func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Recor pks := make(map[string]struct{}, len(resources)) res := make([]arrow.Record, 0, len(resources)) - var reported bool for _, r := range resources { + if r.NumRows() > 1 { + panic(fmt.Sprintf("record with more than 1 row: %d", r.NumRows())) + } key := pk.String(r) _, ok := pks[key] switch { @@ -90,9 +95,9 @@ func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Recor pks[key] = struct{}{} res = append(res, r) continue - case reported: - continue } + // duplicate, release early + r.Release() } return res @@ -138,6 +143,7 @@ func (p *Plugin) writeManagedTableBatch(ctx context.Context, _ specs.Source, tab if _, ok := workers[tableName]; !ok { return fmt.Errorf("table %s not found in destination", tableName) } + r.Retain() workers[tableName].ch <- r } diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index 5cb42a7016..c7ed0b6587 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -42,7 +42,6 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P MaxRows: 1, } resource1 := testdata.GenTestData(mem, source, opts)[0] - resource1.Retain() defer resource1.Release() if err := p.writeOne(ctx, sourceSpec, syncTime, resource1); err != nil { return fmt.Errorf("failed to write one: %w", err) @@ -52,7 +51,6 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P return fmt.Errorf("failed to migrate existing table: %w", err) } resource2 := testdata.GenTestData(mem, target, opts)[0] - resource2.Retain() defer resource2.Release() if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil { return fmt.Errorf("failed to write one after migration: %w", err) diff --git a/plugins/destination/plugin_testing_overwrite.go b/plugins/destination/plugin_testing_overwrite.go index 8c58ae554c..08adcbd5fc 100644 --- a/plugins/destination/plugin_testing_overwrite.go +++ b/plugins/destination/plugin_testing_overwrite.go @@ -42,9 +42,6 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, MaxRows: 2, } resources := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts) - for _, r := range resources { - r.Retain() - } defer func() { for _, r := range resources { r.Release() @@ -86,7 +83,6 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, StableUUID: *u, } updatedResource := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)[0] - updatedResource.Retain() defer updatedResource.Release() // write second time if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil { diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index 39ae0e99f2..752d85a618 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -48,9 +48,6 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte incResources := testdata.GenTestData(mem, incTable.ToArrowSchema(), opts) allResources := resources allResources = append(allResources, incResources...) - for _, r := range allResources { - r.Retain() - } defer func() { for _, r := range allResources { r.Release() @@ -99,7 +96,6 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte MaxRows: 1, } updatedResources := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] - updatedResources.Retain() defer updatedResources.Release() if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil { diff --git a/plugins/destination/plugin_testing_write_append.go b/plugins/destination/plugin_testing_write_append.go index 746bf34d26..368a764817 100644 --- a/plugins/destination/plugin_testing_write_append.go +++ b/plugins/destination/plugin_testing_write_append.go @@ -40,7 +40,6 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, MaxRows: 1, } record1 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] - record1.Retain() defer record1.Release() if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil { return fmt.Errorf("failed to write one second time: %w", err) @@ -49,7 +48,6 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, secondSyncTime := syncTime.Add(10 * time.Second).UTC() opts.SyncTime = secondSyncTime record2 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] - record2.Retain() defer record2.Release() if !s.tests.SkipSecondAppend { From c0f42f4126cf6e89fab5c609e9b9b63b6571134b Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 18 Apr 2023 17:02:21 +0100 Subject: [PATCH 3/6] Lint --- plugins/destination/managed_writer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index 55b9483980..6c16338b11 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -90,8 +90,7 @@ func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Recor } key := pk.String(r) _, ok := pks[key] - switch { - case !ok: + if !ok { pks[key] = struct{}{} res = append(res, r) continue From 6d1595b227dcdb1b024906031b2d2b8cf70eabd1 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 18 Apr 2023 17:34:29 +0100 Subject: [PATCH 4/6] Remove extra retains --- internal/servers/destination/v0/destinations.go | 1 - plugins/destination/managed_writer.go | 1 - 2 files changed, 2 deletions(-) diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index fe7f184a89..44198f5d10 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -138,7 +138,6 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error { origResource.Data = append([]schema.CQType{sourceColumn, syncTimeColumn}, origResource.Data...) } convertedResource := schema.CQTypesToRecord(memory.DefaultAllocator, []schema.CQTypes{origResource.Data}, schema.CQSchemaToArrow(tables.Get(origResource.TableName))) - convertedResource.Retain() select { case resources <- convertedResource: case <-ctx.Done(): diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index 6c16338b11..3abd35802b 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -142,7 +142,6 @@ func (p *Plugin) writeManagedTableBatch(ctx context.Context, _ specs.Source, tab if _, ok := workers[tableName]; !ok { return fmt.Errorf("table %s not found in destination", tableName) } - r.Retain() workers[tableName].ch <- r } From c8ef24e2ada3ea12952d948334013bd777d2d47f Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Tue, 18 Apr 2023 18:47:45 +0100 Subject: [PATCH 5/6] Undo convertedResource.Release() for now --- internal/servers/destination/v0/destinations.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 44198f5d10..4266b6ba55 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -141,7 +141,6 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error { select { case resources <- convertedResource: case <-ctx.Done(): - convertedResource.Release() close(resources) if err := eg.Wait(); err != nil { return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err) From b5ca844278f86b32f1c7e22fe2852880de93cfa7 Mon Sep 17 00:00:00 2001 From: Herman Schaaf Date: Wed, 19 Apr 2023 10:44:43 +0100 Subject: [PATCH 6/6] Updates --- internal/servers/destination/v0/destinations.go | 1 + plugins/destination/plugin.go | 1 - plugins/destination/plugin_testing_migrate.go | 2 ++ plugins/destination/plugin_testing_overwrite.go | 4 ++++ plugins/destination/plugin_testing_overwrite_delete_stale.go | 4 ++++ plugins/destination/plugin_testing_write_append.go | 2 ++ 6 files changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 4266b6ba55..44198f5d10 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -141,6 +141,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error { select { case resources <- convertedResource: case <-ctx.Done(): + convertedResource.Release() close(resources) if err := eg.Wait(); err != nil { return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err) diff --git a/plugins/destination/plugin.go b/plugins/destination/plugin.go index a94659f320..d776db1a5d 100644 --- a/plugins/destination/plugin.go +++ b/plugins/destination/plugin.go @@ -220,7 +220,6 @@ func (p *Plugin) writeOne(ctx context.Context, sourceSpec specs.Source, syncTime func (p *Plugin) writeAll(ctx context.Context, sourceSpec specs.Source, syncTime time.Time, resources []arrow.Record) error { ch := make(chan arrow.Record, len(resources)) for _, resource := range resources { - resource.Retain() ch <- resource } close(ch) diff --git a/plugins/destination/plugin_testing_migrate.go b/plugins/destination/plugin_testing_migrate.go index c7ed0b6587..5cb42a7016 100644 --- a/plugins/destination/plugin_testing_migrate.go +++ b/plugins/destination/plugin_testing_migrate.go @@ -42,6 +42,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P MaxRows: 1, } resource1 := testdata.GenTestData(mem, source, opts)[0] + resource1.Retain() defer resource1.Release() if err := p.writeOne(ctx, sourceSpec, syncTime, resource1); err != nil { return fmt.Errorf("failed to write one: %w", err) @@ -51,6 +52,7 @@ func testMigration(ctx context.Context, mem memory.Allocator, _ *testing.T, p *P return fmt.Errorf("failed to migrate existing table: %w", err) } resource2 := testdata.GenTestData(mem, target, opts)[0] + resource2.Retain() defer resource2.Release() if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil { return fmt.Errorf("failed to write one after migration: %w", err) diff --git a/plugins/destination/plugin_testing_overwrite.go b/plugins/destination/plugin_testing_overwrite.go index 08adcbd5fc..8c58ae554c 100644 --- a/plugins/destination/plugin_testing_overwrite.go +++ b/plugins/destination/plugin_testing_overwrite.go @@ -42,6 +42,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, MaxRows: 2, } resources := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts) + for _, r := range resources { + r.Retain() + } defer func() { for _, r := range resources { r.Release() @@ -83,6 +86,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, StableUUID: *u, } updatedResource := testdata.GenTestData(mem, schema.CQSchemaToArrow(table), opts)[0] + updatedResource.Retain() defer updatedResource.Release() // write second time if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil { diff --git a/plugins/destination/plugin_testing_overwrite_delete_stale.go b/plugins/destination/plugin_testing_overwrite_delete_stale.go index 752d85a618..39ae0e99f2 100644 --- a/plugins/destination/plugin_testing_overwrite_delete_stale.go +++ b/plugins/destination/plugin_testing_overwrite_delete_stale.go @@ -48,6 +48,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte incResources := testdata.GenTestData(mem, incTable.ToArrowSchema(), opts) allResources := resources allResources = append(allResources, incResources...) + for _, r := range allResources { + r.Retain() + } defer func() { for _, r := range allResources { r.Release() @@ -96,6 +99,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte MaxRows: 1, } updatedResources := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] + updatedResources.Retain() defer updatedResources.Release() if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil { diff --git a/plugins/destination/plugin_testing_write_append.go b/plugins/destination/plugin_testing_write_append.go index 368a764817..746bf34d26 100644 --- a/plugins/destination/plugin_testing_write_append.go +++ b/plugins/destination/plugin_testing_write_append.go @@ -40,6 +40,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, MaxRows: 1, } record1 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] + record1.Retain() defer record1.Release() if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil { return fmt.Errorf("failed to write one second time: %w", err) @@ -48,6 +49,7 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, secondSyncTime := syncTime.Add(10 * time.Second).UTC() opts.SyncTime = secondSyncTime record2 := testdata.GenTestData(mem, table.ToArrowSchema(), opts)[0] + record2.Retain() defer record2.Release() if !s.tests.SkipSecondAppend {