@@ -169,12 +169,12 @@ public SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> restoreEnumer

   @Override
   public SimpleVersionedSerializer<IcebergSourceSplit> getSplitSerializer() {
-    return IcebergSourceSplitSerializer.INSTANCE;
+    return new IcebergSourceSplitSerializer(scanContext.caseSensitive());
   }

   @Override
   public SimpleVersionedSerializer<IcebergEnumeratorState> getEnumeratorCheckpointSerializer() {
-    return IcebergEnumeratorStateSerializer.INSTANCE;
+    return new IcebergEnumeratorStateSerializer(scanContext.caseSensitive());
   }

   private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
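
Editor's note on the change above (a sketch, not part of the PR): the serializers stop being singletons because they now carry the scan's case sensitivity. The v1-to-v2 migration is safe because Flink records the serializer version next to each serialized split or checkpoint and hands it back on restore. A minimal, self-contained illustration of that contract, using Flink's public SimpleVersionedSerialization helpers (the class name is mine):

import java.io.IOException;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class VersionedRoundTripSketch {
  // Round-trips a value through any SimpleVersionedSerializer. The write side
  // prefixes the payload with serializer.getVersion(); the read side reads that
  // prefix back and passes it into serializer.deserialize(version, payload).
  // This is why a VERSION = 2 serializer can still restore bytes written by a
  // job that ran at VERSION = 1.
  static <T> T roundTrip(SimpleVersionedSerializer<T> serializer, T value) throws IOException {
    byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, value);
    return SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);
  }
}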
@@ -35,18 +35,18 @@
 public class IcebergEnumeratorStateSerializer
     implements SimpleVersionedSerializer<IcebergEnumeratorState> {

-  public static final IcebergEnumeratorStateSerializer INSTANCE =
-      new IcebergEnumeratorStateSerializer();
-
   private static final int VERSION = 2;

   private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
       ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));

   private final IcebergEnumeratorPositionSerializer positionSerializer =
       IcebergEnumeratorPositionSerializer.INSTANCE;
-  private final IcebergSourceSplitSerializer splitSerializer =
-      IcebergSourceSplitSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer splitSerializer;
+
+  public IcebergEnumeratorStateSerializer(boolean caseSensitive) {
+    this.splitSerializer = new IcebergSourceSplitSerializer(caseSensitive);
+  }

   @Override
   public int getVersion() {
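
Design note (editorial, hedged): the enumerator state embeds serialized splits, so IcebergEnumeratorStateSerializer forwards the caseSensitive flag into a nested IcebergSourceSplitSerializer rather than taking a serializer instance. Presumably the flag is needed because FileScanTaskParser.fromJson rebinds the task's residual filter expression against the table schema, and that binding depends on case sensitivity; the JSON payload itself does not carry the flag.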
@@ -21,19 +21,28 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.List;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.FileScanTaskParser;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;

 @Internal
 public class IcebergSourceSplit implements SourceSplit, Serializable {
   private static final long serialVersionUID = 1L;
+  private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+      ThreadLocal.withInitial(() -> new DataOutputSerializer(1024));

   private final CombinedScanTask task;

@@ -109,6 +118,7 @@ byte[] serializeV1() throws IOException {
     if (serializedBytesCache == null) {
       serializedBytesCache = InstantiationUtil.serializeObject(this);
     }
+
     return serializedBytesCache;
   }

@@ -120,4 +130,48 @@ static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
       throw new RuntimeException("Failed to deserialize the split.", e);
     }
   }
+
+  byte[] serializeV2() throws IOException {
+    if (serializedBytesCache == null) {
+      DataOutputSerializer out = SERIALIZER_CACHE.get();
+      Collection<FileScanTask> fileScanTasks = task.tasks();
+      Preconditions.checkArgument(
+          fileOffset >= 0 && fileOffset < fileScanTasks.size(),
+          "Invalid file offset: %s. Should be within the range of [0, %s)",
+          fileOffset,
+          fileScanTasks.size());
+
+      out.writeInt(fileOffset);
+      out.writeLong(recordOffset);
+      out.writeInt(fileScanTasks.size());
+
+      for (FileScanTask fileScanTask : fileScanTasks) {
+        String taskJson = FileScanTaskParser.toJson(fileScanTask);
+        out.writeUTF(taskJson);
+      }
+
+      serializedBytesCache = out.getCopyOfBuffer();
+      out.clear();
+    }
+
+    return serializedBytesCache;
+  }
+
+  static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
+      throws IOException {
+    DataInputDeserializer in = new DataInputDeserializer(serialized);
+    int fileOffset = in.readInt();
+    long recordOffset = in.readLong();
+    int taskCount = in.readInt();
+
+    List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
+    for (int i = 0; i < taskCount; ++i) {
+      String taskJson = in.readUTF();
+      FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
+      tasks.add(task);
+    }
+
+    CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks);
+    return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset);
+  }
 }
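
Editor's note: the V2 format above is a fixed header (file offset, record offset, task count) followed by one FileScanTaskParser JSON string per task. A hedged, standalone sketch of just the framing, using the same Flink memory utilities; the JSON literal and class name are stand-ins of mine, not real FileScanTaskParser output:

import java.io.IOException;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SplitWireFormatSketch {
  public static void main(String[] args) throws IOException {
    DataOutputSerializer out = new DataOutputSerializer(64);
    out.writeInt(0);                     // fileOffset: resume in the first file
    out.writeLong(42L);                  // recordOffset: resume at record 42
    out.writeInt(1);                     // taskCount
    out.writeUTF("{\"data-file\": ...}"); // placeholder for FileScanTaskParser.toJson(task)

    DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
    System.out.println(in.readInt());    // 0
    System.out.println(in.readLong());   // 42
    System.out.println(in.readInt());    // 1
    System.out.println(in.readUTF());    // the task JSON placeholder
  }
}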
@@ -22,14 +22,15 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.io.SimpleVersionedSerializer;

-/**
- * TODO: use Java serialization for now. Will switch to more stable serializer from <a
- * href="https://github.com/apache/iceberg/issues/1698">issue-1698</a>.
- */
 @Internal
 public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
-  public static final IcebergSourceSplitSerializer INSTANCE = new IcebergSourceSplitSerializer();
-  private static final int VERSION = 1;
+  private static final int VERSION = 2;
+
+  private final boolean caseSensitive;
+
+  public IcebergSourceSplitSerializer(boolean caseSensitive) {
+    this.caseSensitive = caseSensitive;
+  }

   @Override
   public int getVersion() {
@@ -38,14 +39,16 @@ public int getVersion() {

   @Override
   public byte[] serialize(IcebergSourceSplit split) throws IOException {
-    return split.serializeV1();
+    return split.serializeV2();
   }

   @Override
   public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOException {
     switch (version) {
       case 1:
         return IcebergSourceSplit.deserializeV1(serialized);
+      case 2:
+        return IcebergSourceSplit.deserializeV2(serialized, caseSensitive);
       default:
         throw new IOException(
             String.format(
@@ -40,7 +40,7 @@ public class TestIcebergEnumeratorStateSerializer {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

   private final IcebergEnumeratorStateSerializer serializer =
-      IcebergEnumeratorStateSerializer.INSTANCE;
+      new IcebergEnumeratorStateSerializer(true);

   protected final int version;
@@ -21,7 +21,9 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.flink.source.SplitHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -31,7 +33,7 @@ public class TestIcebergSourceSplitSerializer {

   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

-  private final IcebergSourceSplitSerializer serializer = IcebergSourceSplitSerializer.INSTANCE;
+  private final IcebergSourceSplitSerializer serializer = new IcebergSourceSplitSerializer(true);

   @Test
   public void testLatestVersion() throws Exception {
@@ -81,6 +83,34 @@ private void serializeAndDeserializeV1(int splitCount, int filesPerSplit) throws
     }
   }

+  @Test
+  public void testV2() throws Exception {
+    serializeAndDeserializeV2(1, 1);
+    serializeAndDeserializeV2(10, 2);
+  }
+
+  private void serializeAndDeserializeV2(int splitCount, int filesPerSplit) throws Exception {
+    final List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(
+            TEMPORARY_FOLDER, splitCount, filesPerSplit);
+    for (IcebergSourceSplit split : splits) {
+      byte[] result = split.serializeV2();
+      IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV2(result, true);
+      assertSplitEquals(split, deserialized);
+    }
+  }
+
+  @Test
+  public void testDeserializeV1() throws Exception {
+    final List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);
+    for (IcebergSourceSplit split : splits) {
+      byte[] result = split.serializeV1();
+      IcebergSourceSplit deserialized = serializer.deserialize(1, result);
+      assertSplitEquals(split, deserialized);
+    }
+  }
+
   @Test
   public void testCheckpointedPosition() throws Exception {
     final AtomicInteger index = new AtomicInteger();
@@ -90,9 +120,7 @@ public void testCheckpointedPosition() throws Exception {
         split -> {
           IcebergSourceSplit result;
           if (index.get() % 2 == 0) {
-            result =
-                IcebergSourceSplit.fromCombinedScanTask(
-                    split.task(), index.get(), index.get());
+            result = IcebergSourceSplit.fromCombinedScanTask(split.task(), 1, 1);
           } else {
             result = split;
           }
@@ -115,7 +143,19 @@ public void testCheckpointedPosition() throws Exception {
   }

   private void assertSplitEquals(IcebergSourceSplit expected, IcebergSourceSplit actual) {
-    Assert.assertEquals(expected.splitId(), actual.splitId());
+    List<FileScanTask> expectedTasks = Lists.newArrayList(expected.task().tasks().iterator());
+    List<FileScanTask> actualTasks = Lists.newArrayList(actual.task().tasks().iterator());
+    Assert.assertEquals(expectedTasks.size(), actualTasks.size());
+    for (int i = 0; i < expectedTasks.size(); ++i) {
+      FileScanTask expectedTask = expectedTasks.get(i);
+      FileScanTask actualTask = actualTasks.get(i);
+      Assert.assertEquals(expectedTask.file().path(), actualTask.file().path());
+      Assert.assertEquals(expectedTask.sizeBytes(), actualTask.sizeBytes());
+      Assert.assertEquals(expectedTask.filesCount(), actualTask.filesCount());
+      Assert.assertEquals(expectedTask.start(), actualTask.start());
+      Assert.assertEquals(expectedTask.length(), actualTask.length());
+    }
+
     Assert.assertEquals(expected.fileOffset(), actual.fileOffset());
     Assert.assertEquals(expected.recordOffset(), actual.recordOffset());
   }
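
Editor's note on the two test changes above: testCheckpointedPosition now pins the checkpointed position to (1, 1) because serializeV2 validates fileOffset against the split's task count, so the old index-derived offsets would trip the Preconditions guard on splits with only one or two files. And assertSplitEquals compares per-task fields instead of splitId, presumably because V2 deserialization rebuilds the FileScanTasks from JSON rather than restoring the original Java objects, so the test checks the fields the format is expected to preserve. A standalone sketch of the guard's failure mode (illustrative values and class name are mine):

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class OffsetGuardSketch {
  public static void main(String[] args) {
    int fileOffset = 3; // e.g. an index-derived offset, as in the old test
    int taskCount = 2;  // a split that combines only two file scan tasks
    // Mirrors the guard in IcebergSourceSplit.serializeV2; throws
    // IllegalArgumentException here because 3 is outside [0, 2).
    Preconditions.checkArgument(
        fileOffset >= 0 && fileOffset < taskCount,
        "Invalid file offset: %s. Should be within the range of [0, %s)",
        fileOffset,
        taskCount);
  }
}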