From 9c36ec8d052393a43ebcbd77c0194e213106c903 Mon Sep 17 00:00:00 2001 From: "Piotr P. Karwasz" Date: Sun, 12 Jan 2025 19:04:38 +0100 Subject: [PATCH 1/8] KAFKA-18483: Disable `Log4jController` and `Loggers` if Log4j Core absent If Log4j Core is absent, most calls to `Log4jController` and `Loggers` will end up with a `NoClassDefFoundError`. This changeset: - Profits from the major version bump to rename `k.util.Log4jController` to `LoggingController`. - Removes `o.a.l.l.Level` from the signature of public methods of `o.a.k.connect.runtime.Loggers` and replaces it with `String`. - Provides an additional no-op implementation of `k.util.LoggingController` and `o.a.k.connect.runtime.Loggers`: if Log4j Core is not present on the runtime classpath the no-op implementation will be used. --- .../kafka/connect/runtime/AbstractHerder.java | 9 +- .../apache/kafka/connect/runtime/Loggers.java | 345 +++++++++++------- .../kafka/connect/runtime/LoggersTest.java | 35 +- .../connect/runtime/MockLoggersTest.java | 21 +- .../server/logger/RuntimeLoggerManager.java | 12 +- .../scala/kafka/server/ConfigHelper.scala | 4 +- ...ntroller.scala => LoggingController.scala} | 69 +++- .../logger/RuntimeLoggerManagerTest.java | 8 +- .../api/PlaintextAdminIntegrationTest.scala | 12 +- .../test/scala/kafka/utils/LoggingTest.scala | 4 +- .../unit/kafka/server/KafkaApisTest.scala | 4 +- 11 files changed, 320 insertions(+), 203 deletions(-) rename core/src/main/scala/kafka/utils/{Log4jController.scala => LoggingController.scala} (69%) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 5984a32d3cd9f..3aa425bbeb2e0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -65,7 +65,6 @@ import org.apache.kafka.connect.util.Stage; import org.apache.kafka.connect.util.TemporaryStage; -import org.apache.logging.log4j.Level; import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; import org.apache.maven.artifact.versioning.VersionRange; import org.slf4j.Logger; @@ -164,7 +163,7 @@ public AbstractHerder(Worker worker, this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy; this.connectorExecutor = Executors.newCachedThreadPool(); this.time = time; - this.loggers = new Loggers(time); + this.loggers = Loggers.newInstance(time); this.cachedConnectors = new CachedConnectors(worker.getPlugins()); } @@ -1261,13 +1260,13 @@ public Map allLoggerLevels() { @Override public List setWorkerLoggerLevel(String namespace, String desiredLevelStr) { - Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), null); + String normalizedLevel = desiredLevelStr.toUpperCase(Locale.ROOT); - if (level == null) { + if (!loggers.isValidLevel(normalizedLevel)) { log.warn("Ignoring request to set invalid level '{}' for namespace {}", desiredLevelStr, namespace); return Collections.emptyList(); } - return loggers.setLevel(namespace, level); + return loggers.setLevel(namespace, normalizedLevel); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java index 1593e3708fdf0..c3749cd3cda98 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** @@ -43,7 +44,7 @@ * This class is thread-safe; concurrent calls to all of its public methods from any number * of threads are permitted. */ -public class Loggers { +public abstract class Loggers { private static final Logger log = LoggerFactory.getLogger(Loggers.class); @@ -52,184 +53,274 @@ public class Loggers { /** * Log4j uses "root" (case-insensitive) as name of the root logger. * Note: In log4j, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature. - * + *

* While log4j2 changed the root logger's name to empty string (see: [[LogManager.ROOT_LOGGER_NAME]]), * for backward-compatibility purposes, we accept both empty string and "root" as valid root logger names. * This is why we have a dedicated definition that includes both values. + *

*/ private static final List VALID_ROOT_LOGGER_NAMES = List.of(LogManager.ROOT_LOGGER_NAME, ROOT_LOGGER_NAME); - private final Time time; + final Time time; /** * Maps logger names to their last modification timestamps. * Note: The logger name "root" refers to the actual root logger of log4j2. */ - private final Map lastModifiedTimes; + final Map lastModifiedTimes; - public Loggers(Time time) { + /** + * Creates a {@link Loggers} instance appropriate for the current environment. + * + * @param time A time source. + * @return A new {@link Loggers} instance, never {@link null}. + */ + public static Loggers newInstance(Time time) { + Objects.requireNonNull(time); + try { + return new Log4jLoggers(time); + } catch (ClassCastException | LinkageError e) { + log.info("No supported logging implementation found. Logging configuration endpoint will be disabled."); + return new NoOpLoggers(time); + } + } + + private Loggers(Time time) { this.time = time; - this.lastModifiedTimes = new HashMap<>(); + this.lastModifiedTimes = new ConcurrentHashMap<>(); } /** * Retrieve the current level for a single logger. - * @param logger the name of the logger to retrieve the level for; may not be null + * + * @param loggerName the name of the logger to retrieve the level for; may not be null * @return the current level (falling back on the effective level if necessary) of the logger, * or null if no logger with the specified name exists */ - public synchronized LoggerLevel level(String logger) { - Objects.requireNonNull(logger, "Logger may not be null"); - - org.apache.logging.log4j.Logger foundLogger = null; - if (isValidRootLoggerName(logger)) { - foundLogger = rootLogger(); - } else { - var currentLoggers = currentLoggers().values(); - // search within existing loggers for the given name. - // using LogManger.getLogger() will create a logger if it doesn't exist - // (potential leak since these don't get cleaned up). - for (org.apache.logging.log4j.Logger currentLogger : currentLoggers) { - if (logger.equals(currentLogger.getName())) { - foundLogger = currentLogger; - break; - } - } - } - - if (foundLogger == null) { - log.warn("Unable to find level for logger {}", logger); - return null; - } - - return loggerLevel(foundLogger); - } + public abstract LoggerLevel level(String loggerName); /** * Retrieve the current levels of all known loggers + * * @return the levels of all known loggers; may be empty, but never null */ - public synchronized Map allLevels() { - return currentLoggers() - .values() - .stream() - .filter(logger -> !logger.getLevel().equals(Level.OFF)) - .collect(Collectors.toMap( - this::getLoggerName, - this::loggerLevel, - (existing, replacing) -> replacing, - TreeMap::new) - ); - } + public abstract Map allLevels(); /** * Set the level for the specified logger and all of its children + * * @param namespace the name of the logger to adjust along with its children; may not be null - * @param level the level to set for the logger and its children; may not be null + * @param level the level to set for the logger and its children; may not be null * @return all loggers that were affected by this action, sorted by their natural ordering; * may be empty, but never null */ - public synchronized List setLevel(String namespace, Level level) { - Objects.requireNonNull(namespace, "Logging namespace may not be null"); - Objects.requireNonNull(level, "Level may not be null"); - String internalNameSpace = isValidRootLoggerName(namespace) ? LogManager.ROOT_LOGGER_NAME : namespace; - - log.info("Setting level of namespace {} and children to {}", internalNameSpace, level); - - var loggers = loggers(internalNameSpace); - var nameToLevel = allLevels(); - - List result = new ArrayList<>(); - Configurator.setAllLevels(internalNameSpace, level); - for (org.apache.logging.log4j.Logger logger : loggers) { - // We need to track level changes for each logger and record their update timestamps to ensure this method - // correctly returns only the loggers whose levels were actually modified. - String name = getLoggerName(logger); - String newLevel = logger.getLevel().name(); - String oldLevel = nameToLevel.getOrDefault(name, new LoggerLevel("", time.milliseconds())).level(); - if (!newLevel.equalsIgnoreCase(oldLevel)) { - lastModifiedTimes.put(name, time.milliseconds()); - result.add(name); + public abstract List setLevel(String namespace, String level); + + public abstract boolean isValidLevel(String level); + + static class Log4jLoggers extends Loggers { + + // package-private for testing + final LoggerContext loggerContext; + + // Package-private for testing + Log4jLoggers(Time time) { + super(time); + loggerContext = (LoggerContext) LogManager.getContext(false); + } + + @Override + public LoggerLevel level(String logger) { + Objects.requireNonNull(logger, "Logger may not be null"); + + org.apache.logging.log4j.Logger foundLogger = null; + if (isValidRootLoggerName(logger)) { + foundLogger = rootLogger(); + } else { + var currentLoggers = currentLoggers().values(); + // search within existing loggers for the given name. + // using LogManger.getLogger() will create a logger if it doesn't exist + // (potential leak since these don't get cleaned up). + for (org.apache.logging.log4j.Logger currentLogger : currentLoggers) { + if (logger.equals(currentLogger.getName())) { + foundLogger = currentLogger; + break; + } + } } + + if (foundLogger == null) { + log.warn("Unable to find level for logger {}", logger); + return null; + } + + return loggerLevel(foundLogger); } - Collections.sort(result); - return result; - } + @Override + public Map allLevels() { + return currentLoggers() + .values() + .stream() + .filter(logger -> !logger.getLevel().equals(Level.OFF)) + .collect(Collectors.toMap( + this::getLoggerName, + this::loggerLevel, + (existing, replacing) -> replacing, + TreeMap::new) + ); + } - /** - * Retrieve all known loggers within a given namespace, creating an ancestor logger for that - * namespace if one does not already exist - * @param namespace the namespace that the loggers should fall under; may not be null - * @return all loggers that fall under the given namespace; never null, and will always contain - * at least one logger (the ancestor logger for the namespace) - */ - private synchronized Collection loggers(String namespace) { - Objects.requireNonNull(namespace, "Logging namespace may not be null"); + @Override + public List setLevel(String namespace, String level) { + Objects.requireNonNull(namespace, "Logging namespace may not be null"); + Objects.requireNonNull(level, "Level may not be null"); + String internalNameSpace = isValidRootLoggerName(namespace) ? LogManager.ROOT_LOGGER_NAME : namespace; + + log.info("Setting level of namespace {} and children to {}", internalNameSpace, level); + + var loggers = loggers(internalNameSpace); + var nameToLevel = allLevels(); + + List result = new ArrayList<>(); + Configurator.setAllLevels(internalNameSpace, Level.valueOf(level)); + for (org.apache.logging.log4j.Logger logger : loggers) { + // We need to track level changes for each logger and record their update timestamps to ensure this method + // correctly returns only the loggers whose levels were actually modified. + String name = getLoggerName(logger); + String newLevel = logger.getLevel().name(); + String oldLevel = nameToLevel.getOrDefault(name, new LoggerLevel("", time.milliseconds())).level(); + if (!newLevel.equalsIgnoreCase(oldLevel)) { + lastModifiedTimes.put(name, time.milliseconds()); + result.add(name); + } + } + Collections.sort(result); - if (isValidRootLoggerName(namespace)) { - return currentLoggers().values(); + return result; } - var result = new ArrayList(); - var nameToLogger = currentLoggers(); - var ancestorLogger = lookupLogger(namespace); - var currentLoggers = nameToLogger.values(); + @Override + public boolean isValidLevel(String level) { + return !level.isEmpty() && Level.getLevel(level) != null; + } - boolean present = false; - for (org.apache.logging.log4j.Logger currentLogger : currentLoggers) { - if (currentLogger.getName().startsWith(namespace)) { - result.add(currentLogger); + /** + * Retrieve all known loggers within a given namespace, creating an ancestor logger for that + * namespace if one does not already exist + * + * @param namespace the namespace that the loggers should fall under; may not be null + * @return all loggers that fall under the given namespace; never null, and will always contain + * at least one logger (the ancestor logger for the namespace) + */ + private Collection loggers(String namespace) { + Objects.requireNonNull(namespace, "Logging namespace may not be null"); + + if (isValidRootLoggerName(namespace)) { + return currentLoggers().values(); } - if (namespace.equals(currentLogger.getName())) { - present = true; + + var result = new ArrayList(); + var nameToLogger = currentLoggers(); + var ancestorLogger = lookupLogger(namespace); + var currentLoggers = nameToLogger.values(); + + boolean present = false; + for (org.apache.logging.log4j.core.Logger currentLogger : currentLoggers) { + if (currentLogger.getName().startsWith(namespace)) { + result.add(currentLogger); + } + if (namespace.equals(currentLogger.getName())) { + present = true; + } } + + if (!present) { + result.add(ancestorLogger); + } + + return result; } - if (!present) { - result.add(ancestorLogger); + // visible for testing + org.apache.logging.log4j.core.Logger lookupLogger(String logger) { + return loggerContext.getLogger(isValidRootLoggerName(logger) ? LogManager.ROOT_LOGGER_NAME : logger); } - return result; - } + // visible for testing + Map currentLoggers() { + LoggerContext context = (LoggerContext) LogManager.getContext(false); + var results = new HashMap(); + context.getConfiguration().getLoggers().forEach((name, logger) -> results.put(name, loggerContext.getLogger(name))); + context.getLoggerRegistry().getLoggers().forEach(logger -> results.put(logger.getName(), logger)); + return results; + } - // visible for testing - org.apache.logging.log4j.Logger lookupLogger(String logger) { - return LogManager.getLogger(isValidRootLoggerName(logger) ? LogManager.ROOT_LOGGER_NAME : logger); - } + // visible for testing + org.apache.logging.log4j.Logger rootLogger() { + return LogManager.getRootLogger(); + } - Map currentLoggers() { - LoggerContext context = (LoggerContext) LogManager.getContext(false); - var results = new HashMap(); - context.getConfiguration().getLoggers().forEach((name, logger) -> results.put(name, LogManager.getLogger(name))); - context.getLoggerRegistry().getLoggers().forEach(logger -> results.put(logger.getName(), logger)); - return results; - } + private LoggerLevel loggerLevel(org.apache.logging.log4j.Logger logger) { + Long lastModified = lastModifiedTimes.get(getLoggerName(logger)); + return new LoggerLevel(Objects.toString(logger.getLevel()), lastModified); + } - // visible for testing - org.apache.logging.log4j.Logger rootLogger() { - return LogManager.getRootLogger(); - } + private boolean isValidRootLoggerName(String namespace) { + return VALID_ROOT_LOGGER_NAMES.stream() + .anyMatch(rootLoggerNames -> rootLoggerNames.equalsIgnoreCase(namespace)); + } - private LoggerLevel loggerLevel(org.apache.logging.log4j.Logger logger) { - Long lastModified = lastModifiedTimes.get(getLoggerName(logger)); - return new LoggerLevel(Objects.toString(logger.getLevel()), lastModified); - } + /** + * Converts logger name to ensure backward compatibility between Log4j 1 and Log4j 2. + * If the logger name is empty (Log4j 2 root logger representation), converts it to "root" (Log4j 1 style). + * Otherwise, returns the original logger name. + * + * @param loggerName The name of the logger. + * @return The logger name - returns "root" for empty string, otherwise returns the original logger name + */ + private String getLoggerName(String loggerName) { + return loggerName.equals(LogManager.ROOT_LOGGER_NAME) ? ROOT_LOGGER_NAME : loggerName; + } + + /** + * Converts logger name to ensure backward compatibility between Log4j 1 and Log4j 2. + * If the logger name is empty (Log4j 2 root logger representation), converts it to "root" (Log4j 1 style). + * Otherwise, returns the original logger name. + * + * @param logger The logger instance to get the name from + * @return The logger name - returns "root" for empty string, otherwise returns the original logger name + */ + private String getLoggerName(org.apache.logging.log4j.Logger logger) { + return getLoggerName(logger.getName()); + } - private boolean isValidRootLoggerName(String namespace) { - return VALID_ROOT_LOGGER_NAMES.stream() - .anyMatch(rootLoggerNames -> rootLoggerNames.equalsIgnoreCase(namespace)); } - /** - * Converts logger name to ensure backward compatibility between log4j and log4j2. - * If the logger name is empty (log4j2's root logger representation), converts it to "root" (log4j's style). - * Otherwise, returns the original logger name. - * - * @param logger The logger instance to get the name from - * @return The logger name - returns "root" for empty string, otherwise returns the original logger name - */ - private String getLoggerName(org.apache.logging.log4j.Logger logger) { - return logger.getName().equals(LogManager.ROOT_LOGGER_NAME) ? ROOT_LOGGER_NAME : logger.getName(); + private static class NoOpLoggers extends Loggers { + + private NoOpLoggers(Time time) { + super(time); + } + + @Override + public LoggerLevel level(String loggerName) { + return new LoggerLevel("OFF", 0L); + } + + @Override + public Map allLevels() { + return Map.of(); + } + + @Override + public List setLevel(String loggerName, String level) { + return List.of(); + } + + @Override + public boolean isValidLevel(String level) { + return "OFF".equals(level); + } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java index a965a061f82b5..d26cbcd1fdb2f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java @@ -20,9 +20,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.LoggerContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,14 +39,13 @@ public class LoggersTest { private static final long INITIAL_TIME = 1696951712135L; - private final LoggerContext context = (LoggerContext) LogManager.getContext(false); - private Loggers loggers; + private Loggers.Log4jLoggers loggers; private Time time; @BeforeEach public void setup() { time = new MockTime(0, INITIAL_TIME, 0); - loggers = new Loggers(time); + loggers = (Loggers.Log4jLoggers) Loggers.newInstance(time); } @Test @@ -68,7 +65,7 @@ public void testLevelWithValidRootLoggerNames() { @Test public void testLevelWithExistLoggerName() { - loggers.setLevel("foo", DEBUG); + loggers.setLevel("foo", DEBUG.name()); assertEquals(new LoggerLevel(DEBUG.name(), INITIAL_TIME), loggers.level("foo") ); @@ -81,7 +78,7 @@ public void testLevelWithNonExistLoggerName() { @Test public void testLevelWithNewlyCreatedLogger() { - loggers.setLevel("dummy", ERROR); + loggers.setLevel("dummy", ERROR.name()); assertEquals( new LoggerLevel(ERROR.name(), time.milliseconds()), loggers.level("dummy"), @@ -91,8 +88,8 @@ public void testLevelWithNewlyCreatedLogger() { @Test public void testAllLevelsAfterCreatingNewLogger() { - loggers.setLevel("foo", WARN); - loggers.setLevel("bar", ERROR); + loggers.setLevel("foo", WARN.name()); + loggers.setLevel("bar", ERROR.name()); Map loggerToLevel = loggers.allLevels(); Map expectedLevels = Map.of( "foo", new LoggerLevel(WARN.name(), INITIAL_TIME), @@ -113,8 +110,8 @@ public void testSetLevelWithNullNameSpaceOrNullLevel() { @Test public void testSetLevelWithValidRootLoggerNames() { - loggers.setLevel("", ERROR); - List setLevelResultWithRoot = loggers.setLevel("root", ERROR); + loggers.setLevel("", ERROR.name()); + List setLevelResultWithRoot = loggers.setLevel("root", ERROR.name()); assertTrue(setLevelResultWithRoot.isEmpty(), "Setting level with empty string ('') and 'root' should affect the same set of loggers - " + "when setting the same level twice, second call should return empty list indicating no loggers were affected"); @@ -122,9 +119,9 @@ public void testSetLevelWithValidRootLoggerNames() { @Test public void testSetLevel() { - loggers.setLevel("a.b.c", DEBUG); - loggers.setLevel("a.b", ERROR); - loggers.setLevel("a", WARN); + loggers.setLevel("a.b.c", DEBUG.name()); + loggers.setLevel("a.b", ERROR.name()); + loggers.setLevel("a", WARN.name()); Map expected = Map.of( "a", new LoggerLevel(WARN.name(), INITIAL_TIME), "a.b", new LoggerLevel(WARN.name(), INITIAL_TIME), @@ -135,7 +132,7 @@ public void testSetLevel() { @Test public void testLookupLoggerAfterCreatingNewLogger() { - loggers.setLevel("dummy", INFO); + loggers.setLevel("dummy", INFO.name()); Logger logger = loggers.lookupLogger("dummy"); assertNotNull(logger); assertEquals(INFO, logger.getLevel()); @@ -144,9 +141,9 @@ public void testLookupLoggerAfterCreatingNewLogger() { @Test public void testSetLevelWithSameLevel() { String loggerName = "dummy"; - loggers.setLevel(loggerName, DEBUG); + loggers.setLevel(loggerName, DEBUG.name()); time.sleep(100); - loggers.setLevel(loggerName, DEBUG); + loggers.setLevel(loggerName, DEBUG.name()); assertEquals( new LoggerLevel(DEBUG.name(), INITIAL_TIME), loggers.allLevels().get(loggerName), @@ -157,9 +154,9 @@ public void testSetLevelWithSameLevel() { @Test public void testSetLevelWithDifferentLevels() { String loggerName = "dummy"; - loggers.setLevel(loggerName, DEBUG); + loggers.setLevel(loggerName, DEBUG.name()); time.sleep(100); - loggers.setLevel(loggerName, WARN); + loggers.setLevel(loggerName, WARN.name()); assertEquals( new LoggerLevel(WARN.name(), INITIAL_TIME + 100), loggers.allLevels().get(loggerName), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockLoggersTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockLoggersTest.java index 3df5028461190..ceaf446ffd863 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockLoggersTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/MockLoggersTest.java @@ -22,7 +22,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.Logger; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.Configurator; @@ -51,7 +51,6 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class MockLoggersTest { private static final long INITIAL_TIME = 1696951712135L; - private final LoggerContext context = (LoggerContext) LogManager.getContext(false); private Time time; @BeforeEach @@ -137,7 +136,7 @@ public void testSetLevel() { // one should be created by the Loggers instance when we set the level TestLoggers loggers = new TestLoggers(root, x, y, z, w); - List modified = loggers.setLevel("a.b.c.p", Level.WARN); + List modified = loggers.setLevel("a.b.c.p", Level.WARN.name()); assertEquals(Arrays.asList("a.b.c.p", "a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z"), modified); assertEquals(Level.WARN.toString(), loggers.level("a.b.c.p").level()); assertEquals(Level.WARN, x.getLevel()); @@ -150,7 +149,7 @@ public void testSetLevel() { // Sleep a little and adjust the level of a leaf logger time.sleep(10); - loggers.setLevel("a.b.c.p.X", Level.ERROR); + loggers.setLevel("a.b.c.p.X", Level.ERROR.name()); expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10); actualLevel = loggers.level("a.b.c.p.X"); assertEquals(expectedLevel, actualLevel); @@ -166,7 +165,7 @@ public void testSetLevel() { // Set the same level again, and verify that the last modified time hasn't been altered time.sleep(10); - loggers.setLevel("a.b.c.p.X", Level.ERROR); + loggers.setLevel("a.b.c.p.X", Level.ERROR.name()); expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10); actualLevel = loggers.level("a.b.c.p.X"); assertEquals(expectedLevel, actualLevel); @@ -185,7 +184,7 @@ public void testSetRootLevel() { config.addLogger(rootLoggerName, rootConfig); loggerContext.updateLoggers(); - Logger root = LogManager.getLogger(rootLoggerName); + Logger root = loggerContext.getLogger(rootLoggerName); Configurator.setLevel(root, Level.ERROR); Logger p = loggerContext.getLogger("a.b.c.p"); @@ -201,7 +200,7 @@ public void testSetRootLevel() { Loggers loggers = new TestLoggers(root, x, y, z, w); - List modified = loggers.setLevel(rootLoggerName, Level.DEBUG); + List modified = loggers.setLevel(rootLoggerName, Level.DEBUG.name()); assertEquals(Arrays.asList("a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z", "a.b.c.s.W", rootLoggerName), modified); assertEquals(Level.DEBUG, p.getLevel()); @@ -230,17 +229,17 @@ public void testSetLevelNullArguments() { LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false); Logger root = loggerContext.getRootLogger(); Loggers loggers = new TestLoggers(root); - assertThrows(NullPointerException.class, () -> loggers.setLevel(null, Level.INFO)); + assertThrows(NullPointerException.class, () -> loggers.setLevel(null, Level.INFO.name())); assertThrows(NullPointerException.class, () -> loggers.setLevel("root", null)); } - private class TestLoggers extends Loggers { + private class TestLoggers extends Loggers.Log4jLoggers { private final Logger rootLogger; private final Map currentLoggers; public TestLoggers(Logger rootLogger, Logger... knownLoggers) { - super(time); + super(MockLoggersTest.this.time); this.rootLogger = rootLogger; this.currentLoggers = new HashMap<>(Stream.of(knownLoggers) .collect(Collectors.toMap( @@ -252,7 +251,7 @@ public TestLoggers(Logger rootLogger, Logger... knownLoggers) { @Override Logger lookupLogger(String logger) { - return currentLoggers.computeIfAbsent(logger, LogManager::getLogger); + return currentLoggers.computeIfAbsent(logger, loggerContext::getLogger); } @Override diff --git a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java index 14d1b72c10942..05c516c6d5b9e 100644 --- a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java +++ b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java @@ -17,7 +17,7 @@ package kafka.server.logger; -import kafka.utils.Log4jController; +import kafka.utils.LoggingController; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; import org.apache.kafka.common.config.LogLevelConfig; @@ -76,14 +76,14 @@ void alterLogLevelConfigs(Collection ops) { String logLevel = op.value(); switch (OpType.forId(op.configOperation())) { case SET: - if (Log4jController.logLevel(loggerName, logLevel)) { + if (LoggingController.logLevel(loggerName, logLevel)) { log.warn("Updated the log level of {} to {}", loggerName, logLevel); } else { log.error("Failed to update the log level of {} to {}", loggerName, logLevel); } break; case DELETE: - if (Log4jController.unsetLogLevel(loggerName)) { + if (LoggingController.unsetLogLevel(loggerName)) { log.warn("Unset the log level of {}", loggerName); } else { log.error("Failed to unset the log level of {}", loggerName); @@ -111,7 +111,7 @@ void validateResourceNameIsNodeId(String resourceName) { } void validateLoggerNameExists(String loggerName) { - if (!Log4jController.loggerExists(loggerName)) { + if (!LoggingController.loggerExists(loggerName)) { throw new InvalidConfigurationException("Logger " + loggerName + " does not exist!"); } } @@ -131,9 +131,9 @@ void validateLogLevelConfigs(Collection ops) { break; case DELETE: validateLoggerNameExists(loggerName); - if (loggerName.equals(Log4jController.ROOT_LOGGER())) { + if (loggerName.equals(LoggingController.ROOT_LOGGER())) { throw new InvalidRequestException("Removing the log level of the " + - Log4jController.ROOT_LOGGER() + " logger is not allowed"); + LoggingController.ROOT_LOGGER() + " logger is not allowed"); } break; case APPEND: diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 095a474441a13..8e25edf60f48d 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -21,7 +21,7 @@ import kafka.network.RequestChannel import java.util.{Collections, Properties} import kafka.server.metadata.ConfigRepository -import kafka.utils.{Log4jController, Logging} +import kafka.utils.{LoggingController, Logging} import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource} import org.apache.kafka.common.errors.{ApiException, InvalidRequestException} @@ -130,7 +130,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId) throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}") else - createResponseConfig(Log4jController.loggers, + createResponseConfig(LoggingController.loggers, (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name) .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id) .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava)) diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/LoggingController.scala similarity index 69% rename from core/src/main/scala/kafka/utils/Log4jController.scala rename to core/src/main/scala/kafka/utils/LoggingController.scala index 4bc022dadfeba..528e9a71aadbc 100755 --- a/core/src/main/scala/kafka/utils/Log4jController.scala +++ b/core/src/main/scala/kafka/utils/LoggingController.scala @@ -17,6 +17,7 @@ package kafka.utils +import kafka.utils.LoggingController.ROOT_LOGGER import org.apache.kafka.common.utils.Utils import org.apache.logging.log4j.core.LoggerContext import org.apache.logging.log4j.core.config.Configurator @@ -27,22 +28,55 @@ import java.util.Locale import scala.jdk.CollectionConverters._ -object Log4jController { +object LoggingController { /** - * Note: In log4j, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature. + * Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature. * * The root logger's name is changed in log4j2 to empty string (see: [[LogManager.ROOT_LOGGER_NAME]]) but for backward- * compatibility. Kafka keeps its original root logger name. It is why here is a dedicated definition for the root logger name. */ val ROOT_LOGGER = "root" + private[this] val delegate: LoggingControllerDelegate = { + try { + new Log4jCoreController + } catch { + case _: ClassCastException | _: LinkageError => new NoOpController + } + } + /** * Returns a map of the log4j loggers and their assigned log level. * If a logger does not have a log level assigned, we return the log level of the first ancestor with a level configured. */ - def loggers: Map[String, String] = { - val logContext = LogManager.getContext(false).asInstanceOf[LoggerContext] + def loggers: Map[String, String] = delegate.loggers + + /** + * Sets the log level of a particular logger. If the given logLevel is not an available level + * (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG. + * + * @see [[Level.toLevel]] + */ + def logLevel(loggerName: String, logLevel: String): Boolean = delegate.logLevel(loggerName, logLevel) + + def unsetLogLevel(loggerName: String): Boolean = delegate.unsetLogLevel(loggerName) + + def loggerExists(loggerName: String): Boolean = delegate.loggerExists(loggerName) +} + +private class NoOpController extends LoggingControllerDelegate { + override def loggers: Map[String, String] = Map.empty + + override def logLevel(loggerName: String, logLevel: String): Boolean = false + + override def unsetLogLevel(loggerName: String): Boolean = false +} + +private class Log4jCoreController extends LoggingControllerDelegate { + private[this] val logContext = LogManager.getContext(false).asInstanceOf[LoggerContext] + + override def loggers: Map[String, String] = { val rootLoggerLevel = logContext.getRootLogger.getLevel.toString // Loggers defined in the configuration @@ -63,13 +97,7 @@ object Log4jController { (configured ++ actual) + (ROOT_LOGGER -> rootLoggerLevel) } - /** - * Sets the log level of a particular logger. If the given logLevel is not an available log4j level - * (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG. - * - * @see [[Level.toLevel]] - */ - def logLevel(loggerName: String, logLevel: String): Boolean = { + override def logLevel(loggerName: String, logLevel: String): Boolean = { if (Utils.isBlank(loggerName) || Utils.isBlank(logLevel)) return false @@ -87,7 +115,7 @@ object Log4jController { } } - def unsetLogLevel(loggerName: String): Boolean = { + override def unsetLogLevel(loggerName: String): Boolean = { if (loggerName == ROOT_LOGGER) { Configurator.setAllLevels(LogManager.ROOT_LOGGER_NAME, null) true @@ -99,7 +127,12 @@ object Log4jController { else false } } +} +private abstract class LoggingControllerDelegate { + def loggers: Map[String, String] + def logLevel(loggerName: String, logLevel: String): Boolean + def unsetLogLevel(loggerName: String): Boolean def loggerExists(loggerName: String): Boolean = loggers.contains(loggerName) } @@ -109,25 +142,23 @@ object Log4jController { * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization * of the companion object. */ -class Log4jController extends Log4jControllerMBean { +class LoggingController extends LoggingControllerMBean { def getLoggers: util.List[String] = { // we replace scala collection by java collection so mbean client is able to deserialize it without scala library. - new util.ArrayList[String](Log4jController.loggers.map { + new util.ArrayList[String](LoggingController.loggers.map { case (logger, level) => s"$logger=$level" }.toSeq.asJava) } - def getLogLevel(loggerName: String): String = { - Log4jController.loggers.getOrElse(loggerName, "No such logger.") + LoggingController.loggers.getOrElse(loggerName, "No such logger.") } - def setLogLevel(loggerName: String, level: String): Boolean = Log4jController.logLevel(loggerName, level) + def setLogLevel(loggerName: String, level: String): Boolean = LoggingController.logLevel(loggerName, level) } - -trait Log4jControllerMBean { +trait LoggingControllerMBean { def getLoggers: java.util.List[String] def getLogLevel(logger: String): String def setLogLevel(logger: String, level: String): Boolean diff --git a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java index e2a05fc65f6ac..a17aaf48e07b3 100644 --- a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java +++ b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java @@ -16,7 +16,7 @@ */ package kafka.server.logger; -import kafka.utils.Log4jController; +import kafka.utils.LoggingController; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; @@ -82,18 +82,18 @@ public void testValidateBogusLogLevelNameNotAllowed() { @Test public void testValidateSetRootLogLevelConfig() { MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). - setName(Log4jController.ROOT_LOGGER()). + setName(LoggingController.ROOT_LOGGER()). setConfigOperation(OpType.SET.id()). setValue("TRACE"))); } @Test public void testValidateRemoveRootLogLevelConfigNotAllowed() { - assertEquals("Removing the log level of the " + Log4jController.ROOT_LOGGER() + + assertEquals("Removing the log level of the " + LoggingController.ROOT_LOGGER() + " logger is not allowed", Assertions.assertThrows(InvalidRequestException.class, () -> MANAGER.validateLogLevelConfigs(Arrays.asList(new AlterableConfig(). - setName(Log4jController.ROOT_LOGGER()). + setName(LoggingController.ROOT_LOGGER()). setConfigOperation(OpType.DELETE.id()). setValue("")))).getMessage()); } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 0f4174e1250d2..324a2e762f9dc 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -30,7 +30,7 @@ import java.{time, util} import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils._ -import kafka.utils.{Log4jController, TestInfoUtils, TestUtils} +import kafka.utils.{LoggingController, TestInfoUtils, TestUtils} import org.apache.kafka.clients.HostResolver import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource @@ -3776,20 +3776,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testIncrementalAlterConfigsForLog4jLogLevelsCanSetToRootLogger(quorum: String): Unit = { client = createAdminClient val initialLoggerConfig = describeBrokerLoggers() - val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value() + val initialRootLogLevel = initialLoggerConfig.get(LoggingController.ROOT_LOGGER).value() val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL val alterRootLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) + new AlterConfigOp(new ConfigEntry(LoggingController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) ).asJavaCollection alterBrokerLoggers(alterRootLoggerEntry, validateOnly = true) val validatedRootLoggerConfig = describeBrokerLoggers() - assertEquals(initialRootLogLevel, validatedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(initialRootLogLevel, validatedRootLoggerConfig.get(LoggingController.ROOT_LOGGER).value()) alterBrokerLoggers(alterRootLoggerEntry) val changedRootLoggerConfig = describeBrokerLoggers() - assertEquals(newRootLogLevel, changedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(newRootLogLevel, changedRootLoggerConfig.get(LoggingController.ROOT_LOGGER).value()) } @ParameterizedTest @@ -3797,7 +3797,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(quorum: String): Unit = { client = createAdminClient val deleteRootLoggerEntry = Seq( - new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE) + new AlterConfigOp(new ConfigEntry(LoggingController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE) ).asJavaCollection assertTrue(assertThrows(classOf[ExecutionException], () => alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException]) diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala index d1b389b4a8cf3..6016c496ff4ad 100644 --- a/core/src/test/scala/kafka/utils/LoggingTest.scala +++ b/core/src/test/scala/kafka/utils/LoggingTest.scala @@ -29,7 +29,7 @@ class LoggingTest extends Logging { @Test def testTypeOfGetLoggers(): Unit = { - val log4jController = new Log4jController + val log4jController = new LoggingController // the return object of getLoggers must be a collection instance from java standard library. // That enables mbean client to deserialize it without extra libraries. assertEquals(classOf[java.util.ArrayList[String]], log4jController.getLoggers.getClass) @@ -70,7 +70,7 @@ class LoggingTest extends Logging { @Test def testLoggerLevelIsResolved(): Unit = { - val controller = new Log4jController() + val controller = new LoggingController() val previousLevel = controller.getLogLevel("kafka") try { controller.setLogLevel("kafka", "TRACE") diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5dc5ad06f29e3..3fbe919850297 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -24,7 +24,7 @@ import kafka.network.RequestChannel import kafka.server.QuotaFactory.QuotaManagers import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository} import kafka.server.share.SharePartitionManager -import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils} +import kafka.utils.{CoreUtils, LoggingController, Logging, TestUtils} import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.common._ @@ -9718,7 +9718,7 @@ class KafkaApisTest extends Logging { setResourceName(brokerId.toString). setResourceType(BROKER_LOGGER.id()). setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig(). - setName(Log4jController.ROOT_LOGGER). + setName(LoggingController.ROOT_LOGGER). setValue("TRACE")).iterator()))).iterator())), 1.toShort)) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) From b382950782040fa26a96f6fee56be2ea129ae929 Mon Sep 17 00:00:00 2001 From: "Piotr P. Karwasz" Date: Mon, 27 Jan 2025 19:24:29 +0100 Subject: [PATCH 2/8] Fix JMX registration code --- core/src/main/scala/kafka/utils/Logging.scala | 20 ++++++++++++------- .../test/scala/kafka/utils/LoggingTest.scala | 10 ++++++++-- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala index dd83e90336099..801aff998e67e 100755 --- a/core/src/main/scala/kafka/utils/Logging.scala +++ b/core/src/main/scala/kafka/utils/Logging.scala @@ -24,13 +24,19 @@ import org.slf4j.{LoggerFactory, Marker, MarkerFactory} object Log4jControllerRegistration { private val logger = Logger(this.getClass.getName) - try { - val log4jController = Class.forName("kafka.utils.Log4jController").asInstanceOf[Class[Object]] - val instance = log4jController.getDeclaredConstructor().newInstance() - CoreUtils.registerMBean(instance, "kafka:type=kafka.Log4jController") - logger.info("Registered kafka:type=kafka.Log4jController MBean") - } catch { - case _: Exception => logger.info("Couldn't register kafka:type=kafka.Log4jController MBean") + private val loggingMBean = new LoggingController + // Legacy name + registerMBean(loggingMBean, "kafka.Log4jController") + // New name + registerMBean(loggingMBean, "kafka.LoggingController") + + private def registerMBean(mbean: LoggingController, typeAttr: String): Unit = { + try { + CoreUtils.registerMBean(mbean, s"kafka:type=$typeAttr") + logger.info("Registered `kafka:type={}` MBean", typeAttr) + } catch { + case e: Exception => logger.warn("Couldn't register `kafka:type={}` MBean", typeAttr, e) + } } } diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala index 6016c496ff4ad..3b3b471bff75a 100644 --- a/core/src/test/scala/kafka/utils/LoggingTest.scala +++ b/core/src/test/scala/kafka/utils/LoggingTest.scala @@ -38,10 +38,16 @@ class LoggingTest extends Logging { @Test def testLog4jControllerIsRegistered(): Unit = { val mbs = ManagementFactory.getPlatformMBeanServer + // Legacy name val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController") assertTrue(mbs.isRegistered(log4jControllerName), "kafka.utils.Log4jController is not registered") - val instance = mbs.getObjectInstance(log4jControllerName) - assertEquals("kafka.utils.Log4jController", instance.getClassName) + val log4jInstance = mbs.getObjectInstance(log4jControllerName) + assertEquals("kafka.utils.LoggingController", log4jInstance.getClassName) + // New name + val loggingControllerName = ObjectName.getInstance("kafka:type=kafka.LoggingController") + assertTrue(mbs.isRegistered(loggingControllerName), "kafka.utils.LoggingController is not registered") + val loggingInstance = mbs.getObjectInstance(loggingControllerName) + assertEquals("kafka.utils.LoggingController", loggingInstance.getClassName) } @Test From 88140bb5fa0f7c8cc1504438b67e454cda500f48 Mon Sep 17 00:00:00 2001 From: "Piotr P. Karwasz" Date: Mon, 27 Jan 2025 19:28:33 +0100 Subject: [PATCH 3/8] Use explicit types in declarations --- .../apache/kafka/connect/runtime/Loggers.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java index c3749cd3cda98..c8d1cda3e82a1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java @@ -137,11 +137,11 @@ public LoggerLevel level(String logger) { if (isValidRootLoggerName(logger)) { foundLogger = rootLogger(); } else { - var currentLoggers = currentLoggers().values(); + Collection currentLoggers = currentLoggers().values(); // search within existing loggers for the given name. // using LogManger.getLogger() will create a logger if it doesn't exist // (potential leak since these don't get cleaned up). - for (org.apache.logging.log4j.Logger currentLogger : currentLoggers) { + for (org.apache.logging.log4j.core.Logger currentLogger : currentLoggers) { if (logger.equals(currentLogger.getName())) { foundLogger = currentLogger; break; @@ -179,8 +179,8 @@ public List setLevel(String namespace, String level) { log.info("Setting level of namespace {} and children to {}", internalNameSpace, level); - var loggers = loggers(internalNameSpace); - var nameToLevel = allLevels(); + Collection loggers = loggers(internalNameSpace); + Map nameToLevel = allLevels(); List result = new ArrayList<>(); Configurator.setAllLevels(internalNameSpace, Level.valueOf(level)); @@ -220,10 +220,10 @@ private Collection loggers(String namespac return currentLoggers().values(); } - var result = new ArrayList(); - var nameToLogger = currentLoggers(); - var ancestorLogger = lookupLogger(namespace); - var currentLoggers = nameToLogger.values(); + Collection result = new ArrayList<>(); + Map nameToLogger = currentLoggers(); + org.apache.logging.log4j.core.Logger ancestorLogger = lookupLogger(namespace); + Collection currentLoggers = nameToLogger.values(); boolean present = false; for (org.apache.logging.log4j.core.Logger currentLogger : currentLoggers) { @@ -250,7 +250,7 @@ org.apache.logging.log4j.core.Logger lookupLogger(String logger) { // visible for testing Map currentLoggers() { LoggerContext context = (LoggerContext) LogManager.getContext(false); - var results = new HashMap(); + Map results = new HashMap<>(); context.getConfiguration().getLoggers().forEach((name, logger) -> results.put(name, loggerContext.getLogger(name))); context.getLoggerRegistry().getLoggers().forEach(logger -> results.put(logger.getName(), logger)); return results; From 3922eff91d7778c02bd0f1d1ebfb642a9daffd8c Mon Sep 17 00:00:00 2001 From: "Piotr P. Karwasz" Date: Mon, 27 Jan 2025 20:35:17 +0100 Subject: [PATCH 4/8] Remove `kafka.LoggingController` bean --- core/src/main/scala/kafka/utils/Logging.scala | 3 --- core/src/test/scala/kafka/utils/LoggingTest.scala | 7 +------ 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala index 801aff998e67e..7518fecd2f93f 100755 --- a/core/src/main/scala/kafka/utils/Logging.scala +++ b/core/src/main/scala/kafka/utils/Logging.scala @@ -25,10 +25,7 @@ object Log4jControllerRegistration { private val logger = Logger(this.getClass.getName) private val loggingMBean = new LoggingController - // Legacy name registerMBean(loggingMBean, "kafka.Log4jController") - // New name - registerMBean(loggingMBean, "kafka.LoggingController") private def registerMBean(mbean: LoggingController, typeAttr: String): Unit = { try { diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala index 3b3b471bff75a..7479f021649e5 100644 --- a/core/src/test/scala/kafka/utils/LoggingTest.scala +++ b/core/src/test/scala/kafka/utils/LoggingTest.scala @@ -38,16 +38,11 @@ class LoggingTest extends Logging { @Test def testLog4jControllerIsRegistered(): Unit = { val mbs = ManagementFactory.getPlatformMBeanServer - // Legacy name + val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController") assertTrue(mbs.isRegistered(log4jControllerName), "kafka.utils.Log4jController is not registered") val log4jInstance = mbs.getObjectInstance(log4jControllerName) assertEquals("kafka.utils.LoggingController", log4jInstance.getClassName) - // New name - val loggingControllerName = ObjectName.getInstance("kafka:type=kafka.LoggingController") - assertTrue(mbs.isRegistered(loggingControllerName), "kafka.utils.LoggingController is not registered") - val loggingInstance = mbs.getObjectInstance(loggingControllerName) - assertEquals("kafka.utils.LoggingController", loggingInstance.getClassName) } @Test From 656135788e3feba3bef9af496238a07e904e1fef Mon Sep 17 00:00:00 2001 From: "Piotr P. Karwasz" Date: Mon, 27 Jan 2025 20:35:29 +0100 Subject: [PATCH 5/8] Revert "Use explicit types in declarations" This reverts commit 88140bb5fa0f7c8cc1504438b67e454cda500f48. --- .../apache/kafka/connect/runtime/Loggers.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java index c8d1cda3e82a1..c3749cd3cda98 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java @@ -137,11 +137,11 @@ public LoggerLevel level(String logger) { if (isValidRootLoggerName(logger)) { foundLogger = rootLogger(); } else { - Collection currentLoggers = currentLoggers().values(); + var currentLoggers = currentLoggers().values(); // search within existing loggers for the given name. // using LogManger.getLogger() will create a logger if it doesn't exist // (potential leak since these don't get cleaned up). - for (org.apache.logging.log4j.core.Logger currentLogger : currentLoggers) { + for (org.apache.logging.log4j.Logger currentLogger : currentLoggers) { if (logger.equals(currentLogger.getName())) { foundLogger = currentLogger; break; @@ -179,8 +179,8 @@ public List setLevel(String namespace, String level) { log.info("Setting level of namespace {} and children to {}", internalNameSpace, level); - Collection loggers = loggers(internalNameSpace); - Map nameToLevel = allLevels(); + var loggers = loggers(internalNameSpace); + var nameToLevel = allLevels(); List result = new ArrayList<>(); Configurator.setAllLevels(internalNameSpace, Level.valueOf(level)); @@ -220,10 +220,10 @@ private Collection loggers(String namespac return currentLoggers().values(); } - Collection result = new ArrayList<>(); - Map nameToLogger = currentLoggers(); - org.apache.logging.log4j.core.Logger ancestorLogger = lookupLogger(namespace); - Collection currentLoggers = nameToLogger.values(); + var result = new ArrayList(); + var nameToLogger = currentLoggers(); + var ancestorLogger = lookupLogger(namespace); + var currentLoggers = nameToLogger.values(); boolean present = false; for (org.apache.logging.log4j.core.Logger currentLogger : currentLoggers) { @@ -250,7 +250,7 @@ org.apache.logging.log4j.core.Logger lookupLogger(String logger) { // visible for testing Map currentLoggers() { LoggerContext context = (LoggerContext) LogManager.getContext(false); - Map results = new HashMap<>(); + var results = new HashMap(); context.getConfiguration().getLoggers().forEach((name, logger) -> results.put(name, loggerContext.getLogger(name))); context.getLoggerRegistry().getLoggers().forEach(logger -> results.put(logger.getName(), logger)); return results; From 09a9131a1b98737ab2495fed5eb43d56203c80bd Mon Sep 17 00:00:00 2001 From: "Piotr P. Karwasz" Date: Mon, 27 Jan 2025 21:20:14 +0100 Subject: [PATCH 6/8] Caught exceptions and log messages --- .../java/org/apache/kafka/connect/runtime/Loggers.java | 3 +++ .../src/main/scala/kafka/utils/LoggingController.scala | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java index c3749cd3cda98..3767e31ac7ce7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java @@ -82,6 +82,9 @@ public static Loggers newInstance(Time time) { } catch (ClassCastException | LinkageError e) { log.info("No supported logging implementation found. Logging configuration endpoint will be disabled."); return new NoOpLoggers(time); + } catch (Exception e) { + log.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e); + return new NoOpLoggers(time); } } diff --git a/core/src/main/scala/kafka/utils/LoggingController.scala b/core/src/main/scala/kafka/utils/LoggingController.scala index 528e9a71aadbc..331c6772ca9b4 100755 --- a/core/src/main/scala/kafka/utils/LoggingController.scala +++ b/core/src/main/scala/kafka/utils/LoggingController.scala @@ -17,6 +17,7 @@ package kafka.utils +import com.typesafe.scalalogging.Logger import kafka.utils.LoggingController.ROOT_LOGGER import org.apache.kafka.common.utils.Utils import org.apache.logging.log4j.core.LoggerContext @@ -30,6 +31,8 @@ import scala.jdk.CollectionConverters._ object LoggingController { + private val logger = Logger[LoggingController] + /** * Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature. * @@ -42,7 +45,12 @@ object LoggingController { try { new Log4jCoreController } catch { - case _: ClassCastException | _: LinkageError => new NoOpController + case _: ClassCastException | _: LinkageError => + logger.info("No supported logging implementation found. Logging configuration endpoint will be disabled.") + new NoOpController + case e: Exception => + logger.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e) + new NoOpController } } From db05e49b49836e9392aaf1f3c0d6a27124ec4445 Mon Sep 17 00:00:00 2001 From: "Piotr P. Karwasz" Date: Mon, 27 Jan 2025 21:20:45 +0100 Subject: [PATCH 7/8] Add restrictions to `import-control` --- checkstyle/import-control-core.xml | 10 ++++++---- checkstyle/import-control.xml | 10 ++++++---- .../runtime/rest/resources/LoggingResource.java | 4 +--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 3cfd0ce663cc1..ca5c7760aed80 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -1,6 +1,6 @@ + "-//Checkstyle//DTD ImportControl Configuration 1.4//EN" + "https://checkstyle.sourceforge.io/dtds/import_control_1_4.dtd">