diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index e6889743dfd..e5c7d1dbcf6 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -37,8 +37,9 @@ const ( ) var ( - paddingBytes [kArrowAlignment]byte - kEOS = [4]byte{0, 0, 0, 0} // end of stream message + paddingBytes [kArrowAlignment]byte + kEOS = [8]byte{0, 0, 0, 0, 0, 0, 0, 0} // end of stream message + kIPCContToken uint32 = 0xFFFFFFFF // 32b continuation indicator for FlatBuffers 8b alignment ) func paddedLength(nbytes int64, alignment int32) int64 { diff --git a/go/arrow/ipc/message.go b/go/arrow/ipc/message.go index bb12dbbc518..bfd9494b6cb 100644 --- a/go/arrow/ipc/message.go +++ b/go/arrow/ipc/message.go @@ -181,12 +181,31 @@ func (r *MessageReader) Message() (*Message, error) { var buf = make([]byte, 4) _, err := io.ReadFull(r.r, buf) if err != nil { - return nil, errors.Wrap(err, "arrow/ipc: could not read message length") + return nil, errors.Wrap(err, "arrow/ipc: could not read continuation indicator") } - msgLen := int32(binary.LittleEndian.Uint32(buf)) - if msgLen == 0 { - // optional 0 EOS control message + var ( + cid = binary.LittleEndian.Uint32(buf) + msgLen int32 + ) + switch cid { + case 0: + // EOS message. return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error? + case kIPCContToken: + _, err = io.ReadFull(r.r, buf) + if err != nil { + return nil, errors.Wrap(err, "arrow/ipc: could not read message length") + } + msgLen = int32(binary.LittleEndian.Uint32(buf)) + if msgLen == 0 { + // optional 0 EOS control message + return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error? + } + + default: + // ARROW-6314: backwards compatibility for reading old IPC + // messages produced prior to version 0.15.0 + msgLen = int32(cid) } buf = make([]byte, msgLen) diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index 7b4e3dad433..034d3e124f4 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -88,7 +88,19 @@ func (blk fileBlock) NewMessage() (*Message, error) { if err != nil { return nil, errors.Wrap(err, "arrow/ipc: could not read message metadata") } - meta := memory.NewBufferBytes(buf[4:]) // drop buf-size already known from blk.Meta + + prefix := 0 + switch binary.LittleEndian.Uint32(buf) { + case 0: + case kIPCContToken: + prefix = 8 + default: + // ARROW-6314: backwards compatibility for reading old IPC + // messages produced prior to version 0.15.0 + prefix = 4 + } + + meta := memory.NewBufferBytes(buf[prefix:]) // drop buf-size already known from blk.Meta buf = make([]byte, blk.Body) _, err = io.ReadFull(r, buf) @@ -1002,19 +1014,26 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error) ) // ARROW-3212: we do not make any assumption on whether the output stream is aligned or not. - paddedMsgLen := int32(msg.Len()) + 4 + paddedMsgLen := int32(msg.Len()) + 8 remainder := paddedMsgLen % alignment if remainder != 0 { paddedMsgLen += alignment - remainder } + tmp := make([]byte, 4) + + // write continuation indicator, to address 8-byte alignment requirement from FlatBuffers. + binary.LittleEndian.PutUint32(tmp, kIPCContToken) + _, err = w.Write(tmp) + if err != nil { + return 0, errors.Wrap(err, "arrow/ipc: could not write continuation bit indicator") + } + // the returned message size includes the length prefix, the flatbuffer, + padding n = int(paddedMsgLen) - tmp := make([]byte, 4) - // write the flatbuffer size prefix, including padding - sizeFB := paddedMsgLen - 4 + sizeFB := paddedMsgLen - 8 binary.LittleEndian.PutUint32(tmp, uint32(sizeFB)) _, err = w.Write(tmp) if err != nil { @@ -1028,7 +1047,7 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error) } // write any padding - padding := paddedMsgLen - int32(msg.Len()) - 4 + padding := paddedMsgLen - int32(msg.Len()) - 8 if padding > 0 { _, err = w.Write(paddingBytes[:padding]) if err != nil {