diff --git a/internal/servers/destination/v0/destinations.go b/internal/servers/destination/v0/destinations.go index 4266b6ba55..44198f5d10 100644 --- a/internal/servers/destination/v0/destinations.go +++ b/internal/servers/destination/v0/destinations.go @@ -141,6 +141,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error { select { case resources <- convertedResource: case <-ctx.Done(): + convertedResource.Release() close(resources) if err := eg.Wait(); err != nil { return status.Errorf(codes.Internal, "Context done: %v and failed to wait for plugin: %v", ctx.Err(), err) diff --git a/plugins/destination/managed_writer.go b/plugins/destination/managed_writer.go index c08ecd25c3..3abd35802b 100644 --- a/plugins/destination/managed_writer.go +++ b/plugins/destination/managed_writer.go @@ -70,6 +70,9 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *arrow.Schem p.logger.Info().Str("table", tableName).Int("len", batchSize).Dur("duration", time.Since(start)).Msg("batch written successfully") atomic.AddUint64(&metrics.Writes, uint64(batchSize)) } + for _, r := range resources { + r.Release() + } } func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Record) []arrow.Record { @@ -81,18 +84,19 @@ func (*Plugin) removeDuplicatesByPK(table *arrow.Schema, resources []arrow.Recor pks := make(map[string]struct{}, len(resources)) res := make([]arrow.Record, 0, len(resources)) - var reported bool for _, r := range resources { + if r.NumRows() > 1 { + panic(fmt.Sprintf("record with more than 1 row: %d", r.NumRows())) + } key := pk.String(r) _, ok := pks[key] - switch { - case !ok: + if !ok { pks[key] = struct{}{} res = append(res, r) continue - case reported: - continue } + // duplicate, release early + r.Release() } return res