Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/main/java/com/increff/commons/sheet/AbstractDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.*;
import java.util.*;
import java.util.function.Function;

public abstract class AbstractDataFile<T> implements IDataFile<T> {

Expand All @@ -30,7 +31,7 @@ public abstract class AbstractDataFile<T> implements IDataFile<T> {
private ArrayList<T> data;

/**
* @params This function takes a string 's' as a delimeter.
* @param s This function takes a string 's' as a delimeter.
* By default it is '\t'
* To use any different delimeter, please override this function
*/
Expand Down Expand Up @@ -155,6 +156,14 @@ public void setProgressMonitor(IProgressMonitor progress) {
protected abstract void write(DataRow rec, T t);

protected abstract String[] getHeaders();
@Override
public String[] getPartitioningColumns(){
return new String[0];
}
@Override
public <K> Function<T, K> getPartitioningFunction(){
return null;
}

protected static void validateHeaders(Set<String> headers, Set<String> expectedHeaders) throws SheetException {
Set<String> missingHeaders = new HashSet<>(expectedHeaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.*;
import java.util.function.Function;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -175,7 +176,14 @@ public void setProgressMonitor(IProgressMonitor progress) {
protected abstract T read(GenericRecord rec) throws Exception;
protected abstract void write(GenericRecord rec, T t);
protected abstract Type[] getSchema();

@Override
public String[] getPartitioningColumns(){
return new String[0];
}
@Override
public <K> Function<T, K> getPartitioningFunction(){
return null;
};
protected static void validateHeaders(Schema incomingSchema, Schema expectedSchema) throws SheetException {

Map<String, Schema> fieldInfo1 = extractFieldInfo(incomingSchema);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/increff/commons/sheet/IDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.function.Function;

public interface IDataFile<T> {
public String getFileExtension();
Expand All @@ -29,6 +30,10 @@ public interface IDataFile<T> {

public void read(InputStream is) throws Exception;

public String[] getPartitioningColumns();

public <K> Function<T, K> getPartitioningFunction();

public void setProgressMonitor(IProgressMonitor progress);

}
11 changes: 11 additions & 0 deletions src/main/java/com/increff/commons/sheet/TsvFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.Arrays;
import java.util.Set;
import java.util.function.Function;

/**
* Implementation of the AbstractDataFile to handle .tsv files
Expand All @@ -21,6 +22,16 @@ public class TsvFile extends AbstractDataFile<DataRow> {

private String[] headers = {};

@Override
public String[] getPartitioningColumns() {
return new String[0];
}

@Override
public Function<DataRow, String> getPartitioningFunction() {
return (DataRow r) -> "";
}

@Override
protected DataRow read(DataRow rec) {
DataRow dataRow = new DataRow(rec.getColumns());
Expand Down
12 changes: 12 additions & 0 deletions src/test/java/com/increff/commons/sheet/ChannelStockFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package com.increff.commons.sheet;

import java.util.function.Function;

public class ChannelStockFile extends AbstractDataFile<ChannelStockRow> {

public ChannelStockFile() {
Expand Down Expand Up @@ -39,4 +41,14 @@ protected void write(DataRow r, ChannelStockRow o) {

}

@Override
public String[] getPartitioningColumns() {
return new String[]{"channel", "day"};
}

@Override
public Function<ChannelStockRow, String> getPartitioningFunction() {
return (ChannelStockRow r) -> r.channel + "_" + r.day;
}

}
11 changes: 11 additions & 0 deletions src/test/java/com/increff/commons/sheet/DemoParquetFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashSet;
import java.util.function.Function;

public class DemoParquetFile extends AbstractParquetFile<DemoParquetRow>{

Expand Down Expand Up @@ -64,4 +65,14 @@ protected void write(GenericRecord r, DemoParquetRow o) {
r.put("seasons", String.join("#", o.seasons));
r.put("store_codes", String.join("#", o.store_codes));
}

@Override
public String[] getPartitioningColumns() {
return new String[]{"channel", "day"};
}

@Override
public <K> Function<DemoParquetRow, K> getPartitioningFunction() {
return (DemoParquetRow r) -> (K) (r.channel + "_" + r.day);
}
}