Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/arrow/ipc/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ const (
)

var (
paddingBytes [kArrowAlignment]byte
kEOS = [4]byte{0, 0, 0, 0} // end of stream message
paddingBytes [kArrowAlignment]byte
kEOS = [8]byte{0, 0, 0, 0, 0, 0, 0, 0} // end of stream message
kIPCContToken uint32 = 0xFFFFFFFF // 32b continuation indicator for FlatBuffers 8b alignment
)

func paddedLength(nbytes int64, alignment int32) int64 {
Expand Down
27 changes: 23 additions & 4 deletions go/arrow/ipc/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,31 @@ func (r *MessageReader) Message() (*Message, error) {
var buf = make([]byte, 4)
_, err := io.ReadFull(r.r, buf)
if err != nil {
return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
return nil, errors.Wrap(err, "arrow/ipc: could not read continuation indicator")
}
msgLen := int32(binary.LittleEndian.Uint32(buf))
if msgLen == 0 {
// optional 0 EOS control message
var (
cid = binary.LittleEndian.Uint32(buf)
msgLen int32
)
switch cid {
case 0:
// EOS message.
return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
case kIPCContToken:
_, err = io.ReadFull(r.r, buf)
if err != nil {
return nil, errors.Wrap(err, "arrow/ipc: could not read message length")
}
msgLen = int32(binary.LittleEndian.Uint32(buf))
if msgLen == 0 {
// optional 0 EOS control message
return nil, io.EOF // FIXME(sbinet): send nil instead? or a special EOS error?
}

default:
// ARROW-6314: backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
msgLen = int32(cid)
}

buf = make([]byte, msgLen)
Expand Down
31 changes: 25 additions & 6 deletions go/arrow/ipc/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,19 @@ func (blk fileBlock) NewMessage() (*Message, error) {
if err != nil {
return nil, errors.Wrap(err, "arrow/ipc: could not read message metadata")
}
meta := memory.NewBufferBytes(buf[4:]) // drop buf-size already known from blk.Meta

prefix := 0
switch binary.LittleEndian.Uint32(buf) {
case 0:
case kIPCContToken:
prefix = 8
default:
// ARROW-6314: backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
prefix = 4
}

meta := memory.NewBufferBytes(buf[prefix:]) // drop buf-size already known from blk.Meta

buf = make([]byte, blk.Body)
_, err = io.ReadFull(r, buf)
Expand Down Expand Up @@ -1002,19 +1014,26 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
)

// ARROW-3212: we do not make any assumption on whether the output stream is aligned or not.
paddedMsgLen := int32(msg.Len()) + 4
paddedMsgLen := int32(msg.Len()) + 8
remainder := paddedMsgLen % alignment
if remainder != 0 {
paddedMsgLen += alignment - remainder
}

tmp := make([]byte, 4)

// write continuation indicator, to address 8-byte alignment requirement from FlatBuffers.
binary.LittleEndian.PutUint32(tmp, kIPCContToken)
_, err = w.Write(tmp)
if err != nil {
return 0, errors.Wrap(err, "arrow/ipc: could not write continuation bit indicator")
}

// the returned message size includes the length prefix, the flatbuffer, + padding
n = int(paddedMsgLen)

tmp := make([]byte, 4)

// write the flatbuffer size prefix, including padding
sizeFB := paddedMsgLen - 4
sizeFB := paddedMsgLen - 8
binary.LittleEndian.PutUint32(tmp, uint32(sizeFB))
_, err = w.Write(tmp)
if err != nil {
Expand All @@ -1028,7 +1047,7 @@ func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error)
}

// write any padding
padding := paddedMsgLen - int32(msg.Len()) - 4
padding := paddedMsgLen - int32(msg.Len()) - 8
if padding > 0 {
_, err = w.Write(paddingBytes[:padding])
if err != nil {
Expand Down