Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -519,4 +519,23 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>jdk9plus</id>
<activation>
<!-- activated when building with JDK9+ -->
<jdk>!1.8</jdk>
</activation>
<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
146 changes: 64 additions & 82 deletions src/main/java/org/tikv/common/columnar/TiBlockColumnVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

package org.tikv.common.columnar;

import static org.tikv.common.util.MemoryUtil.EMPTY_BYTE_BUFFER_DIRECT;
import static org.tikv.common.util.MemoryUtil.EMPTY_BYTE_BUFFER;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Objects;
import org.joda.time.LocalDate;
import org.tikv.common.columnar.datatypes.CHType;
import org.tikv.common.types.AbstractDateTimeType;
Expand All @@ -29,36 +32,34 @@
import org.tikv.common.util.MemoryUtil;

public class TiBlockColumnVector extends TiColumnVector {
long offsetsAddr;
ByteBuffer offsets;
long nullMapAddr;
ByteBuffer nullMap;
long dataAddr;
ByteBuffer data;
private int fixedLength;
private final ByteBuffer offsets;
private final ByteBuffer nullMap;
private final ByteBuffer data;
private final int fixedLength;

public TiBlockColumnVector(CHType type, ByteBuffer data, int numOfRows, int fixedLength) {
super(type.toDataType(), numOfRows);
this.data = data;
this.dataAddr = MemoryUtil.getAddress(data);
fillEmptyNullMap();
fillEmptyOffsets();
this.data = Objects.requireNonNull(data);
this.nullMap = null;
this.offsets = null;
this.fixedLength = fixedLength;
}

public TiBlockColumnVector(CHType type) {
super(type.toDataType(), 0);
this.data = EMPTY_BYTE_BUFFER;
this.nullMap = null;
this.offsets = null;
this.fixedLength = -1;
}

public TiBlockColumnVector(
CHType type, ByteBuffer nullMap, ByteBuffer data, int numOfRows, int fixedLength) {
// chType -> data type
super(type.toDataType(), numOfRows);
this.nullMap = nullMap;
this.nullMapAddr = MemoryUtil.getAddress(nullMap);
this.data = data;
this.dataAddr = MemoryUtil.getAddress(data);
fillEmptyOffsets();
this.nullMap = Objects.requireNonNull(nullMap);
this.data = Objects.requireNonNull(data);
this.offsets = null;
this.fixedLength = fixedLength;
}

Expand All @@ -67,68 +68,50 @@ public TiBlockColumnVector(
CHType type, ByteBuffer nullMap, ByteBuffer offsets, ByteBuffer data, int numOfRows) {
// chType -> data type
super(type.toDataType(), numOfRows);
this.offsets = offsets;
this.offsetsAddr = MemoryUtil.getAddress(offsets);
this.nullMap = nullMap;
this.nullMapAddr = MemoryUtil.getAddress(nullMap);
this.data = data;
this.dataAddr = MemoryUtil.getAddress(data);
this.offsets = Objects.requireNonNull(offsets);
this.nullMap = nullMap; // may be null
this.data = Objects.requireNonNull(data);
this.fixedLength = -1;
}

private void fillEmptyNullMap() {
this.nullMap = EMPTY_BYTE_BUFFER_DIRECT;
this.nullMapAddr = MemoryUtil.getAddress(this.nullMap);
}

private void fillEmptyOffsets() {
this.offsets = EMPTY_BYTE_BUFFER_DIRECT;
this.offsetsAddr = MemoryUtil.getAddress(this.offsets);
}

/**
* Cleans up memory for this column vector. The column vector is not usable after this.
*
* <p>This overwrites `AutoCloseable.close` to remove the `throws` clause, as column vector is
* in-memory and we don't expect any exception to happen during closing.
*/
@Override
public void close() {
if (dataAddr != 0) {
MemoryUtil.free(data);
}

if (offsetsAddr != 0) {
MemoryUtil.free(offsets);
}

if (nullMapAddr != 0) {
MemoryUtil.free(nullMap);
}
dataAddr = 0;
offsetsAddr = 0;
nullMapAddr = 0;
}
public void close() {}

/** Returns true if this column vector contains any null values. */
@Override
public boolean hasNull() {
return nullMap == null;
if (nullMap != null) {
byte[] array = nullMap.array();
for (byte b : array) {
if (b != 0) return true;
}
}
return false;
}

/** Returns the number of nulls in this column vector. */
@Override
public int numNulls() {
throw new UnsupportedOperationException("numNulls is not supported for TiBlockColumnVector");
int n = 0;
if (nullMap != null) {
byte[] array = nullMap.array();
for (byte b : array) {
if (b != 0) n++;
}
}
return n;
}

/** Returns whether the value at rowId is NULL. */
@Override
public boolean isNullAt(int rowId) {
if (nullMap == EMPTY_BYTE_BUFFER_DIRECT) {
return false;
}
return MemoryUtil.getByte(nullMapAddr + rowId) != 0;
return nullMap != null && nullMap.get(rowId) != 0;
}

/**
Expand All @@ -146,7 +129,7 @@ public boolean getBoolean(int rowId) {
*/
@Override
public byte getByte(int rowId) {
return MemoryUtil.getByte(dataAddr + rowId);
return data.get(rowId);
}

/**
Expand All @@ -155,7 +138,7 @@ public byte getByte(int rowId) {
*/
@Override
public short getShort(int rowId) {
return MemoryUtil.getShort(dataAddr + (rowId << 1));
return data.getShort(rowId << 1);
}

/**
Expand All @@ -167,11 +150,12 @@ public int getInt(int rowId) {
if (type instanceof DateType) {
return (int) getTime(rowId);
}
return MemoryUtil.getInt(dataAddr + (rowId << 2));
return data.getInt(rowId << 2);
}

// returns microseconds since epoch - fields are interpreted in current default timezone
private long getDateTime(int rowId) {
long v = MemoryUtil.getLong(dataAddr + (rowId << 3));
long v = data.getLong(rowId << 3);
long ymdhms = v >>> 24;
long ymd = ymdhms >>> 17;
int day = (int) (ymd & ((1 << 5) - 1));
Expand All @@ -184,13 +168,15 @@ private long getDateTime(int rowId) {
int minute = (hms >>> 6) & ((1 << 6) - 1);
int hour = hms >>> 12;
int microsec = (int) (v % (1 << 24));
Timestamp ts =
new Timestamp(year - 1900, month - 1, day, hour, minute, second, microsec * 1000);
return ts.getTime() / 1000 * 1000000 + ts.getNanos() / 1000;
ZonedDateTime zdt =
ZonedDateTime.of(
year, month, day, hour, minute, second, microsec * 1000, ZoneId.systemDefault());
Instant instant = zdt.toInstant();
return instant.getEpochSecond() * 1000_000L + instant.getNano() / 1000;
}

private long getTime(int rowId) {
long v = MemoryUtil.getLong(dataAddr + (rowId << 3));
long v = data.getLong(rowId << 3);
long ymd = v >>> 41;
long ym = ymd >>> 5;
int year = (int) (ym / 13);
Expand All @@ -215,7 +201,7 @@ public long getLong(int rowId) {
} else if (fixedLength == 4) {
return getInt(rowId);
} else if (fixedLength == 8) {
return MemoryUtil.getLong(dataAddr + (rowId * fixedLength));
return data.getLong(rowId << 3);
}
throw new UnsupportedOperationException(
String.format("getting long with fixed length %d", fixedLength));
Expand All @@ -227,7 +213,7 @@ public long getLong(int rowId) {
*/
@Override
public float getFloat(int rowId) {
return MemoryUtil.getFloat(dataAddr + (rowId * fixedLength));
return data.getFloat(rowId * fixedLength);
}

/**
Expand All @@ -236,36 +222,32 @@ public float getFloat(int rowId) {
*/
@Override
public double getDouble(int rowId) {
return MemoryUtil.getDouble(dataAddr + (rowId * fixedLength));
return data.getDouble(rowId * fixedLength);
}

/**
* Returns the decimal type value for rowId. If the slot for rowId is null, it should return null.
*/
@Override
public BigDecimal getDecimal(int rowId, int precision, int scale) {
long rowIdAddr = rowId * fixedLength + dataAddr;
if (fixedLength == 4) {
return MemoryUtil.getDecimal32(rowIdAddr, scale);
return MemoryUtil.getDecimal32(data, rowId << 2, scale);
} else if (fixedLength == 8) {
return MemoryUtil.getDecimal64(rowIdAddr, scale);
return MemoryUtil.getDecimal64(data, rowId << 3, scale);
} else if (fixedLength == 16) {
return MemoryUtil.getDecimal128(rowIdAddr, scale);
return MemoryUtil.getDecimal128(data, rowId << 4, scale);
} else {
return MemoryUtil.getDecimal256(rowIdAddr, scale);
return MemoryUtil.getDecimal256(data, rowId * fixedLength, scale);
}
}

private long offsetAt(int i) {
return i == 0 ? 0 : MemoryUtil.getLong(offsetsAddr + ((i - 1) << 3));
return i == 0 ? 0L : offsets.getLong((i - 1) << 3);
}

public int sizeAt(int i) {
return (int)
(i == 0
? MemoryUtil.getLong(offsetsAddr)
: MemoryUtil.getLong(offsetsAddr + (i << 3))
- MemoryUtil.getLong(offsetsAddr + ((i - 1) << 3)));
(i == 0 ? offsets.getLong(0) : offsets.getLong(i << 3) - offsets.getLong((i - 1) << 3));
}

/**
Expand All @@ -278,13 +260,13 @@ public String getUTF8String(int rowId) {
// FixedString case
if (fixedLength != -1) {
byte[] chars = new byte[fixedLength];
MemoryUtil.getBytes((int) (dataAddr + fixedLength * rowId), chars, 0, fixedLength);
data.get(chars, rowId * fixedLength, fixedLength);
return new String(chars);
} else {
long offset = (dataAddr + offsetAt(rowId));
int offset = (int) offsetAt(rowId);
int numBytes = sizeAt(rowId) - 1;
byte[] chars = new byte[numBytes];
MemoryUtil.getBytes(offset, chars, 0, numBytes);
data.get(chars, offset, numBytes);
return new String(chars, StandardCharsets.UTF_8);
}
}
Expand All @@ -295,10 +277,10 @@ public String getUTF8String(int rowId) {
@Override
public byte[] getBinary(int rowId) {
if (type.equals(BytesType.BLOB) || type.equals(BytesType.TINY_BLOB)) {
long offset = (dataAddr + offsetAt(rowId));
int offset = (int) offsetAt(rowId);
int numBytes = sizeAt(rowId) - 1;
byte[] ret = new byte[numBytes];
MemoryUtil.getBytes(offset, ret, 0, numBytes);
data.get(ret, offset, numBytes);
return ret;
} else {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import org.tikv.common.util.MemoryUtil;

public class AutoGrowByteBuffer {
private final ByteBuffer initBuf;
private ByteBuffer buf;

public AutoGrowByteBuffer(ByteBuffer initBuf) {
initBuf.clear();
this.initBuf = initBuf;
this.buf = initBuf;
}

Expand All @@ -39,22 +37,12 @@ public ByteBuffer getByteBuffer() {

private void beforeIncrease(int inc) {
int minCap = buf.position() + inc;
if (minCap > buf.capacity()) {
int newCap = buf.capacity();
int newCap = buf.capacity();
if (minCap > newCap) {
do {
newCap = newCap << 1;
} while (minCap > newCap);

ByteBuffer newBuf = MemoryUtil.allocateDirect(newCap);
MemoryUtil.copyMemory(
MemoryUtil.getAddress(buf), MemoryUtil.getAddress(newBuf), buf.position());
newBuf.position(buf.position());

if (buf != initBuf) {
MemoryUtil.free(buf);
}

buf = newBuf;
buf = MemoryUtil.copyOf(buf, newCap);
}
}

Expand Down
Loading