Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ public void initialize() throws IOException {
if (footer.getRecordBatches().size() == 0) {
return;
}
// Read and load all dictionaries from schema
for (int i = 0; i < dictionaries.size(); i++) {
ArrowDictionaryBatch dictionaryBatch = readDictionary();
loadDictionary(dictionaryBatch);
}
}

/**
Expand Down Expand Up @@ -164,6 +159,13 @@ public boolean loadNextBatch() throws IOException {
ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
ArrowRecordBatch batch = readRecordBatch(in, block, allocator);
loadRecordBatch(batch);

// Read and load all dictionaries from schema
for (int i = 0; i < dictionaries.size(); i++) {
ArrowDictionaryBatch dictionaryBatch = readDictionary();
loadDictionary(dictionaryBatch);
}

return true;
} else {
return false;
Expand All @@ -185,14 +187,16 @@ public List<ArrowBlock> getRecordBlocks() throws IOException {
}

/**
* Loads record batch for the given block.
* Loads record batch and dictionaries for the given block.
*/
public boolean loadRecordBatch(ArrowBlock block) throws IOException {
ensureInitialized();
// Locate the requested block in the footer's record-batch list; -1 means it is not listed there.
int blockIndex = footer.getRecordBatches().indexOf(block);
if (blockIndex == -1) {
throw new IllegalArgumentException("Arrow block does not exist in record batches: " + block);
}

// Reposition the dictionary cursor to match the target record batch.
// NOTE(review): this presumes each record batch is associated with exactly dictionaries.size()
// dictionary batches, laid out in record-batch order -- confirm against the writer.
currentDictionaryBatch = blockIndex * dictionaries.size();
// Point the record-batch cursor at the requested block; loadNextBatch() reads the block at
// currentRecordBatch and then advances the cursor.
currentRecordBatch = blockIndex;
return loadNextBatch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class ArrowFileWriter extends ArrowWriter {
private final List<ArrowBlock> recordBlocks = new ArrayList<>();

private Map<String, String> metaData;
private boolean dictionariesWritten = false;

public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
super(root, provider, out);
Expand Down Expand Up @@ -129,12 +128,6 @@ protected void endInternal(WriteChannel out) throws IOException {
@Override
protected void ensureDictionariesWritten(DictionaryProvider provider, Set<Long> dictionaryIdsUsed)
throws IOException {
if (dictionariesWritten) {
return;
}
dictionariesWritten = true;
// Write out all dictionaries required.
// Replacement dictionaries are not supported in the IPC file format.
Comment on lines -132 to -137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my clarification:

Referring to the Apache Arrow Columnar specification:

Further more, it is invalid to have more than one non-delta dictionary batch per dictionary ID (i.e. dictionary replacement is not supported).

Here, the part of the code that enforced this restriction has been removed. Does this change that behavior? Does it need a documentation update?

for (long id : dictionaryIdsUsed) {
Dictionary dictionary = provider.lookup(id);
writeDictionaryBatch(dictionary);
Expand Down
Loading