From 904314e161eacf571be47321b68278eba02c4a31 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 10 Aug 2022 15:14:43 -0400 Subject: [PATCH 1/5] Implement standalone single-precision float encoder --- sdks/go/pkg/beam/core/graph/coder/float.go | 41 +++++++++++++++ .../pkg/beam/core/graph/coder/float_test.go | 50 +++++++++++++++++++ .../pkg/beam/core/graph/coder/row_decoder.go | 13 ++++- .../pkg/beam/core/graph/coder/row_encoder.go | 6 ++- 4 files changed, 108 insertions(+), 2 deletions(-) create mode 100644 sdks/go/pkg/beam/core/graph/coder/float.go create mode 100644 sdks/go/pkg/beam/core/graph/coder/float_test.go diff --git a/sdks/go/pkg/beam/core/graph/coder/float.go b/sdks/go/pkg/beam/core/graph/coder/float.go new file mode 100644 index 000000000000..70b326164c97 --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/coder/float.go @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "encoding/binary" + "io" + "math" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" +) + +// EncodeSinglePrecisionFloat encodes a float32 in big endian format. +func EncodeSinglePrecisionFloat(value float32, w io.Writer) error { + var data [8]byte + binary.BigEndian.PutUint32(data[:], math.Float32bits(value)) + _, err := ioutilx.WriteUnsafe(w, data[:]) + return err +} + +// DecodeSinglePrecisionFloat decodes a float64 in big endian format. +func DecodeSinglePrecisionFloat(r io.Reader) (float32, error) { + var data [8]byte + if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { + return 0, err + } + return math.Float32frombits(binary.BigEndian.Uint32(data[:])), nil +} diff --git a/sdks/go/pkg/beam/core/graph/coder/float_test.go b/sdks/go/pkg/beam/core/graph/coder/float_test.go new file mode 100644 index 000000000000..36dd09b57c06 --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/coder/float_test.go @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "bytes" + "math" + "testing" +) + +func TestEncodeDecodeSinglePrecisionFloat(t *testing.T) { + var tests []float32 + for x := float32(-100.0); x <= float32(100.0); x++ { + tests = append(tests, 0.1*x) + } + tests = append(tests, -math.MaxFloat32) + tests = append(tests, math.MaxFloat32) + for _, test := range tests { + var buf bytes.Buffer + if err := EncodeSinglePrecisionFloat(test, &buf); err != nil { + t.Fatalf("EncodeSinglePrecisionFloat(%v) failed: %v", test, err) + } + t.Logf("Encoded %v to %v", test, buf.Bytes()) + + if len(buf.Bytes()) != 8 { + t.Errorf("len(EncodeSinglePrecisionFloat(%v)) = %v, want 8", test, len(buf.Bytes())) + } + + actual, err := DecodeSinglePrecisionFloat(&buf) + if err != nil { + t.Fatalf("DecodeSinglePrecisionFloat(<%v>) failed: %v", test, err) + } + if actual != test { + t.Errorf("DecodeSinglePrecisionFloat(<%v>) = %v, want %v", test, actual, test) + } + } +} diff --git a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go index 9688ed9876c4..689f687af14d 100644 --- a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go +++ b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go @@ -241,6 +241,15 @@ func reflectDecodeUint(rv reflect.Value, r io.Reader) error { return nil } +func reflectDecodeSinglePrecisionFloat(rv reflect.Value, r io.Reader) error { + v, err := DecodeSinglePrecisionFloat(r) + if err != nil { + return errors.Wrap(err, "error decoding single-precision float field") + } + rv.SetFloat(float64(v)) + return nil +} + func reflectDecodeFloat(rv reflect.Value, r io.Reader) error { v, err := DecodeDouble(r) if err != nil { @@ -336,7 +345,9 @@ func (b *RowDecoderBuilder) decoderForSingleTypeReflect(t reflect.Type) (typeDec return typeDecoderFieldReflect{decode: reflectDecodeInt}, nil case reflect.Uint, reflect.Uint64, reflect.Uint32, reflect.Uint16: return typeDecoderFieldReflect{decode: reflectDecodeUint}, nil - case reflect.Float32, reflect.Float64: + case reflect.Float32: + return typeDecoderFieldReflect{decode: reflectDecodeSinglePrecisionFloat}, nil + case reflect.Float64: return typeDecoderFieldReflect{decode: reflectDecodeFloat}, nil case reflect.Ptr: decf, err := b.decoderForSingleTypeReflect(t.Elem()) diff --git a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go index cfc1a8e51a3d..dc41890b004a 100644 --- a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go +++ b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go @@ -208,7 +208,11 @@ func (b *RowEncoderBuilder) encoderForSingleTypeReflect(t reflect.Type) (typeEnc return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error { return EncodeVarUint64(rv.Uint(), w) }}, nil - case reflect.Float32, reflect.Float64: + case reflect.Float32: + return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error { + return EncodeSinglePrecisionFloat(float32(rv.Float()), w) + }}, nil + case reflect.Float64: return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error { return EncodeDouble(rv.Float(), w) }}, nil From f7cf295aa6fdd6f9f0041b693794d329ae7aa52e Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 10 Aug 2022 15:27:26 -0400 Subject: [PATCH 2/5] Add fuzz test --- .../beam/core/graph/coder/coder_fuzz_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go index f0436cd5506d..f18fa43f5a4e 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go @@ -65,6 +65,25 @@ func FuzzEncodeDecodeDouble(f *testing.F) { }) } +func FuzzEncodeDecodeSinglePrecisionFloat(f *testing.F) { + f.Add(float32(3.141)) + f.Fuzz(func(t *testing.T, a float32) { + var buf bytes.Buffer + err := EncodeSinglePrecisionFloat(a, &buf) + if err != nil { + return + } + + actual, err := DecodeSinglePrecisionFloat(&buf) + if err != nil { + t.Fatalf("DecodeDouble(%v) failed: %v", &buf, err) + } + if math.Abs(float64(actual-a)) > floatPrecision { + t.Fatalf("got %f, want %f +/- %f", actual, a, floatPrecision) + } + }) +} + func FuzzEncodeDecodeUInt64(f *testing.F) { f.Add(uint64(42)) f.Fuzz(func(t *testing.T, b uint64) { From c827fcb280e20aefa735eb28acc43cb223e8d177 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 10 Aug 2022 15:34:51 -0400 Subject: [PATCH 3/5] Fix buffer size --- sdks/go/pkg/beam/core/graph/coder/float.go | 4 ++-- sdks/go/pkg/beam/core/graph/coder/float_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/core/graph/coder/float.go b/sdks/go/pkg/beam/core/graph/coder/float.go index 70b326164c97..69f165b12348 100644 --- a/sdks/go/pkg/beam/core/graph/coder/float.go +++ b/sdks/go/pkg/beam/core/graph/coder/float.go @@ -25,7 +25,7 @@ import ( // EncodeSinglePrecisionFloat encodes a float32 in big endian format. func EncodeSinglePrecisionFloat(value float32, w io.Writer) error { - var data [8]byte + var data [4]byte binary.BigEndian.PutUint32(data[:], math.Float32bits(value)) _, err := ioutilx.WriteUnsafe(w, data[:]) return err @@ -33,7 +33,7 @@ func EncodeSinglePrecisionFloat(value float32, w io.Writer) error { // DecodeSinglePrecisionFloat decodes a float64 in big endian format. func DecodeSinglePrecisionFloat(r io.Reader) (float32, error) { - var data [8]byte + var data [4]byte if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { return 0, err } diff --git a/sdks/go/pkg/beam/core/graph/coder/float_test.go b/sdks/go/pkg/beam/core/graph/coder/float_test.go index 36dd09b57c06..645ea51e67d6 100644 --- a/sdks/go/pkg/beam/core/graph/coder/float_test.go +++ b/sdks/go/pkg/beam/core/graph/coder/float_test.go @@ -35,8 +35,8 @@ func TestEncodeDecodeSinglePrecisionFloat(t *testing.T) { } t.Logf("Encoded %v to %v", test, buf.Bytes()) - if len(buf.Bytes()) != 8 { - t.Errorf("len(EncodeSinglePrecisionFloat(%v)) = %v, want 8", test, len(buf.Bytes())) + if len(buf.Bytes()) != 4 { + t.Errorf("len(EncodeSinglePrecisionFloat(%v)) = %v, want 4", test, len(buf.Bytes())) } actual, err := DecodeSinglePrecisionFloat(&buf) From b015c48010aa21e47e644e55025088ff14d2e47d Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 10 Aug 2022 15:36:28 -0400 Subject: [PATCH 4/5] Fix docstring --- sdks/go/pkg/beam/core/graph/coder/float.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/graph/coder/float.go b/sdks/go/pkg/beam/core/graph/coder/float.go index 69f165b12348..4a311f5cf72b 100644 --- a/sdks/go/pkg/beam/core/graph/coder/float.go +++ b/sdks/go/pkg/beam/core/graph/coder/float.go @@ -31,7 +31,7 @@ func EncodeSinglePrecisionFloat(value float32, w io.Writer) error { return err } -// DecodeSinglePrecisionFloat decodes a float64 in big endian format. +// DecodeSinglePrecisionFloat decodes a float32 in big endian format. func DecodeSinglePrecisionFloat(r io.Reader) (float32, error) { var data [4]byte if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { From b51dacb3bc826ceb7193ffb608048c135d187db1 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Fri, 12 Aug 2022 10:52:38 -0400 Subject: [PATCH 5/5] Add CHANGES.md entry --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 82519483b7fd..b8775c11742d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* The Go SDK's Row Coder now uses a different single-precision float encoding for float32 types to match Java's behavior ([#22629](https://github.com/apache/beam/issues/22629)). ## Deprecations