Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
@Override
public RecordReader<Void, RowDataContainer> getRecordReader(
InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
FileStoreTable table = createFileStoreTable(jobConf);
PaimonInputSplit split = (PaimonInputSplit) inputSplit;
return createRecordReader(table, split, jobConf);
return createRecordReader(split, jobConf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputSerializer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.utils.InstantiationUtil;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
Expand All @@ -41,12 +43,15 @@ public class PaimonInputSplit extends FileSplit {
private String path;
private DataSplit split;

private FileStoreTable table;

// public no-argument constructor for deserialization
public PaimonInputSplit() {}

public PaimonInputSplit(String path, DataSplit split) {
public PaimonInputSplit(String path, DataSplit split, FileStoreTable table) {
this.path = path;
this.split = split;
this.table = table;
}

public DataSplit split() {
Expand All @@ -73,13 +78,28 @@ public String[] getLocations() {
return ANYWHERE;
}

public FileStoreTable getTable() {
return table;
}

@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(path);
DataOutputSerializer out = new DataOutputSerializer(128);
split.serialize(out);
dataOutput.writeInt(out.length());
dataOutput.write(out.getCopyOfBuffer());
writeFileStoreTable(dataOutput);
}

private void writeFileStoreTable(DataOutput dataOutput) throws IOException {
if (table == null) {
dataOutput.writeInt(0);
} else {
byte[] bytes = InstantiationUtil.serializeObject(table);
dataOutput.writeInt(bytes.length);
dataOutput.write(bytes);
}
}

@Override
Expand All @@ -89,6 +109,22 @@ public void readFields(DataInput dataInput) throws IOException {
byte[] bytes = new byte[length];
dataInput.readFully(bytes);
split = DataSplit.deserialize(new DataInputDeserializer(bytes));
readFileStoreTable(dataInput);
}

private void readFileStoreTable(DataInput dataInput) throws IOException {
int length = dataInput.readInt();
if (length > 0) {
byte[] bytes = new byte[length];
dataInput.readFully(bytes);
try {
table =
InstantiationUtil.deserializeObject(
bytes, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

@Override
Expand All @@ -105,11 +141,13 @@ public boolean equals(Object o) {
return false;
}
PaimonInputSplit that = (PaimonInputSplit) o;
return Objects.equals(path, that.path) && Objects.equals(split, that.split);
return Objects.equals(path, that.path)
&& Objects.equals(split, that.split)
&& Objects.equals(table, that.table);
}

@Override
public int hashCode() {
return Objects.hash(path, split);
return Objects.hash(path, split, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public float getProgress() throws IOException {
}

public static RecordReader<Void, RowDataContainer> createRecordReader(
FileStoreTable table, PaimonInputSplit split, JobConf jobConf) throws IOException {
PaimonInputSplit split, JobConf jobConf) throws IOException {
FileStoreTable table = split.getTable();
ReadBuilder readBuilder = table.newReadBuilder();
createPredicate(table.schema(), jobConf, true).ifPresent(readBuilder::withFilter);
List<String> paimonColumns = table.schema().fieldNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ public static InputSplit[] generateSplits(FileStoreTable table, JobConf jobConf)
scan.plan()
.splits()
.forEach(
split -> splits.add(new PaimonInputSplit(location, (DataSplit) split)));
split ->
splits.add(
new PaimonInputSplit(
location, (DataSplit) split, table)));
}
return splits.toArray(new InputSplit[0]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@
package org.apache.paimon.hive.mapred;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileTestDataGenerator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -29,8 +41,11 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -62,8 +77,12 @@ public void testWriteAndRead() throws Exception {
.map(d -> d.meta)
.collect(Collectors.toList()))
.build();
PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit);
PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit, null);

assertPaimonInputSplitSerialization(split);
}

private void assertPaimonInputSplitSerialization(PaimonInputSplit split) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
split.write(output);
Expand All @@ -75,4 +94,37 @@ public void testWriteAndRead() throws Exception {
actual.readFields(input);
assertThat(actual).isEqualTo(split);
}

@Test
public void testWriteAndReadWithTable() throws Exception {
Path path = new Path(tempDir.toString());
SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
schemaManager.createTable(
new Schema(
RowType.of(VarCharType.STRING_TYPE).getFields(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyMap(),
""));

FileStoreTable fileStoreTable = FileStoreTableFactory.create(LocalFileIO.create(), path);
writeData(fileStoreTable);

DataSplit split = (DataSplit) fileStoreTable.newScan().plan().splits().get(0);

PaimonInputSplit paimonInputSplit =
new PaimonInputSplit(path.toString(), split, fileStoreTable);

assertPaimonInputSplitSerialization(paimonInputSplit);
}

private void writeData(FileStoreTable fileStoreTable) throws Exception {
String commitUser = UUID.randomUUID().toString();
TableWriteImpl<?> tableWrite = fileStoreTable.newWrite(commitUser);
tableWrite.write(GenericRow.of(BinaryString.fromString("1111")));
TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
commit.commit(0, tableWrite.prepareCommit(true, 0));
tableWrite.close();
commit.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private PaimonRecordReader read(
List<String> originalColumns = ((FileStoreTable) table).schema().fieldNames();
return new PaimonRecordReader(
table.newReadBuilder(),
new PaimonInputSplit(tempDir.toString(), dataSplit),
new PaimonInputSplit(tempDir.toString(), dataSplit, (FileStoreTable) table),
originalColumns,
originalColumns,
selectedColumns,
Expand Down