diff --git a/CHANGES.md b/CHANGES.md index 82519483b7fd..b8775c11742d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* The Go SDK's Row Coder now uses a different single-precision float encoding for float32 types to match Java's behavior ([#22629](https://github.com/apache/beam/issues/22629)). ## Deprecations diff --git a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go index f0436cd5506d..f18fa43f5a4e 100644 --- a/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go +++ b/sdks/go/pkg/beam/core/graph/coder/coder_fuzz_test.go @@ -65,6 +65,25 @@ func FuzzEncodeDecodeDouble(f *testing.F) { }) } +func FuzzEncodeDecodeSinglePrecisionFloat(f *testing.F) { + f.Add(float32(3.141)) + f.Fuzz(func(t *testing.T, a float32) { + var buf bytes.Buffer + err := EncodeSinglePrecisionFloat(a, &buf) + if err != nil { + return + } + + actual, err := DecodeSinglePrecisionFloat(&buf) + if err != nil { + t.Fatalf("DecodeDouble(%v) failed: %v", &buf, err) + } + if math.Abs(float64(actual-a)) > floatPrecision { + t.Fatalf("got %f, want %f +/- %f", actual, a, floatPrecision) + } + }) +} + func FuzzEncodeDecodeUInt64(f *testing.F) { f.Add(uint64(42)) f.Fuzz(func(t *testing.T, b uint64) { diff --git a/sdks/go/pkg/beam/core/graph/coder/float.go b/sdks/go/pkg/beam/core/graph/coder/float.go new file mode 100644 index 000000000000..4a311f5cf72b --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/coder/float.go @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "encoding/binary" + "io" + "math" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" +) + +// EncodeSinglePrecisionFloat encodes a float32 in big endian format. +func EncodeSinglePrecisionFloat(value float32, w io.Writer) error { + var data [4]byte + binary.BigEndian.PutUint32(data[:], math.Float32bits(value)) + _, err := ioutilx.WriteUnsafe(w, data[:]) + return err +} + +// DecodeSinglePrecisionFloat decodes a float32 in big endian format. +func DecodeSinglePrecisionFloat(r io.Reader) (float32, error) { + var data [4]byte + if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { + return 0, err + } + return math.Float32frombits(binary.BigEndian.Uint32(data[:])), nil +} diff --git a/sdks/go/pkg/beam/core/graph/coder/float_test.go b/sdks/go/pkg/beam/core/graph/coder/float_test.go new file mode 100644 index 000000000000..645ea51e67d6 --- /dev/null +++ b/sdks/go/pkg/beam/core/graph/coder/float_test.go @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package coder + +import ( + "bytes" + "math" + "testing" +) + +func TestEncodeDecodeSinglePrecisionFloat(t *testing.T) { + var tests []float32 + for x := float32(-100.0); x <= float32(100.0); x++ { + tests = append(tests, 0.1*x) + } + tests = append(tests, -math.MaxFloat32) + tests = append(tests, math.MaxFloat32) + for _, test := range tests { + var buf bytes.Buffer + if err := EncodeSinglePrecisionFloat(test, &buf); err != nil { + t.Fatalf("EncodeSinglePrecisionFloat(%v) failed: %v", test, err) + } + t.Logf("Encoded %v to %v", test, buf.Bytes()) + + if len(buf.Bytes()) != 4 { + t.Errorf("len(EncodeSinglePrecisionFloat(%v)) = %v, want 4", test, len(buf.Bytes())) + } + + actual, err := DecodeSinglePrecisionFloat(&buf) + if err != nil { + t.Fatalf("DecodeSinglePrecisionFloat(<%v>) failed: %v", test, err) + } + if actual != test { + t.Errorf("DecodeSinglePrecisionFloat(<%v>) = %v, want %v", test, actual, test) + } + } +} diff --git a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go index 9688ed9876c4..689f687af14d 100644 --- a/sdks/go/pkg/beam/core/graph/coder/row_decoder.go +++ b/sdks/go/pkg/beam/core/graph/coder/row_decoder.go @@ -241,6 +241,15 @@ func reflectDecodeUint(rv reflect.Value, r io.Reader) error { return nil } +func reflectDecodeSinglePrecisionFloat(rv reflect.Value, r io.Reader) error { + v, err := DecodeSinglePrecisionFloat(r) + if err != nil { + return errors.Wrap(err, "error decoding single-precision float field") + } + rv.SetFloat(float64(v)) + return nil +} + func reflectDecodeFloat(rv reflect.Value, r io.Reader) error { v, err := DecodeDouble(r) if err != nil { @@ -336,7 +345,9 @@ func (b *RowDecoderBuilder) decoderForSingleTypeReflect(t reflect.Type) (typeDec return typeDecoderFieldReflect{decode: reflectDecodeInt}, nil case reflect.Uint, reflect.Uint64, reflect.Uint32, reflect.Uint16: return typeDecoderFieldReflect{decode: reflectDecodeUint}, nil - case reflect.Float32, reflect.Float64: + case reflect.Float32: + return typeDecoderFieldReflect{decode: reflectDecodeSinglePrecisionFloat}, nil + case reflect.Float64: return typeDecoderFieldReflect{decode: reflectDecodeFloat}, nil case reflect.Ptr: decf, err := b.decoderForSingleTypeReflect(t.Elem()) diff --git a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go index cfc1a8e51a3d..dc41890b004a 100644 --- a/sdks/go/pkg/beam/core/graph/coder/row_encoder.go +++ b/sdks/go/pkg/beam/core/graph/coder/row_encoder.go @@ -208,7 +208,11 @@ func (b *RowEncoderBuilder) encoderForSingleTypeReflect(t reflect.Type) (typeEnc return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error { return EncodeVarUint64(rv.Uint(), w) }}, nil - case reflect.Float32, reflect.Float64: + case reflect.Float32: + return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error { + return EncodeSinglePrecisionFloat(float32(rv.Float()), w) + }}, nil + case reflect.Float64: return typeEncoderFieldReflect{encode: func(rv reflect.Value, w io.Writer) error { return EncodeDouble(rv.Float(), w) }}, nil