diff --git a/src/main/java/com/increff/commons/sheet/AbstractDataFile.java b/src/main/java/com/increff/commons/sheet/AbstractDataFile.java index 05987ad..aea2efc 100644 --- a/src/main/java/com/increff/commons/sheet/AbstractDataFile.java +++ b/src/main/java/com/increff/commons/sheet/AbstractDataFile.java @@ -13,6 +13,7 @@ import java.io.*; import java.util.*; +import java.util.function.Function; public abstract class AbstractDataFile implements IDataFile { @@ -30,7 +31,7 @@ public abstract class AbstractDataFile implements IDataFile { private ArrayList data; /** - * @params This function takes a string 's' as a delimeter. + * @param s This function takes a string 's' as a delimeter. * By default it is '\t' * To use any different delimeter, please override this function */ @@ -155,6 +156,14 @@ public void setProgressMonitor(IProgressMonitor progress) { protected abstract void write(DataRow rec, T t); protected abstract String[] getHeaders(); + @Override + public String[] getPartitioningColumns(){ + return new String[0]; + } + @Override + public Function getPartitioningFunction(){ + return null; + } protected static void validateHeaders(Set headers, Set expectedHeaders) throws SheetException { Set missingHeaders = new HashSet<>(expectedHeaders); diff --git a/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java b/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java index fff240f..b8d64e9 100644 --- a/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java +++ b/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java @@ -6,6 +6,7 @@ import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.util.*; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -175,7 +176,14 @@ public void setProgressMonitor(IProgressMonitor progress) { protected abstract T read(GenericRecord rec) throws Exception; protected abstract void write(GenericRecord rec, T t); protected abstract Type[] getSchema(); - + @Override + public String[] getPartitioningColumns(){ + return new String[0]; + } + @Override + public Function getPartitioningFunction(){ + return null; + }; protected static void validateHeaders(Schema incomingSchema, Schema expectedSchema) throws SheetException { Map fieldInfo1 = extractFieldInfo(incomingSchema); diff --git a/src/main/java/com/increff/commons/sheet/IDataFile.java b/src/main/java/com/increff/commons/sheet/IDataFile.java index e3c1b8c..9d208f7 100644 --- a/src/main/java/com/increff/commons/sheet/IDataFile.java +++ b/src/main/java/com/increff/commons/sheet/IDataFile.java @@ -15,6 +15,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.function.Function; public interface IDataFile { public String getFileExtension(); @@ -29,6 +30,10 @@ public interface IDataFile { public void read(InputStream is) throws Exception; + public String[] getPartitioningColumns(); + + public Function getPartitioningFunction(); + public void setProgressMonitor(IProgressMonitor progress); } diff --git a/src/main/java/com/increff/commons/sheet/TsvFile.java b/src/main/java/com/increff/commons/sheet/TsvFile.java index 33af90f..fa5f04f 100644 --- a/src/main/java/com/increff/commons/sheet/TsvFile.java +++ b/src/main/java/com/increff/commons/sheet/TsvFile.java @@ -13,6 +13,7 @@ import java.util.Arrays; import java.util.Set; +import java.util.function.Function; /** * Implementation of the AbstractDataFile to handle .tsv files @@ -21,6 +22,16 @@ public class TsvFile extends AbstractDataFile { private String[] headers = {}; + @Override + public String[] getPartitioningColumns() { + return new String[0]; + } + + @Override + public Function getPartitioningFunction() { + return (DataRow r) -> ""; + } + @Override protected DataRow read(DataRow rec) { DataRow dataRow = new DataRow(rec.getColumns()); diff --git a/src/test/java/com/increff/commons/sheet/ChannelStockFile.java b/src/test/java/com/increff/commons/sheet/ChannelStockFile.java index 6f875a8..1238bee 100644 --- a/src/test/java/com/increff/commons/sheet/ChannelStockFile.java +++ b/src/test/java/com/increff/commons/sheet/ChannelStockFile.java @@ -11,6 +11,8 @@ package com.increff.commons.sheet; +import java.util.function.Function; + public class ChannelStockFile extends AbstractDataFile { public ChannelStockFile() { @@ -39,4 +41,14 @@ protected void write(DataRow r, ChannelStockRow o) { } + @Override + public String[] getPartitioningColumns() { + return new String[]{"channel", "day"}; + } + + @Override + public Function getPartitioningFunction() { + return (ChannelStockRow r) -> r.channel + "_" + r.day; + } + } \ No newline at end of file diff --git a/src/test/java/com/increff/commons/sheet/DemoParquetFile.java b/src/test/java/com/increff/commons/sheet/DemoParquetFile.java index ea72272..cbc1c57 100644 --- a/src/test/java/com/increff/commons/sheet/DemoParquetFile.java +++ b/src/test/java/com/increff/commons/sheet/DemoParquetFile.java @@ -10,6 +10,7 @@ import java.time.LocalDate; import java.util.Arrays; import java.util.HashSet; +import java.util.function.Function; public class DemoParquetFile extends AbstractParquetFile{ @@ -64,4 +65,14 @@ protected void write(GenericRecord r, DemoParquetRow o) { r.put("seasons", String.join("#", o.seasons)); r.put("store_codes", String.join("#", o.store_codes)); } + + @Override + public String[] getPartitioningColumns() { + return new String[]{"channel", "day"}; + } + + @Override + public Function getPartitioningFunction() { + return (DemoParquetRow r) -> (K) (r.channel + "_" + r.day); + } }