From 58409076761f78235f4cc678f28a765b17c1ac0a Mon Sep 17 00:00:00 2001 From: "vrutik.rabadia" Date: Wed, 21 Feb 2024 17:19:57 +0530 Subject: [PATCH 1/3] adds partitioning column and group by function --- pom.xml | 28 +++++++++---------- .../commons/sheet/AbstractDataFile.java | 7 ++++- .../commons/sheet/AbstractParquetFile.java | 6 +++- .../com/increff/commons/sheet/IDataFile.java | 5 ++++ .../com/increff/commons/sheet/TsvFile.java | 11 ++++++++ .../commons/sheet/ChannelStockFile.java | 12 ++++++++ .../commons/sheet/DemoParquetFile.java | 11 ++++++++ 7 files changed, 64 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 7f72d07..86becd4 100644 --- a/pom.xml +++ b/pom.xml @@ -149,20 +149,20 @@ - - org.apache.maven.plugins - maven-gpg-plugin - 1.5 - - - sign-artifacts - verify - - sign - - - - + + + + + + + + + + + + + + org.sonatype.plugins nexus-staging-maven-plugin diff --git a/src/main/java/com/increff/commons/sheet/AbstractDataFile.java b/src/main/java/com/increff/commons/sheet/AbstractDataFile.java index 05987ad..7affde3 100644 --- a/src/main/java/com/increff/commons/sheet/AbstractDataFile.java +++ b/src/main/java/com/increff/commons/sheet/AbstractDataFile.java @@ -13,6 +13,7 @@ import java.io.*; import java.util.*; +import java.util.function.Function; public abstract class AbstractDataFile implements IDataFile { @@ -30,7 +31,7 @@ public abstract class AbstractDataFile implements IDataFile { private ArrayList data; /** - * @params This function takes a string 's' as a delimeter. + * @param s This function takes a string 's' as a delimeter. * By default it is '\t' * To use any different delimeter, please override this function */ @@ -155,6 +156,10 @@ public void setProgressMonitor(IProgressMonitor progress) { protected abstract void write(DataRow rec, T t); protected abstract String[] getHeaders(); + @Override + public abstract String[] getPartitioningColumns(); + @Override + public abstract Function getPartitioningFunction(); protected static void validateHeaders(Set headers, Set expectedHeaders) throws SheetException { Set missingHeaders = new HashSet<>(expectedHeaders); diff --git a/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java b/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java index fff240f..2347eda 100644 --- a/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java +++ b/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java @@ -6,6 +6,7 @@ import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.util.*; +import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -175,7 +176,10 @@ public void setProgressMonitor(IProgressMonitor progress) { protected abstract T read(GenericRecord rec) throws Exception; protected abstract void write(GenericRecord rec, T t); protected abstract Type[] getSchema(); - + @Override + public abstract String[] getPartitioningColumns(); + @Override + public abstract Function getPartitioningFunction(); protected static void validateHeaders(Schema incomingSchema, Schema expectedSchema) throws SheetException { Map fieldInfo1 = extractFieldInfo(incomingSchema); diff --git a/src/main/java/com/increff/commons/sheet/IDataFile.java b/src/main/java/com/increff/commons/sheet/IDataFile.java index e3c1b8c..9d208f7 100644 --- a/src/main/java/com/increff/commons/sheet/IDataFile.java +++ b/src/main/java/com/increff/commons/sheet/IDataFile.java @@ -15,6 +15,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; +import java.util.function.Function; public interface IDataFile { public String getFileExtension(); @@ -29,6 +30,10 @@ public interface IDataFile { public void read(InputStream is) throws Exception; + public String[] getPartitioningColumns(); + + public Function getPartitioningFunction(); + public void setProgressMonitor(IProgressMonitor progress); } diff --git a/src/main/java/com/increff/commons/sheet/TsvFile.java b/src/main/java/com/increff/commons/sheet/TsvFile.java index 33af90f..fa5f04f 100644 --- a/src/main/java/com/increff/commons/sheet/TsvFile.java +++ b/src/main/java/com/increff/commons/sheet/TsvFile.java @@ -13,6 +13,7 @@ import java.util.Arrays; import java.util.Set; +import java.util.function.Function; /** * Implementation of the AbstractDataFile to handle .tsv files @@ -21,6 +22,16 @@ public class TsvFile extends AbstractDataFile { private String[] headers = {}; + @Override + public String[] getPartitioningColumns() { + return new String[0]; + } + + @Override + public Function getPartitioningFunction() { + return (DataRow r) -> ""; + } + @Override protected DataRow read(DataRow rec) { DataRow dataRow = new DataRow(rec.getColumns()); diff --git a/src/test/java/com/increff/commons/sheet/ChannelStockFile.java b/src/test/java/com/increff/commons/sheet/ChannelStockFile.java index 6f875a8..1238bee 100644 --- a/src/test/java/com/increff/commons/sheet/ChannelStockFile.java +++ b/src/test/java/com/increff/commons/sheet/ChannelStockFile.java @@ -11,6 +11,8 @@ package com.increff.commons.sheet; +import java.util.function.Function; + public class ChannelStockFile extends AbstractDataFile { public ChannelStockFile() { @@ -39,4 +41,14 @@ protected void write(DataRow r, ChannelStockRow o) { } + @Override + public String[] getPartitioningColumns() { + return new String[]{"channel", "day"}; + } + + @Override + public Function getPartitioningFunction() { + return (ChannelStockRow r) -> r.channel + "_" + r.day; + } + } \ No newline at end of file diff --git a/src/test/java/com/increff/commons/sheet/DemoParquetFile.java b/src/test/java/com/increff/commons/sheet/DemoParquetFile.java index ea72272..cbc1c57 100644 --- a/src/test/java/com/increff/commons/sheet/DemoParquetFile.java +++ b/src/test/java/com/increff/commons/sheet/DemoParquetFile.java @@ -10,6 +10,7 @@ import java.time.LocalDate; import java.util.Arrays; import java.util.HashSet; +import java.util.function.Function; public class DemoParquetFile extends AbstractParquetFile{ @@ -64,4 +65,14 @@ protected void write(GenericRecord r, DemoParquetRow o) { r.put("seasons", String.join("#", o.seasons)); r.put("store_codes", String.join("#", o.store_codes)); } + + @Override + public String[] getPartitioningColumns() { + return new String[]{"channel", "day"}; + } + + @Override + public Function getPartitioningFunction() { + return (DemoParquetRow r) -> (K) (r.channel + "_" + r.day); + } } From 64af6b03fd4fde4ad1d0093b12c550ea318cf577 Mon Sep 17 00:00:00 2001 From: "vrutik.rabadia" Date: Tue, 27 Feb 2024 15:12:04 +0530 Subject: [PATCH 2/3] revert pom changes --- pom.xml | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 86becd4..7f72d07 100644 --- a/pom.xml +++ b/pom.xml @@ -149,20 +149,20 @@ - - - - - - - - - - - - - - + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + org.sonatype.plugins nexus-staging-maven-plugin From 091a5abdf97ff1ba659c64ac13eed57599b2e6ff Mon Sep 17 00:00:00 2001 From: "vrutik.rabadia" Date: Thu, 14 Mar 2024 10:57:11 +0530 Subject: [PATCH 3/3] default functions for partitioning --- .../java/com/increff/commons/sheet/AbstractDataFile.java | 8 ++++++-- .../com/increff/commons/sheet/AbstractParquetFile.java | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/increff/commons/sheet/AbstractDataFile.java b/src/main/java/com/increff/commons/sheet/AbstractDataFile.java index 7affde3..aea2efc 100644 --- a/src/main/java/com/increff/commons/sheet/AbstractDataFile.java +++ b/src/main/java/com/increff/commons/sheet/AbstractDataFile.java @@ -157,9 +157,13 @@ public void setProgressMonitor(IProgressMonitor progress) { protected abstract String[] getHeaders(); @Override - public abstract String[] getPartitioningColumns(); + public String[] getPartitioningColumns(){ + return new String[0]; + } @Override - public abstract Function getPartitioningFunction(); + public Function getPartitioningFunction(){ + return null; + } protected static void validateHeaders(Set headers, Set expectedHeaders) throws SheetException { Set missingHeaders = new HashSet<>(expectedHeaders); diff --git a/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java b/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java index 2347eda..b8d64e9 100644 --- a/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java +++ b/src/main/java/com/increff/commons/sheet/AbstractParquetFile.java @@ -177,9 +177,13 @@ public void setProgressMonitor(IProgressMonitor progress) { protected abstract void write(GenericRecord rec, T t); protected abstract Type[] getSchema(); @Override - public abstract String[] getPartitioningColumns(); + public String[] getPartitioningColumns(){ + return new String[0]; + } @Override - public abstract Function getPartitioningFunction(); + public Function getPartitioningFunction(){ + return null; + }; protected static void validateHeaders(Schema incomingSchema, Schema expectedSchema) throws SheetException { Map fieldInfo1 = extractFieldInfo(incomingSchema);