From e3a247cb449854a602223ec1509203f3b65d2e92 Mon Sep 17 00:00:00 2001 From: Raghav Aggarwal Date: Thu, 1 Feb 2024 22:07:49 +0530 Subject: [PATCH 1/3] TEZ-4540: Reading proto data more than 2GB from multiple splits fails --- .../tez/dag/history/logging/proto/ProtoMessageWritable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java index df5743660c..63a1ebda08 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java @@ -98,5 +98,6 @@ public void readFields(DataInput in) throws IOException { } din.in = in; message = cin.readMessage(parser, ExtensionRegistry.getEmptyRegistry()); + cin.resetSizeCounter(); } } From c552f237a4a4077665a3953238748796a00063be Mon Sep 17 00:00:00 2001 From: Raghav Aggarwal Date: Thu, 25 Apr 2024 21:33:58 +0530 Subject: [PATCH 2/3] Unit test --- .../proto/TestProtoHistoryLoggingService.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java index fd3154d904..da9bf7d2bf 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java @@ -23,12 +23,14 @@ import java.io.EOFException; import java.io.IOException; 
+import java.lang.reflect.Field; import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import com.google.protobuf.CodedInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -135,6 +137,51 @@ public void testService() throws Exception { scanner.close(); } + @Test + public void testProtoMessageSizeReset() throws Exception { + // This test case is to confirm that cin.resetSizeCounter() was indeed called + ProtoHistoryLoggingService service = createService(false); + service.start(); + TezDAGID dagId = TezDAGID.getInstance(appId, 0); + List protos = new ArrayList<>(); + for (DAGHistoryEvent event : makeHistoryEvents(dagId, service)) { + protos.add(new HistoryEventProtoConverter().convert(event.getHistoryEvent())); + service.handle(event); + } + service.stop(); + + TezProtoLoggers loggers = new TezProtoLoggers(); + Assert.assertTrue(loggers.setup(service.getConfig(), clock)); + + // Verify dag events are logged. + DatePartitionedLogger dagLogger = loggers.getDagEventsLogger(); + Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId + "_1"); + try (ProtoMessageReader reader = dagLogger.getReader(dagFilePath)) { + assertEventsRead(reader, protos, 1, protos.size()); + + int totalBytesRead = getTotalBytesRead(reader); + // cin.resetSizeCounter() in ProtoMessageWritable.java ensures that + // totalBytesRead will always be 0. For reference read javadoc of CodedInputStream. 
+ Assert.assertEquals(totalBytesRead, 0); + } + } + + private static int getTotalBytesRead(ProtoMessageReader reader) throws NoSuchFieldException, IllegalAccessException { + // writable is a private field in ProtoMessageReader.java + Field f = reader.getClass().getDeclaredField("writable"); + f.setAccessible(true); + ProtoMessageWritable writable = (ProtoMessageWritable) f.get(reader); + + // cin is a private field in ProtoMessageWritable.java + Field c = writable.getClass().getDeclaredField("cin"); + c.setAccessible(true); + CodedInputStream cin = (CodedInputStream) c.get(writable); + + // Goal is to get value of: reader.writable.cin.getTotalBytesRead() + int totalBytesRead = cin.getTotalBytesRead(); + return totalBytesRead; + } + @Test public void testServiceSplitEvents() throws Exception { ProtoHistoryLoggingService service = createService(true); From d82fbb108603b00115ebadfad03c0cb494430140 Mon Sep 17 00:00:00 2001 From: Raghav Aggarwal Date: Thu, 20 Jun 2024 17:01:01 +0530 Subject: [PATCH 3/3] checkstyle fix --- .../logging/proto/TestProtoHistoryLoggingService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java index da9bf7d2bf..4f24d30a88 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java @@ -166,7 +166,8 @@ public void testProtoMessageSizeReset() throws Exception { } } - private static int getTotalBytesRead(ProtoMessageReader reader) throws NoSuchFieldException, IllegalAccessException { + private static int getTotalBytesRead(ProtoMessageReader
reader) throws NoSuchFieldException, + IllegalAccessException { // writable is a private field in ProtoMessageReader.java Field f = reader.getClass().getDeclaredField("writable"); f.setAccessible(true); @@ -178,8 +179,7 @@ private static int getTotalBytesRead(ProtoMessageReader reade CodedInputStream cin = (CodedInputStream) c.get(writable); // Goal is to get value of: reader.writable.cin.getTotalBytesRead() - int totalBytesRead = cin.getTotalBytesRead(); - return totalBytesRead; + return cin.getTotalBytesRead(); } @Test