diff --git a/sdks/go/pkg/beam/io/parquetio/parquetio.go b/sdks/go/pkg/beam/io/parquetio/parquetio.go index 5caf958a1c74..0d5a4fbb8316 100644 --- a/sdks/go/pkg/beam/io/parquetio/parquetio.go +++ b/sdks/go/pkg/beam/io/parquetio/parquetio.go @@ -24,15 +24,23 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/xitongsys/parquet-go-source/buffer" "github.com/xitongsys/parquet-go/reader" "github.com/xitongsys/parquet-go/writer" ) func init() { - beam.RegisterFunction(expandFn) + register.Function3x1(expandFn) + register.Emitter1[string]() + beam.RegisterType(reflect.TypeOf((*parquetReadFn)(nil)).Elem()) + register.DoFn3x1[context.Context, string, func(beam.X), error](&parquetReadFn{}) + register.Emitter1[beam.X]() + beam.RegisterType(reflect.TypeOf((*parquetWriteFn)(nil)).Elem()) + register.DoFn3x1[context.Context, int, func(*beam.X) bool, error](&parquetWriteFn{}) + register.Iter1[beam.X]() } // Read reads a set of files and returns lines as a PCollection diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go index 94016cd0e208..506e9e5f786e 100644 --- a/sdks/go/pkg/beam/io/textio/textio.go +++ b/sdks/go/pkg/beam/io/textio/textio.go @@ -42,6 +42,8 @@ func init() { register.Emitter2[string, string]() beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem()) + register.DoFn3x1[context.Context, int, func(*string) bool, error](&writeFileFn{}) + register.Iter1[string]() } type readOption struct {