Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/parquet/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
// go get -u github.com/apache/arrow/go/parquet
//
// In addition, two cli utilities are provided:
// go install github.factset.com/mtopol/parquet-go/cmd/parquet_reader
// go install github.factset.com/mtopol/parquet-go/cmd/parquet_schema
// go install github.com/apache/arrow/go/parquet/cmd/parquet_reader
// go install github.com/apache/arrow/go/parquet/cmd/parquet_schema
//
// Modules
//
Expand Down
6 changes: 4 additions & 2 deletions go/parquet/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ module github.com/apache/arrow/go/parquet
go 1.15

require (
github.com/JohnCGriffin/overflow v0.0.0-20170615021017-4d914c927216
github.com/andybalholm/brotli v1.0.1
github.com/apache/arrow/go/arrow v0.0.0-20210310173904-5de02e3697aa
github.com/apache/arrow/go/arrow v0.0.0-20210520144409-d07f30ada677
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4
github.com/golang/snappy v0.0.3
github.com/klauspost/asmfmt v1.2.3
github.com/klauspost/compress v1.11.12
github.com/klauspost/compress v1.12.2
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3
github.com/stretchr/testify v1.7.0
github.com/zeebo/xxh3 v0.10.0
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
Expand Down
18 changes: 13 additions & 5 deletions go/parquet/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/JohnCGriffin/overflow v0.0.0-20170615021017-4d914c927216 h1:2ZboyJ8vl75fGesnG9NpMTD2DyQI3FzMXy4x752rGF0=
github.com/JohnCGriffin/overflow v0.0.0-20170615021017-4d914c927216/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc=
github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y=
github.com/apache/arrow/go/arrow v0.0.0-20210310173904-5de02e3697aa h1:0Bhiab9ep1wmbD1Lm17uqPkzgYhcBIZf1CsvrMhFMGI=
github.com/apache/arrow/go/arrow v0.0.0-20210310173904-5de02e3697aa/go.mod h1:c9sxoIT3YgLxH4UhLOCKaBlEojuMhVYpk4Ntv3opUTQ=
github.com/apache/arrow/go/arrow v0.0.0-20210520144409-d07f30ada677 h1:F7HiqIf4aBsF4YUBcLolXZ8duSEideNnZnr3lBGa2sA=
github.com/apache/arrow/go/arrow v0.0.0-20210520144409-d07f30ada677/go.mod h1:R4hW3Ug0s+n4CUsWHKOj00Pu01ZqU4x/hSF5kXUcXKQ=
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4 h1:orNYqmQGnSjgOauLWjHEp9/qIDT98xv/0Aa4Zet3/Y8=
github.com/apache/thrift/lib/go/thrift v0.0.0-20210120171102-e27e82c46ba4/go.mod h1:V/LzksIyqd3KZuQ2SunvReTG/UkArhII1dAWY5U1sCE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down Expand Up @@ -42,22 +44,28 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/klauspost/asmfmt v1.2.3 h1:qEM7SLDo6DXXXz5yTpqUoxhsrtwH30nNR2riO2ZjznY=
github.com/klauspost/asmfmt v1.2.3/go.mod h1:RAoUvqkWr2rUa2I19qKMEVZQe4BVtcHGTMCUOcCU2Lg=
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/pierrec/lz4/v4 v4.1.4 h1:PjkB+qEooc9nw4F6Pxe/e0xaRdWz3suItXWxWqAO1QE=
github.com/pierrec/lz4/v4 v4.1.4/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.0/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/zeebo/xxh3 v0.10.0 h1:1+2Mov9zfxTNUeoDG9k9i13VfxTR0p1JQu8L0vikxB0=
github.com/zeebo/xxh3 v0.10.0/go.mod h1:AQY73TOrhF3jNsdiM9zZOb8MThrYbZONHj7ryDBaLpg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down Expand Up @@ -97,6 +105,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200727154430-2d971f7391a4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY=
golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -135,7 +144,6 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200910201057-6591123024b3/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
101 changes: 101 additions & 0 deletions go/parquet/internal/encoding/boolean_decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoding

import (
"github.com/apache/arrow/go/arrow/bitutil"
"github.com/apache/arrow/go/parquet"
"github.com/apache/arrow/go/parquet/internal/utils"
"golang.org/x/xerrors"
)

// PlainBooleanDecoder is for the Plain Encoding type, there is no
// dictionary decoding for bools.
type PlainBooleanDecoder struct {
decoder

bitOffset int
}

// Type for the PlainBooleanDecoder is parquet.Types.Boolean
func (PlainBooleanDecoder) Type() parquet.Type {
return parquet.Types.Boolean
}

// Decode fills out with bools decoded from the data at the current point
// or until we reach the end of the data.
//
// Returns the number of values decoded
func (dec *PlainBooleanDecoder) Decode(out []bool) (int, error) {
max := utils.MinInt(len(out), dec.nvals)

unalignedExtract := func(start, end, curBitOffset int) int {
i := start
for ; curBitOffset < end; i, curBitOffset = i+1, curBitOffset+1 {
out[i] = (dec.data[0] & byte(1<<curBitOffset)) != 0
}
return i // return the number of bits we extracted
}

// if we aren't at a byte boundary, then get bools until we hit
// a byte boundary with the bit offset.
i := 0
if dec.bitOffset != 0 {
i = unalignedExtract(0, 8, dec.bitOffset)
dec.bitOffset = 0
}

// determine the number of full bytes worth of bits we can decode
// given the number of values we want to decode.
bitsRemain := max - i
batch := bitsRemain / 8 * 8
if batch > 0 { // only go in here if there's at least one full byte to decode
if i > 0 { // skip our data forward if we decoded anything above
dec.data = dec.data[1:]
out = out[i:]
}
// determine the number of aligned bytes we can grab using SIMD optimized
// functions to improve performance.
alignedBytes := bitutil.BytesForBits(int64(batch))
utils.BytesToBools(dec.data[:alignedBytes], out)
dec.data = dec.data[alignedBytes:]
out = out[alignedBytes*8:]
}

// grab any trailing bits now that we've got our aligned bytes.
dec.bitOffset += unalignedExtract(dec.bitOffset, bitsRemain-batch, dec.bitOffset)

dec.nvals -= max
return max, nil
}

// DecodeSpaced is like Decode except it expands the values to leave spaces for null
// as determined by the validBits bitmap.
func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
if nullCount > 0 {
toRead := len(out) - nullCount
valuesRead, err := dec.Decode(out[:toRead])
if err != nil {
return 0, err
}
if valuesRead != toRead {
return valuesRead, xerrors.New("parquet: boolean decoder: number of values / definition levels read did not match")
}
return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
return dec.Decode(out)
}
84 changes: 84 additions & 0 deletions go/parquet/internal/encoding/boolean_encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoding

import (
"github.com/apache/arrow/go/arrow/bitutil"
"github.com/apache/arrow/go/parquet"
"github.com/apache/arrow/go/parquet/internal/utils"
)

const (
boolBufSize = 1024
boolsInBuf = boolBufSize * 8
)

// PlainBooleanEncoder encodes bools as a bitmap as per the Plain Encoding
type PlainBooleanEncoder struct {
encoder
bitsBuffer []byte
wr utils.BitmapWriter
}

// Type for the PlainBooleanEncoder is parquet.Types.Boolean
func (PlainBooleanEncoder) Type() parquet.Type {
return parquet.Types.Boolean
}

// Put encodes the contents of in into the underlying data buffer.
func (enc *PlainBooleanEncoder) Put(in []bool) {
if enc.bitsBuffer == nil {
enc.bitsBuffer = make([]byte, boolBufSize)
}
if enc.wr == nil {
enc.wr = utils.NewBitmapWriter(enc.bitsBuffer, 0, boolsInBuf)
}

n := enc.wr.AppendBools(in)
for n < len(in) {
enc.wr.Finish()
enc.append(enc.bitsBuffer)
enc.wr.Reset(0, boolsInBuf)
in = in[n:]
n = enc.wr.AppendBools(in)
}
}

// PutSpaced will use the validBits bitmap to determine which values are nulls
// and can be left out from the slice, and the encoded without those nulls.
func (enc *PlainBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) {
bufferOut := make([]bool, len(in))
nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
enc.Put(bufferOut[:nvalid])
}

// EstimatedDataEncodedSize returns the current number of bytes that have
// been buffered so far
func (enc *PlainBooleanEncoder) EstimatedDataEncodedSize() int64 {
return int64(enc.sink.Len() + int(bitutil.BytesForBits(enc.wr.Pos())))
}

// FlushValues returns the buffered data, the responsibility is on the caller
// to release the buffer memory
func (enc *PlainBooleanEncoder) FlushValues() Buffer {
if enc.wr.Pos() > 0 {
toFlush := int(enc.wr.Pos())
enc.append(enc.bitsBuffer[:bitutil.BytesForBits(int64(toFlush))])
}

return enc.sink.Finish()
}
88 changes: 88 additions & 0 deletions go/parquet/internal/encoding/byte_array_decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encoding

import (
"encoding/binary"

"github.com/apache/arrow/go/parquet"
"github.com/apache/arrow/go/parquet/internal/utils"
"golang.org/x/xerrors"
)

// PlainByteArrayDecoder decodes a data chunk for bytearrays according to
// the plain encoding. The byte arrays will use slices to reference the
// data rather than copying it.
//
// The parquet spec defines Plain encoding for ByteArrays as a 4 byte little
// endian integer containing the length of the bytearray followed by that many
// bytes being the raw data of the byte array.
type PlainByteArrayDecoder struct {
decoder
}

// Type returns parquet.Types.ByteArray for this decoder
func (PlainByteArrayDecoder) Type() parquet.Type {
return parquet.Types.ByteArray
}

// Decode will populate the slice of bytearrays in full or until the number
// of values is consumed.
//
// Returns the number of values that were decoded.
func (pbad *PlainByteArrayDecoder) Decode(out []parquet.ByteArray) (int, error) {
max := utils.MinInt(len(out), pbad.nvals)

for i := 0; i < max; i++ {
// there should always be at least four bytes which is the length of the
// next value in the data.
if len(pbad.data) < 4 {
return i, xerrors.New("parquet: eof reading bytearray")
}

// the first 4 bytes are a little endian int32 length
byteLen := int32(binary.LittleEndian.Uint32(pbad.data[:4]))
if byteLen < 0 {
return i, xerrors.New("parquet: invalid BYTE_ARRAY value")
}

if int64(len(pbad.data)) < int64(byteLen)+4 {
return i, xerrors.New("parquet: eof reading bytearray")
}

out[i] = pbad.data[4 : byteLen+4 : byteLen+4]
pbad.data = pbad.data[byteLen+4:]
}

pbad.nvals -= max
return max, nil
}

// DecodeSpaced is like Decode, but expands the slice out to leave empty values
// where the validBits bitmap has 0s
func (pbad *PlainByteArrayDecoder) DecodeSpaced(out []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more thought, I'm not sure how you've organized the remaining code, but spacing values might more naturally fit with Arrow (not critical, and might not make sense)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the most part all of the code that interacts with Arrow arrays is isolated to a single package that i haven't put up yet, which utilizes decoders by being able to call the DecodeSpaced functions in order to easily populate arrow array bytes.

toRead := len(out) - nullCount
valuesRead, err := pbad.Decode(out[:toRead])
if err != nil {
return valuesRead, err
}
if valuesRead != toRead {
return valuesRead, xerrors.New("parquet: number of values / definition levels read did not match")
}

return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
Loading