From 0bf628b59136ae0bd366aa8e5ec6171b0d74d420 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 7 Sep 2022 14:45:25 -0400 Subject: [PATCH 1/5] bump to go1.17 but use -compat=1.16 for go.sum dependencies --- .env | 2 +- .github/workflows/go.yml | 24 +++++++++++------------ ci/docker/debian-10-go.dockerfile | 2 +- ci/docker/debian-11-go.dockerfile | 2 +- ci/scripts/go_build.sh | 2 +- dev/release/verify-release-candidate.sh | 2 +- dev/tasks/tasks.yml | 4 ++-- go/go.mod | 26 ++++++++++++++++++++++--- 8 files changed, 42 insertions(+), 22 deletions(-) diff --git a/.env b/.env index 4aa04daab04..8ae036b6b5b 100644 --- a/.env +++ b/.env @@ -58,7 +58,7 @@ CUDA=11.0.3 DASK=latest DOTNET=6.0 GCC_VERSION="" -GO=1.16 +GO=1.17 STATICCHECK=v0.2.2 HDFS=3.2.1 JDK=8 diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 4112bf3bd4c..5fccebbca15 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -50,9 +50,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -86,9 +86,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -123,9 +123,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -158,9 +158,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -193,9 +193,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest @@ -228,9 +228,9 @@ jobs: strategy: fail-fast: false matrix: - go: [1.16, 1.18] + go: [1.17, 1.18] include: - - go: 1.16 + - go: 1.17 staticcheck: v0.2.2 - go: 1.18 staticcheck: latest diff --git a/ci/docker/debian-10-go.dockerfile b/ci/docker/debian-10-go.dockerfile index dfe81f5a73c..8d964c76a66 100644 --- a/ci/docker/debian-10-go.dockerfile +++ b/ci/docker/debian-10-go.dockerfile @@ -16,7 +16,7 @@ # under the License. ARG arch=amd64 -ARG go=1.16 +ARG go=1.17 ARG staticcheck=v0.2.2 FROM ${arch}/golang:${go}-buster diff --git a/ci/docker/debian-11-go.dockerfile b/ci/docker/debian-11-go.dockerfile index 32d7b3af390..9f75bf23fdd 100644 --- a/ci/docker/debian-11-go.dockerfile +++ b/ci/docker/debian-11-go.dockerfile @@ -16,7 +16,7 @@ # under the License. ARG arch=amd64 -ARG go=1.16 +ARG go=1.17 ARG staticcheck=v0.2.2 FROM ${arch}/golang:${go}-bullseye diff --git a/ci/scripts/go_build.sh b/ci/scripts/go_build.sh index 43f348b1538..c113bbd320e 100755 --- a/ci/scripts/go_build.sh +++ b/ci/scripts/go_build.sh @@ -22,7 +22,7 @@ set -ex source_dir=${1}/go ARCH=`uname -m` -# Arm64 CI is triggered by travis and run in arm64v8/golang:1.16-bullseye +# Arm64 CI is triggered by travis and run in arm64v8/golang:1.17-bullseye if [ "aarch64" == "$ARCH" ]; then # Install `staticcheck` GO111MODULE=on go install honnef.co/go/tools/cmd/staticcheck@v0.2.2 diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh index eb44e3e4fec..b016988ba91 100755 --- a/dev/release/verify-release-candidate.sh +++ b/dev/release/verify-release-candidate.sh @@ -399,7 +399,7 @@ install_go() { return 0 fi - local version=1.16.12 + local version=1.17.13 show_info "Installing go version ${version}..." local arch="$(uname -m)" diff --git a/dev/tasks/tasks.yml b/dev/tasks/tasks.yml index 0816c24589e..ae3c613902b 100644 --- a/dev/tasks/tasks.yml +++ b/dev/tasks/tasks.yml @@ -1449,13 +1449,13 @@ tasks: ci: github template: r/github.linux.revdepcheck.yml - test-debian-11-go-1.16: + test-debian-11-go-1.17: ci: azure template: docker-tests/azure.linux.yml params: env: DEBIAN: 11 - GO: 1.16 + GO: 1.17 image: debian-go test-ubuntu-default-docs: diff --git a/go/go.mod b/go/go.mod index 9e7054b8d52..25ca1e084c7 100644 --- a/go/go.mod +++ b/go/go.mod @@ -16,7 +16,7 @@ module github.com/apache/arrow/go/v10 -go 1.16 +go 1.17 require ( github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c @@ -28,7 +28,6 @@ require ( github.com/google/flatbuffers v2.0.8+incompatible github.com/klauspost/asmfmt v1.3.2 github.com/klauspost/compress v1.15.9 - github.com/kr/pretty v0.3.0 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 github.com/pierrec/lz4/v4 v4.1.15 @@ -42,12 +41,33 @@ require ( gonum.org/v1/gonum v0.11.0 google.golang.org/grpc v1.49.0 google.golang.org/protobuf v1.28.1 + modernc.org/sqlite v1.18.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect + github.com/stretchr/objx v0.4.0 // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect + golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/text v0.3.7 // indirect + google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.36.3 // indirect modernc.org/ccgo/v3 v3.16.9 // indirect modernc.org/libc v1.17.1 // indirect + modernc.org/mathutil v1.5.0 // indirect + modernc.org/memory v1.2.1 // indirect modernc.org/opt v0.1.3 // indirect - modernc.org/sqlite v1.18.1 modernc.org/strutil v1.1.3 // indirect + modernc.org/token v1.0.0 // indirect ) From 6f949d5f8007883b844cbef2e12b9ef77796aed9 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 7 Sep 2022 14:45:44 -0400 Subject: [PATCH 2/5] implement proper C Data Stream export and tests --- go/arrow/cdata/cdata_exports.go | 25 ++++++++++- go/arrow/cdata/cdata_test.go | 35 +++++++++++++++ go/arrow/cdata/exports.go | 76 ++++++++++++++++++++------------- go/arrow/cdata/interface.go | 9 ++++ 4 files changed, 115 insertions(+), 30 deletions(-) diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index a3da68447db..edb5ff855e6 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -36,6 +36,7 @@ import ( "encoding/binary" "fmt" "reflect" + "runtime/cgo" "strings" "unsafe" @@ -362,7 +363,7 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { out.buffers = (*unsafe.Pointer)(unsafe.Pointer(&buffers[0])) } - out.private_data = unsafe.Pointer(storeData(arr.Data())) + out.private_data = unsafe.Pointer(cgo.NewHandle(arr.Data())) out.release = (*[0]byte)(C.goReleaseArray) switch arr := arr.(type) { case *array.List: @@ -400,3 +401,25 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { out.children = nil } } + +type cRecordReader struct { + rdr array.RecordReader +} + +func (rr cRecordReader) getSchema(out *CArrowSchema) int { + ExportArrowSchema(rr.rdr.Schema(), out) + return 0 +} + +func (rr cRecordReader) next(out *CArrowArray) int { + if rr.rdr.Next() { + ExportArrowRecordBatch(rr.rdr.Record(), out, nil) + return 0 + } + releaseArr(out) + return 0 +} + +func (rr cRecordReader) release() { + rr.rdr.Release() +} diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index 0b73a08d6b0..a36d82f2c99 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -27,6 +27,7 @@ import ( "errors" "io" "runtime" + "runtime/cgo" "testing" "time" "unsafe" @@ -34,6 +35,7 @@ import ( "github.com/apache/arrow/go/v10/arrow" "github.com/apache/arrow/go/v10/arrow/array" "github.com/apache/arrow/go/v10/arrow/decimal128" + "github.com/apache/arrow/go/v10/arrow/internal/arrdata" "github.com/apache/arrow/go/v10/arrow/memory" "github.com/stretchr/testify/assert" ) @@ -659,3 +661,36 @@ func TestRecordReaderStream(t *testing.T) { assert.Equal(t, "baz", rec.Column(1).(*array.String).Value(2)) } } + +func TestExportRecordReaderStream(t *testing.T) { + reclist := arrdata.Records["primitives"] + rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist) + + var out CArrowArrayStream + ExportRecordReader(rdr, &out) + + assert.NotNil(t, out.get_schema) + assert.NotNil(t, out.get_next) + assert.NotNil(t, out.get_last_error) + assert.NotNil(t, out.release) + assert.NotNil(t, out.private_data) + + h := cgo.Handle(out.private_data) + assert.Same(t, rdr, h.Value().(cRecordReader).rdr) + + importedRdr := ImportCArrayStream(&out, nil) + i := 0 + for { + rec, err := importedRdr.Read() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + assert.NoError(t, err) + } + + assert.Truef(t, array.RecordEqual(reclist[i], rec), "expected: %s\ngot: %s", reclist[i], rec) + i++ + } + assert.EqualValues(t, len(reclist), i) +} diff --git a/go/arrow/cdata/exports.go b/go/arrow/cdata/exports.go index 4ad4b7fac31..355ae1f60df 100644 --- a/go/arrow/cdata/exports.go +++ b/go/arrow/cdata/exports.go @@ -18,42 +18,24 @@ package cdata import ( "reflect" - "sync" - "sync/atomic" + "runtime/cgo" "unsafe" "github.com/apache/arrow/go/v10/arrow" + "github.com/apache/arrow/go/v10/arrow/array" ) // #include // #include "arrow/c/helpers.h" +// +// typedef const char cchar_t; +// extern int streamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*); +// extern int streamGetNext(struct ArrowArrayStream*, struct ArrowArray*); +// extern const char* streamGetError(struct ArrowArrayStream*); +// extern void streamRelease(struct ArrowArrayStream*); +// import "C" -var ( - handles = sync.Map{} - handleIdx uintptr -) - -type dataHandle uintptr - -func storeData(d arrow.ArrayData) dataHandle { - h := atomic.AddUintptr(&handleIdx, 1) - if h == 0 { - panic("cgo: ran out of space") - } - d.Retain() - handles.Store(h, d) - return dataHandle(h) -} - -func (d dataHandle) releaseData() { - arrd, ok := handles.LoadAndDelete(uintptr(d)) - if !ok { - panic("cgo: invalid datahandle") - } - arrd.(arrow.ArrayData).Release() -} - //export releaseExportedSchema func releaseExportedSchema(schema *CArrowSchema) { if C.ArrowSchemaIsReleased(schema) == 1 { @@ -108,6 +90,42 @@ func releaseExportedArray(arr *CArrowArray) { C.free(unsafe.Pointer(arr.children)) } - h := dataHandle(arr.private_data) - h.releaseData() + h := cgo.Handle(arr.private_data) + h.Value().(arrow.ArrayData).Release() + h.Delete() +} + +//export streamGetSchema +func streamGetSchema(handle *CArrowArrayStream, out *CArrowSchema) C.int { + h := cgo.Handle(handle.private_data) + rdr := h.Value().(cRecordReader) + return C.int(rdr.getSchema(out)) +} + +//export streamGetNext +func streamGetNext(handle *CArrowArrayStream, out *CArrowArray) C.int { + h := cgo.Handle(handle.private_data) + rdr := h.Value().(cRecordReader) + return C.int(rdr.next(out)) +} + +//export streamGetError +func streamGetError(*CArrowArrayStream) *C.cchar_t { return nil } + +//export streamRelease +func streamRelease(handle *CArrowArrayStream) { + h := cgo.Handle(handle.private_data) + h.Value().(cRecordReader).release() + h.Delete() + handle.release = nil + handle.private_data = nil +} + +func exportStream(rdr array.RecordReader, out *CArrowArrayStream) { + out.get_schema = (*[0]byte)(C.streamGetSchema) + out.get_next = (*[0]byte)(C.streamGetNext) + out.get_last_error = (*[0]byte)(C.streamGetError) + out.release = (*[0]byte)(C.streamRelease) + h := cgo.NewHandle(cRecordReader{rdr}) + out.private_data = unsafe.Pointer(h) } diff --git a/go/arrow/cdata/interface.go b/go/arrow/cdata/interface.go index e567ce599a4..9b80b7c2f0d 100644 --- a/go/arrow/cdata/interface.go +++ b/go/arrow/cdata/interface.go @@ -225,6 +225,15 @@ func ExportArrowArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema exportArray(arr, out, outSchema) } +// ExportRecordReader populates the CArrowArrayStream that is passed in with the appropriate +// callbacks to be a working ArrowArrayStream utilizing the passed in RecordReader. The +// CArrowArrayStream takes ownership of the RecordReader until the consumer calls the release +// callback, as such it is unnecesary to call Release on the passed in reader unless it has +// previously been retained. +func ExportRecordReader(reader array.RecordReader, out *CArrowArrayStream) { + exportStream(reader, out) +} + // ReleaseCArrowArray calls ArrowArrayRelease on the passed in cdata array func ReleaseCArrowArray(arr *CArrowArray) { releaseArr(arr) } From a6654550c41291ff859e69142aaf6b86caa95530 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 7 Sep 2022 14:56:02 -0400 Subject: [PATCH 3/5] lost the retain, oops. --- go/arrow/cdata/cdata_exports.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index edb5ff855e6..acb94b80e62 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -363,6 +363,7 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { out.buffers = (*unsafe.Pointer)(unsafe.Pointer(&buffers[0])) } + arr.Data().Retain() out.private_data = unsafe.Pointer(cgo.NewHandle(arr.Data())) out.release = (*[0]byte)(C.goReleaseArray) switch arr := arr.(type) { From b2913dc56072a5ddfa7b40e061acf63facdba4e2 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 7 Sep 2022 17:26:39 -0400 Subject: [PATCH 4/5] fix pointer handling and make it happy --- go/arrow/cdata/cdata_exports.go | 3 ++- go/arrow/cdata/cdata_test.go | 8 ++++---- go/arrow/cdata/cdata_test_framework.go | 2 ++ go/arrow/cdata/exports.go | 10 +++++----- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index acb94b80e62..b69d44d9b50 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -364,7 +364,8 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { } arr.Data().Retain() - out.private_data = unsafe.Pointer(cgo.NewHandle(arr.Data())) + h := cgo.NewHandle(arr.Data()) + out.private_data = unsafe.Pointer(&h) out.release = (*[0]byte)(C.goReleaseArray) switch arr := arr.(type) { case *array.List: diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index a36d82f2c99..a143b5cb79a 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -666,8 +666,8 @@ func TestExportRecordReaderStream(t *testing.T) { reclist := arrdata.Records["primitives"] rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist) - var out CArrowArrayStream - ExportRecordReader(rdr, &out) + out := createTestStreamObj() + ExportRecordReader(rdr, out) assert.NotNil(t, out.get_schema) assert.NotNil(t, out.get_next) @@ -675,10 +675,10 @@ func TestExportRecordReaderStream(t *testing.T) { assert.NotNil(t, out.release) assert.NotNil(t, out.private_data) - h := cgo.Handle(out.private_data) + h := *(*cgo.Handle)(out.private_data) assert.Same(t, rdr, h.Value().(cRecordReader).rdr) - importedRdr := ImportCArrayStream(&out, nil) + importedRdr := ImportCArrayStream(out, nil) i := 0 for { rec, err := importedRdr.Read() diff --git a/go/arrow/cdata/cdata_test_framework.go b/go/arrow/cdata/cdata_test_framework.go index bb4db1e339b..8bf2478401f 100644 --- a/go/arrow/cdata/cdata_test_framework.go +++ b/go/arrow/cdata/cdata_test_framework.go @@ -251,6 +251,8 @@ func createCArr(arr arrow.Array) *CArrowArray { return carr } +func createTestStreamObj() *CArrowArrayStream { return C.get_test_stream() } + func arrayStreamTest() *CArrowArrayStream { st := C.get_test_stream() C.setup_array_stream_test(2, st) diff --git a/go/arrow/cdata/exports.go b/go/arrow/cdata/exports.go index 355ae1f60df..c7d77a52a72 100644 --- a/go/arrow/cdata/exports.go +++ b/go/arrow/cdata/exports.go @@ -90,21 +90,21 @@ func releaseExportedArray(arr *CArrowArray) { C.free(unsafe.Pointer(arr.children)) } - h := cgo.Handle(arr.private_data) + h := *(*cgo.Handle)(arr.private_data) h.Value().(arrow.ArrayData).Release() h.Delete() } //export streamGetSchema func streamGetSchema(handle *CArrowArrayStream, out *CArrowSchema) C.int { - h := cgo.Handle(handle.private_data) + h := *(*cgo.Handle)(handle.private_data) rdr := h.Value().(cRecordReader) return C.int(rdr.getSchema(out)) } //export streamGetNext func streamGetNext(handle *CArrowArrayStream, out *CArrowArray) C.int { - h := cgo.Handle(handle.private_data) + h := *(*cgo.Handle)(handle.private_data) rdr := h.Value().(cRecordReader) return C.int(rdr.next(out)) } @@ -114,7 +114,7 @@ func streamGetError(*CArrowArrayStream) *C.cchar_t { return nil } //export streamRelease func streamRelease(handle *CArrowArrayStream) { - h := cgo.Handle(handle.private_data) + h := *(*cgo.Handle)(handle.private_data) h.Value().(cRecordReader).release() h.Delete() handle.release = nil @@ -127,5 +127,5 @@ func exportStream(rdr array.RecordReader, out *CArrowArrayStream) { out.get_last_error = (*[0]byte)(C.streamGetError) out.release = (*[0]byte)(C.streamRelease) h := cgo.NewHandle(cRecordReader{rdr}) - out.private_data = unsafe.Pointer(h) + out.private_data = unsafe.Pointer(&h) } From 1e1987d930fe886f2cdad13f5a5ef467f13c8a58 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 7 Sep 2022 18:17:09 -0400 Subject: [PATCH 5/5] fix asan issue by memset'ing the structs --- go/arrow/cdata/cdata.go | 18 ++++++++++++------ go/arrow/cdata/cdata_test_framework.go | 10 ++++++++-- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index a2b583f268e..aec166a0110 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -27,7 +27,11 @@ package cdata // int stream_get_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) { return st->get_schema(st, out); } // int stream_get_next(struct ArrowArrayStream* st, struct ArrowArray* out) { return st->get_next(st, out); } // const char* stream_get_last_error(struct ArrowArrayStream* st) { return st->get_last_error(st); } -// struct ArrowArray* get_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); } +// struct ArrowArray* get_arr() { +// struct ArrowArray* out = (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); +// memset(out, 0, sizeof(struct ArrowArray)); +// return out; +// } // struct ArrowArrayStream* get_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); } // import "C" @@ -655,18 +659,22 @@ func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, er func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) { rdr.stream = C.get_stream() C.ArrowArrayStreamMove(stream, rdr.stream) + rdr.arr = C.get_arr() runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) { if r.cur != nil { r.cur.Release() } C.ArrowArrayStreamRelease(r.stream) + C.ArrowArrayRelease(r.arr) C.free(unsafe.Pointer(r.stream)) + C.free(unsafe.Pointer(r.arr)) }) } // Record Batch reader that conforms to arrio.Reader for the ArrowArrayStream interface type nativeCRecordBatchReader struct { stream *CArrowArrayStream + arr *CArrowArray schema *arrow.Schema cur arrow.Record @@ -713,18 +721,16 @@ func (n *nativeCRecordBatchReader) next() error { n.cur = nil } - arr := C.get_arr() - defer C.free(unsafe.Pointer(arr)) - errno := C.stream_get_next(n.stream, arr) + errno := C.stream_get_next(n.stream, n.arr) if errno != 0 { return n.getError(int(errno)) } - if C.ArrowArrayIsReleased(arr) == 1 { + if C.ArrowArrayIsReleased(n.arr) == 1 { return io.EOF } - rec, err := ImportCRecordBatchWithSchema(arr, n.schema) + rec, err := ImportCRecordBatchWithSchema(n.arr, n.schema) if err != nil { return err } diff --git a/go/arrow/cdata/cdata_test_framework.go b/go/arrow/cdata/cdata_test_framework.go index 8bf2478401f..0274b01fb73 100644 --- a/go/arrow/cdata/cdata_test_framework.go +++ b/go/arrow/cdata/cdata_test_framework.go @@ -26,7 +26,11 @@ package cdata // // void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out); // struct ArrowArray* get_test_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); } -// struct ArrowArrayStream* get_test_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); } +// struct ArrowArrayStream* get_test_stream() { +// struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); +// memset(out, 0, sizeof(struct ArrowArrayStream)); +// return out; +// } // // void release_test_arr(struct ArrowArray* arr) { // for (int i = 0; i < arr->n_buffers; ++i) { @@ -251,7 +255,9 @@ func createCArr(arr arrow.Array) *CArrowArray { return carr } -func createTestStreamObj() *CArrowArrayStream { return C.get_test_stream() } +func createTestStreamObj() *CArrowArrayStream { + return C.get_test_stream() +} func arrayStreamTest() *CArrowArrayStream { st := C.get_test_stream()