diff --git a/pom.xml b/pom.xml index a339d097e9..28f6cc516b 100644 --- a/pom.xml +++ b/pom.xml @@ -927,6 +927,7 @@ ${test.build.data} true ${hadoop.version} + ${project.build.directory} diff --git a/tez-api/src/main/java/org/apache/tez/common/LargeEntryLogger.java b/tez-api/src/main/java/org/apache/tez/common/LargeEntryLogger.java new file mode 100644 index 0000000000..488156e9a0 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/LargeEntryLogger.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.common; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.function.Consumer; + +import static org.apache.tez.dag.api.TezConfiguration.TEZ_LOGGING_PROPERTY_MASK; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_LOGGING_PROPERTY_MASK_DEFAULT; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD; +import static org.apache.tez.dag.api.TezConfiguration.TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD_DEFAULT; + +/** + * A configurable logger for large configuration/payload entries. 
+ */ +public class LargeEntryLogger implements Consumer> { + private static final Logger LOG = LoggerFactory.getLogger(LargeEntryLogger.class); + private final int threshold; + private final boolean mask; + + public static LargeEntryLogger from(Configuration c) { + return new LargeEntryLogger( + c.getInt(TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD, TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD_DEFAULT), + c.getBoolean(TEZ_LOGGING_PROPERTY_MASK, TEZ_LOGGING_PROPERTY_MASK_DEFAULT)); + } + + public static LargeEntryLogger from(Map c) { + String threshold = c.getOrDefault(TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD, + String.valueOf(TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD_DEFAULT)); + String mask = c.getOrDefault(TEZ_LOGGING_PROPERTY_MASK, String.valueOf(TEZ_LOGGING_PROPERTY_MASK_DEFAULT)); + return new LargeEntryLogger(Integer.parseInt(threshold), Boolean.parseBoolean(mask)); + } + + private LargeEntryLogger(int threshold, boolean mask) { + this.threshold = threshold; + this.mask = mask; + } + + public void accept(Map.Entry e) { + String key = e.getKey(); + String value = e.getValue(); + if (value == null) { + LOG.debug("Skipping entry '{}' cause value is null.", key); + return; + } + if (value.length() > threshold) { + LOG.warn("Entry '{}' is unusually big ({} bytes); large entries may lead to OOM.", key, value.length()); + if (!mask) { + LOG.warn("Large entry '{}': {}", key, value); + } + } + } +} diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java index 88920a4e53..373d4d9d55 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; import com.google.protobuf.ByteString; @@ -154,10 +156,10 @@ public static Configuration createConfFromUserPayload(UserPayload payload) throw 
return createConfFromByteString(ByteString.copyFrom(payload.getPayload())); } - private static void writeConfInPB(OutputStream dos, Configuration conf) throws IOException { DAGProtos.ConfigurationProto.Builder confProtoBuilder = DAGProtos.ConfigurationProto.newBuilder(); - populateConfProtoFromEntries(conf, confProtoBuilder); + populateConfProtoFromStream(StreamSupport.stream(conf.spliterator(), false).peek(LargeEntryLogger.from(conf)), + confProtoBuilder); DAGProtos.ConfigurationProto confProto = confProtoBuilder.build(); confProto.writeTo(dos); } @@ -199,22 +201,30 @@ public static String convertToHistoryText(Configuration conf) { return convertToHistoryText(null, conf); } + /** + * Copy each entry with non-null value from the specified map to the configuration builder. + *

<p>Implementation detail: For debugging purposes, this method can be configured to log large entries.</p>

+ */ + public static void populateConfProtoFromMap(Map map, + DAGProtos.ConfigurationProto.Builder confBuilder) { + populateConfProtoFromStream(map.entrySet().stream().peek(LargeEntryLogger.from(map)), confBuilder); + } - /* Copy each Map.Entry with non-null value to DAGProtos.ConfigurationProto */ + /** + * Copy each entry with non-null value to the specified configuration builder. + * + * @deprecated Use {@link #populateConfProtoFromMap(Map, DAGProtos.ConfigurationProto.Builder)} instead. + */ + @Deprecated public static void populateConfProtoFromEntries(Iterable> params, DAGProtos.ConfigurationProto.Builder confBuilder) { - for(Map.Entry entry : params) { - String key = entry.getKey(); - String val = entry.getValue(); - if(val != null) { - DAGProtos.PlanKeyValuePair.Builder kvp = DAGProtos.PlanKeyValuePair.newBuilder(); - kvp.setKey(key); - kvp.setValue(val); - confBuilder.addConfKeyValues(kvp); - } else { - LOG.debug("null value for key={}. Skipping.", key); - } - } + populateConfProtoFromStream(StreamSupport.stream(params.spliterator(), false), confBuilder); } + private static void populateConfProtoFromStream(Stream> entries, + DAGProtos.ConfigurationProto.Builder proto) { + entries.filter(e -> e.getValue() != null) + .map(e -> DAGProtos.PlanKeyValuePair.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build()) + .forEach(proto::addConfKeyValues); + } } diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index 0864b82e80..f574b25d02 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -60,7 +60,6 @@ import org.apache.tez.common.TezYARNUtils; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; -import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.VertexGroup.GroupInfo; import 
org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; @@ -983,7 +982,7 @@ public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCr if (vertex.getConf()!= null && vertex.getConf().size() > 0) { ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder(); - TezUtils.populateConfProtoFromEntries(vertex.getConf().entrySet(), confBuilder); + TezUtils.populateConfProtoFromMap(vertex.getConf(), confBuilder); vertexBuilder.setVertexConf(confBuilder); } @@ -1084,7 +1083,7 @@ public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCr ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder(); if (!this.dagConf.isEmpty()) { - TezUtils.populateConfProtoFromEntries(this.dagConf.entrySet(), confProtoBuilder); + TezUtils.populateConfProtoFromMap(this.dagConf, confProtoBuilder); } // Copy historyLogLevel from tezConf into dagConf if its not overridden in dagConf. String logLevel = this.dagConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL); diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 9e2e2d89cf..90b478cce2 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1580,6 +1580,27 @@ public TezConfiguration(boolean loadDefaults) { TEZ_PREFIX + "generate.debug.artifacts"; public static final boolean TEZ_GENERATE_DEBUG_ARTIFACTS_DEFAULT = false; + /** + * Int value. Property size threshold (in bytes) for logging during payload serialization. Properties exceeding the + * threshold are considered unusually large and potentially problematic thus they should be logged. 
+ */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty(type="integer") + public static final String TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD = + TEZ_PREFIX + "logging.property.size.threshold"; + public static final int TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD_DEFAULT = 512 * 1024; + + /** + * Boolean value. Whether property masking is enabled for logging. Properties may contain sensitive user information + * such as passwords, credentials, secrets, etc., so they shouldn't be logged unconditionally. When masking is + * enabled, the property value (content) is not displayed in the logs. + */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty + public static final String TEZ_LOGGING_PROPERTY_MASK = + TEZ_PREFIX + "logging.property.mask"; + public static final boolean TEZ_LOGGING_PROPERTY_MASK_DEFAULT = true; + /** * Set of tasks for which specific launch command options need to be added. * Format: "vertexName[csv of task ids];vertexName[csv of task ids].." diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java index d599cafd76..c046047a55 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java @@ -24,11 +24,17 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.BitSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; @@ -41,6 +47,7 @@ import com.google.protobuf.ByteString; public class TestTezUtils { + private static final Path LOG_PATH = 
Paths.get(System.getProperty("project.build.directory"), "tez-test.log"); @Test (timeout=2000) public void testByteStringToAndFromConf() throws IOException { @@ -103,6 +110,18 @@ public void testByteStringAddToLargeConf() throws IOException { Assert.assertEquals(conf.get("testLargeValue"), largeValue); } + @Test public void testByteStringFromConfEmitsLogForLargeEntry() throws IOException { + Configuration conf = new Configuration(false); + conf.set(TezConfiguration.TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD, "10"); + conf.set(TezConfiguration.TEZ_LOGGING_PROPERTY_MASK, "false"); + conf.set("tez.fake.property.id00", "ABCDEFGHIJK"); + TezUtils.createByteStringFromConf(conf); + List logLines = Lists.reverse(Files.readAllLines(LOG_PATH)); + assertEquals("Entry 'tez.fake.property.id00' is unusually big (11 bytes); large entries may lead to OOM.", + logLines.get(1)); + assertEquals("Large entry 'tez.fake.property.id00': ABCDEFGHIJK", logLines.get(0)); + } + @Test (timeout=2000) public void testPayloadToAndFromConf() throws IOException { Configuration conf = getConf(); @@ -294,4 +313,23 @@ public void testPopulateConfProtoFromEntries() { assertEquals(confBuilder.getConfKeyValuesList().size(), 1); } + @Test public void testPopulateConfProtoFromMapSetsKeyValuePair() { + DAGProtos.ConfigurationProto.Builder builder = DAGProtos.ConfigurationProto.newBuilder(); + TezUtils.populateConfProtoFromMap(ImmutableMap.of("tez.fake.property", "someValue"), builder); + assertEquals(builder.getConfKeyValues(0).getKey(), "tez.fake.property"); + assertEquals(builder.getConfKeyValues(0).getValue(), "someValue"); + } + + @Test public void testPopulateConfProtoFromMapEmitsLogForLargeEntry() throws IOException { + Map map = new HashMap<>(); + map.put(TezConfiguration.TEZ_LOGGING_PROPERTY_SIZE_THRESHOLD, "10"); + map.put(TezConfiguration.TEZ_LOGGING_PROPERTY_MASK, "false"); + map.put("tez.fake.property.id01", "ABCDEFGHIJK"); + TezUtils.populateConfProtoFromMap(map, 
DAGProtos.ConfigurationProto.newBuilder()); + List logLines = Lists.reverse(Files.readAllLines(LOG_PATH)); + assertEquals("Entry 'tez.fake.property.id01' is unusually big (11 bytes); large entries may lead to OOM.", + logLines.get(1)); + assertEquals("Large entry 'tez.fake.property.id01': ABCDEFGHIJK", logLines.get(0)); + } + } diff --git a/tez-common/src/test/resources/log4j.properties b/tez-common/src/test/resources/log4j.properties index 531b68b5a9..3ab859a12c 100644 --- a/tez-common/src/test/resources/log4j.properties +++ b/tez-common/src/test/resources/log4j.properties @@ -12,8 +12,11 @@ # log4j configuration used during build and unit tests -log4j.rootLogger=info,stdout +log4j.rootLogger=info,stdout,r log4j.threshhold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n +log4j.appender.r=org.apache.log4j.RollingFileAppender +log4j.appender.r.File=${project.build.directory}/tez-test.log +log4j.appender.r.layout=org.apache.log4j.PatternLayout