From 623c3d2a64115ada1987fa6e8a1a89ddf39a6838 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 9 Apr 2019 18:25:57 +0200 Subject: [PATCH] ARROW-5173: [Go] handle multiple concatenated record batches --- go/arrow/ipc/cmd/arrow-cat/main.go | 35 +++++++++++++++--------- go/arrow/ipc/cmd/arrow-ls/main.go | 43 +++++++++++++++++++++++------- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/go/arrow/ipc/cmd/arrow-cat/main.go b/go/arrow/ipc/cmd/arrow-cat/main.go index cb4ff5a121d..e5f75e75ebb 100644 --- a/go/arrow/ipc/cmd/arrow-cat/main.go +++ b/go/arrow/ipc/cmd/arrow-cat/main.go @@ -89,19 +89,24 @@ func processStream(w io.Writer, rin io.Reader) error { mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer mem.AssertSize(nil, 0) - r, err := ipc.NewReader(rin, ipc.WithAllocator(mem)) - if err != nil { - return err - } - defer r.Release() - - n := 0 - for r.Next() { - n++ - fmt.Fprintf(w, "record %d...\n", n) - rec := r.Record() - for i, col := range rec.Columns() { - fmt.Fprintf(w, " col[%d] %q: %v\n", i, rec.ColumnName(i), col) + for { + r, err := ipc.NewReader(rin, ipc.WithAllocator(mem)) + if err != nil { + if errors.Cause(err) == io.EOF { + return nil + } + return err + } + defer r.Release() + + n := 0 + for r.Next() { + n++ + fmt.Fprintf(w, "record %d...\n", n) + rec := r.Record() + for i, col := range rec.Columns() { + fmt.Fprintf(w, " col[%d] %q: %v\n", i, rec.ColumnName(i), col) + } } } return nil @@ -142,6 +147,9 @@ func processFile(w io.Writer, fname string) error { r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem)) if err != nil { + if errors.Cause(err) == io.EOF { + return nil + } return err } defer r.Close() @@ -159,6 +167,7 @@ func processFile(w io.Writer, fname string) error { fmt.Fprintf(w, " col[%d] %q: %v\n", i, rec.ColumnName(i), col) } } + return nil } diff --git a/go/arrow/ipc/cmd/arrow-ls/main.go b/go/arrow/ipc/cmd/arrow-ls/main.go index b858bf5c5db..cf9a765d6dc 100644 --- a/go/arrow/ipc/cmd/arrow-ls/main.go +++ b/go/arrow/ipc/cmd/arrow-ls/main.go @@ -53,6 +53,7 @@ package main // import "github.com/apache/arrow/go/arrow/ipc/cmd/arrow-ls" import ( + "bytes" "flag" "fmt" "io" @@ -63,6 +64,7 @@ import ( "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/ipc" "github.com/apache/arrow/go/arrow/memory" + "github.com/pkg/errors" ) func main() { @@ -87,19 +89,24 @@ func processStream(w io.Writer, rin io.Reader) error { mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer mem.AssertSize(nil, 0) - r, err := ipc.NewReader(rin, ipc.WithAllocator(mem)) - if err != nil { - return err - } - defer r.Release() + for { + r, err := ipc.NewReader(rin, ipc.WithAllocator(mem)) + if err != nil { + if errors.Cause(err) == io.EOF { + return nil + } + return err + } + defer r.Release() - fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema())) + fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema())) - nrecs := 0 - for r.Next() { - nrecs++ + nrecs := 0 + for r.Next() { + nrecs++ + } + fmt.Fprintf(w, "records: %d\n", nrecs) } - fmt.Fprintf(w, "records: %d\n", nrecs) return nil } @@ -121,11 +128,26 @@ func processFile(w io.Writer, fname string) error { } defer f.Close() + hdr := make([]byte, len(ipc.Magic)) + _, err = io.ReadFull(f, hdr) + if err != nil { + return errors.Errorf("could not read file header: %v", err) + } + f.Seek(0, io.SeekStart) + + if !bytes.Equal(hdr, ipc.Magic) { + // try as a stream. + return processStream(w, f) + } + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) defer mem.AssertSize(nil, 0) r, err := ipc.NewFileReader(f, ipc.WithAllocator(mem)) if err != nil { + if errors.Cause(err) == io.EOF { + return nil + } return err } defer r.Close() @@ -133,6 +155,7 @@ func processFile(w io.Writer, fname string) error { fmt.Fprintf(w, "version: %v\n", r.Version()) fmt.Fprintf(w, "schema:\n%v", displaySchema(r.Schema())) fmt.Fprintf(w, "records: %d\n", r.NumRecords()) + return nil }