From 7f6468776ed9d89c1f4bdaf7f3701c2b5178f6ee Mon Sep 17 00:00:00 2001 From: datbth Date: Mon, 12 Aug 2024 15:17:36 +0700 Subject: [PATCH 1/3] allow setting Logical types for pqarrow file writer --- go/parquet/pqarrow/file_writer.go | 6 ++- go/parquet/pqarrow/file_writer_test.go | 56 ++++++++++++++++++++++++++ go/parquet/pqarrow/logical_type.go | 15 +++++++ go/parquet/pqarrow/schema.go | 31 +++++++++----- 4 files changed, 98 insertions(+), 10 deletions(-) create mode 100644 go/parquet/pqarrow/logical_type.go diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go index 539c544829e..241ded6753e 100644 --- a/go/parquet/pqarrow/file_writer.go +++ b/go/parquet/pqarrow/file_writer.go @@ -63,11 +63,15 @@ type FileWriter struct { // the ArrowColumnWriter and WriteArrow functions which allow writing arrow to an existing // file.Writer, this will create a new file.Writer based on the schema provided. func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) { + return NewFileWriterWithLogicalTypes(arrschema, w, props, arrprops, nil) +} + +func NewFileWriterWithLogicalTypes(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*FileWriter, error) { if props == nil { props = parquet.NewWriterProperties() } - pqschema, err := ToParquet(arrschema, props, arrprops) + pqschema, err := ToParquetWithLogicalTypes(arrschema, props, arrprops, logicalTypes) if err != nil { return nil, err } diff --git a/go/parquet/pqarrow/file_writer_test.go b/go/parquet/pqarrow/file_writer_test.go index 5b807389a3e..31dd4cab6c1 100644 --- a/go/parquet/pqarrow/file_writer_test.go +++ b/go/parquet/pqarrow/file_writer_test.go @@ -26,7 +26,10 @@ import ( "github.com/apache/arrow/go/v18/arrow/array" "github.com/apache/arrow/go/v18/arrow/memory" "github.com/apache/arrow/go/v18/parquet" + "github.com/apache/arrow/go/v18/parquet/file" + "github.com/apache/arrow/go/v18/parquet/internal/encoding" "github.com/apache/arrow/go/v18/parquet/pqarrow" + pqschema "github.com/apache/arrow/go/v18/parquet/schema" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -133,3 +136,56 @@ func TestFileWriterBuffered(t *testing.T) { require.NoError(t, writer.Close()) assert.Equal(t, 4, writer.NumRows()) } + +func TestFileWriterWithLogicalTypes(t *testing.T) { + schema := arrow.NewSchema([]arrow.Field{ + {Name: "string", Nullable: true, Type: arrow.BinaryTypes.String}, + {Name: "json", Nullable: true, Type: arrow.BinaryTypes.String}, + }, nil) + + data := `[ + { "string": "{\"key\":\"value\"}", "json": "{\"key\":\"value\"}" }, + { "string": null, "json": null } + ]` + + logicalTypes := []*pqarrow.LogicalType{ + nil, + { Type: pqschema.JSONLogicalType{}, Length: -1 }, + } + + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + + record, _, err := array.RecordFromJSON(alloc, schema, strings.NewReader(data)) + require.NoError(t, err) + defer record.Release() + + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + sink := encoding.NewBufferWriter(0, mem) + defer sink.Release() + + writer, err := pqarrow.NewFileWriterWithLogicalTypes( + schema, + sink, + parquet.NewWriterProperties( + parquet.WithAllocator(alloc), + ), + pqarrow.NewArrowWriterProperties( + pqarrow.WithAllocator(alloc), + ), + logicalTypes, + ) + require.NoError(t, err) + + require.NoError(t, writer.Write(record)) + require.NoError(t, writer.Close()) + + reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes())) + require.NoError(t, err) + assert.EqualValues(t, 2, reader.NumRows()) + + parquetSchema := reader.MetaData().Schema + assert.EqualValues(t, "String", parquetSchema.Column(0).LogicalType().String()) + assert.EqualValues(t, "JSON", parquetSchema.Column(1).LogicalType().String()) +} \ No newline at end of file diff --git a/go/parquet/pqarrow/logical_type.go b/go/parquet/pqarrow/logical_type.go new file mode 100644 index 00000000000..7043a7b3e68 --- /dev/null +++ b/go/parquet/pqarrow/logical_type.go @@ -0,0 +1,15 @@ +package pqarrow + +import "github.com/apache/arrow/go/v18/parquet/schema" + +type LogicalType struct { + Type schema.LogicalType + Length int +} + +func NewLogicalType() *LogicalType { + return &LogicalType{ + Type: schema.NoLogicalType{}, + Length: -1, + } +} diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index ce5cc6f9050..104dd12edb1 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -239,7 +239,7 @@ func structToNode(typ *arrow.StructType, name string, nullable bool, props *parq children := make(schema.FieldList, 0, typ.NumFields()) for _, f := range typ.Fields() { - n, err := fieldToNode(f.Name, f, props, arrprops) + n, err := fieldToNode(f.Name, f, props, arrprops, nil) if err != nil { return nil, err } @@ -249,7 +249,7 @@ func structToNode(typ *arrow.StructType, name string, nullable bool, props *parq return schema.NewGroupNode(name, repFromNullable(nullable), children, -1) } -func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (schema.Node, error) { +func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties, customLogicalType *LogicalType) (schema.Node, error) { var ( logicalType schema.LogicalType = schema.NoLogicalType{} typ parquet.Type @@ -358,7 +358,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties elem = field.Type.(*arrow.FixedSizeListType).Elem() } - child, err := fieldToNode(name, arrow.Field{Name: name, Type: elem, Nullable: true}, props, arrprops) + child, err := fieldToNode(name, arrow.Field{Name: name, Type: elem, Nullable: true}, props, arrprops, nil) if err != nil { return nil, err } @@ -368,7 +368,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties // parquet has no dictionary type, dictionary is encoding, not schema level dictType := field.Type.(*arrow.DictionaryType) return fieldToNode(name, arrow.Field{Name: name, Type: dictType.ValueType, Nullable: field.Nullable, Metadata: field.Metadata}, - props, arrprops) + props, arrprops, customLogicalType) case arrow.EXTENSION: return fieldToNode(name, arrow.Field{ Name: name, @@ -378,15 +378,15 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties ipc.ExtensionTypeKeyName: field.Type.(arrow.ExtensionType).ExtensionName(), ipc.ExtensionMetadataKeyName: field.Type.(arrow.ExtensionType).Serialize(), }), - }, props, arrprops) + }, props, arrprops, customLogicalType) case arrow.MAP: mapType := field.Type.(*arrow.MapType) - keyNode, err := fieldToNode("key", mapType.KeyField(), props, arrprops) + keyNode, err := fieldToNode("key", mapType.KeyField(), props, arrprops, nil) if err != nil { return nil, err } - valueNode, err := fieldToNode("value", mapType.ItemField(), props, arrprops) + valueNode, err := fieldToNode("value", mapType.ItemField(), props, arrprops, nil) if err != nil { return nil, err } @@ -406,6 +406,11 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties return nil, fmt.Errorf("%w: support for %s", arrow.ErrNotImplemented, field.Type.ID()) } + if customLogicalType != nil { + logicalType = customLogicalType.Type + length = customLogicalType.Length + } + return schema.NewPrimitiveNodeLogical(name, repType, logicalType, typ, length, fieldIDFromMeta(field.Metadata)) } @@ -436,13 +441,21 @@ func fieldIDFromMeta(m arrow.Metadata) int32 { // ToParquet generates a Parquet Schema from an arrow Schema using the given properties to make // decisions when determining the logical/physical types of the columns. func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*schema.Schema, error) { + return ToParquetWithLogicalTypes(sc, props, arrprops, nil) +} + +func ToParquetWithLogicalTypes(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*schema.Schema, error) { if props == nil { props = parquet.NewWriterProperties() } nodes := make(schema.FieldList, 0, sc.NumFields()) - for _, f := range sc.Fields() { - n, err := fieldToNode(f.Name, f, props, arrprops) + for i, f := range sc.Fields() { + var logicalType *LogicalType + if logicalTypes != nil && i < len(logicalTypes) { + logicalType = logicalTypes[i] + } + n, err := fieldToNode(f.Name, f, props, arrprops, logicalType) if err != nil { return nil, err } From ef9f2f6fe5a9e18990fb15c96f5e4394c03c87b1 Mon Sep 17 00:00:00 2001 From: datbth Date: Mon, 12 Aug 2024 18:18:51 +0700 Subject: [PATCH 2/3] pass down custom logical types via pqarrow.ArrowWriterProperties --- go/parquet/pqarrow/file_writer.go | 6 +----- go/parquet/pqarrow/file_writer_test.go | 4 ++-- go/parquet/pqarrow/properties.go | 10 +++++++++- go/parquet/pqarrow/schema.go | 8 ++------ 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go index 241ded6753e..539c544829e 100644 --- a/go/parquet/pqarrow/file_writer.go +++ b/go/parquet/pqarrow/file_writer.go @@ -63,15 +63,11 @@ type FileWriter struct { // the ArrowColumnWriter and WriteArrow functions which allow writing arrow to an existing // file.Writer, this will create a new file.Writer based on the schema provided. func NewFileWriter(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*FileWriter, error) { - return NewFileWriterWithLogicalTypes(arrschema, w, props, arrprops, nil) -} - -func NewFileWriterWithLogicalTypes(arrschema *arrow.Schema, w io.Writer, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*FileWriter, error) { if props == nil { props = parquet.NewWriterProperties() } - pqschema, err := ToParquetWithLogicalTypes(arrschema, props, arrprops, logicalTypes) + pqschema, err := ToParquet(arrschema, props, arrprops) if err != nil { return nil, err } diff --git a/go/parquet/pqarrow/file_writer_test.go b/go/parquet/pqarrow/file_writer_test.go index 31dd4cab6c1..dd060bbe140 100644 --- a/go/parquet/pqarrow/file_writer_test.go +++ b/go/parquet/pqarrow/file_writer_test.go @@ -165,7 +165,7 @@ func TestFileWriterWithLogicalTypes(t *testing.T) { sink := encoding.NewBufferWriter(0, mem) defer sink.Release() - writer, err := pqarrow.NewFileWriterWithLogicalTypes( + writer, err := pqarrow.NewFileWriter( schema, sink, parquet.NewWriterProperties( @@ -173,8 +173,8 @@ func TestFileWriterWithLogicalTypes(t *testing.T) { ), pqarrow.NewArrowWriterProperties( pqarrow.WithAllocator(alloc), + pqarrow.WithCustomLogicalTypes(logicalTypes), ), - logicalTypes, ) require.NoError(t, err) diff --git a/go/parquet/pqarrow/properties.go b/go/parquet/pqarrow/properties.go index 25a299c86f5..89ab942c5cd 100755 --- a/go/parquet/pqarrow/properties.go +++ b/go/parquet/pqarrow/properties.go @@ -33,7 +33,9 @@ type ArrowWriterProperties struct { coerceTimestampUnit arrow.TimeUnit allowTruncatedTimestamps bool storeSchema bool - noMapLogicalType bool + noMapLogicalType bool // if true, do not set Logical type for arrow.MAP + customLogicalTypes []*LogicalType // specify to customize the Logical types of the output parquet schema + // compliantNestedTypes bool } @@ -119,6 +121,12 @@ func WithNoMapLogicalType() WriterOption { } } +func WithCustomLogicalTypes(logicalTypes []*LogicalType) WriterOption { + return func(c *config) { + c.props.customLogicalTypes = logicalTypes + } +} + // func WithCompliantNestedTypes(enabled bool) WriterOption { // return func(c *config) { // c.props.compliantNestedTypes = enabled diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index 104dd12edb1..ad955d5b631 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -441,10 +441,6 @@ func fieldIDFromMeta(m arrow.Metadata) int32 { // ToParquet generates a Parquet Schema from an arrow Schema using the given properties to make // decisions when determining the logical/physical types of the columns. func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties) (*schema.Schema, error) { - return ToParquetWithLogicalTypes(sc, props, arrprops, nil) -} - -func ToParquetWithLogicalTypes(sc *arrow.Schema, props *parquet.WriterProperties, arrprops ArrowWriterProperties, logicalTypes []*LogicalType) (*schema.Schema, error) { if props == nil { props = parquet.NewWriterProperties() } @@ -452,8 +448,8 @@ func ToParquetWithLogicalTypes(sc *arrow.Schema, props *parquet.WriterProperties nodes := make(schema.FieldList, 0, sc.NumFields()) for i, f := range sc.Fields() { var logicalType *LogicalType - if logicalTypes != nil && i < len(logicalTypes) { - logicalType = logicalTypes[i] + if arrprops.customLogicalTypes != nil && i < len(arrprops.customLogicalTypes) { + logicalType = arrprops.customLogicalTypes[i] } n, err := fieldToNode(f.Name, f, props, arrprops, logicalType) if err != nil { From 3de078191d15ce1d985ac76eceec25de464d44f9 Mon Sep 17 00:00:00 2001 From: datbth Date: Mon, 12 Aug 2024 22:24:47 +0700 Subject: [PATCH 3/3] use parquet schema.LogicalType directly --- go/parquet/pqarrow/file_writer_test.go | 4 ++-- go/parquet/pqarrow/logical_type.go | 15 --------------- go/parquet/pqarrow/properties.go | 7 ++++--- go/parquet/pqarrow/schema.go | 7 +++---- 4 files changed, 9 insertions(+), 24 deletions(-) delete mode 100644 go/parquet/pqarrow/logical_type.go diff --git a/go/parquet/pqarrow/file_writer_test.go b/go/parquet/pqarrow/file_writer_test.go index dd060bbe140..b2d111cb0f9 100644 --- a/go/parquet/pqarrow/file_writer_test.go +++ b/go/parquet/pqarrow/file_writer_test.go @@ -148,9 +148,9 @@ func TestFileWriterWithLogicalTypes(t *testing.T) { { "string": null, "json": null } ]` - logicalTypes := []*pqarrow.LogicalType{ + logicalTypes := []pqschema.LogicalType{ nil, - { Type: pqschema.JSONLogicalType{}, Length: -1 }, + pqschema.JSONLogicalType{}, } alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) diff --git a/go/parquet/pqarrow/logical_type.go b/go/parquet/pqarrow/logical_type.go deleted file mode 100644 index 7043a7b3e68..00000000000 --- a/go/parquet/pqarrow/logical_type.go +++ /dev/null @@ -1,15 +0,0 @@ -package pqarrow - -import "github.com/apache/arrow/go/v18/parquet/schema" - -type LogicalType struct { - Type schema.LogicalType - Length int -} - -func NewLogicalType() *LogicalType { - return &LogicalType{ - Type: schema.NoLogicalType{}, - Length: -1, - } -} diff --git a/go/parquet/pqarrow/properties.go b/go/parquet/pqarrow/properties.go index 89ab942c5cd..5c105070a02 100755 --- a/go/parquet/pqarrow/properties.go +++ b/go/parquet/pqarrow/properties.go @@ -22,6 +22,7 @@ import ( "github.com/apache/arrow/go/v18/arrow" "github.com/apache/arrow/go/v18/arrow/memory" "github.com/apache/arrow/go/v18/parquet/internal/encoding" + "github.com/apache/arrow/go/v18/parquet/schema" ) // ArrowWriterProperties are used to determine how to manipulate the arrow data @@ -33,8 +34,8 @@ type ArrowWriterProperties struct { coerceTimestampUnit arrow.TimeUnit allowTruncatedTimestamps bool storeSchema bool - noMapLogicalType bool // if true, do not set Logical type for arrow.MAP - customLogicalTypes []*LogicalType // specify to customize the Logical types of the output parquet schema + noMapLogicalType bool // if true, do not set Logical type for arrow.MAP + customLogicalTypes []schema.LogicalType // specify to customize the Logical types of the output parquet schema // compliantNestedTypes bool } @@ -121,7 +122,7 @@ func WithNoMapLogicalType() WriterOption { } } -func WithCustomLogicalTypes(logicalTypes []*LogicalType) WriterOption { +func WithCustomLogicalTypes(logicalTypes []schema.LogicalType) WriterOption { return func(c *config) { c.props.customLogicalTypes = logicalTypes } diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index ad955d5b631..b3aa5f0e2e2 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -249,7 +249,7 @@ func structToNode(typ *arrow.StructType, name string, nullable bool, props *parq return schema.NewGroupNode(name, repFromNullable(nullable), children, -1) } -func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties, customLogicalType *LogicalType) (schema.Node, error) { +func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties, arrprops ArrowWriterProperties, customLogicalType schema.LogicalType) (schema.Node, error) { var ( logicalType schema.LogicalType = schema.NoLogicalType{} typ parquet.Type @@ -407,8 +407,7 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties } if customLogicalType != nil { - logicalType = customLogicalType.Type - length = customLogicalType.Length + logicalType = customLogicalType } return schema.NewPrimitiveNodeLogical(name, repType, logicalType, typ, length, fieldIDFromMeta(field.Metadata)) @@ -447,7 +446,7 @@ func ToParquet(sc *arrow.Schema, props *parquet.WriterProperties, arrprops Arrow nodes := make(schema.FieldList, 0, sc.NumFields()) for i, f := range sc.Fields() { - var logicalType *LogicalType + var logicalType schema.LogicalType if arrprops.customLogicalTypes != nil && i < len(arrprops.customLogicalTypes) { logicalType = arrprops.customLogicalTypes[i] }