From 83726fe1e0d9c2909027ea86bf184ad5f8746e12 Mon Sep 17 00:00:00 2001 From: Bob Barrett Date: Wed, 20 Feb 2019 17:00:00 -0800 Subject: [PATCH 1/2] MINOR: Improve logging for alter log dirs --- core/src/main/scala/kafka/log/LogManager.scala | 1 + core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 2 +- .../main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 508dcd0e68529..fcf483655106e 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -796,6 +796,7 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) + info(s"Replacing current log $sourceLog with $destLog") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 7f7ed1f1a6e99..2dce46cd891d8 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -136,6 +136,7 @@ abstract class AbstractFetcherThread(name: String, // deal with partitions with errors, potentially due to leadership changes private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) { if (partitions.nonEmpty) + debug(s"Handling errors for partitions $partitions") delayPartitions(partitions, fetchBackOffMs) } @@ -366,7 +367,6 @@ abstract class AbstractFetcherThread(name: String, } if (partitionsWithError.nonEmpty) { - debug(s"Handling errors for partitions $partitionsWithError") handlePartitionsWithErrors(partitionsWithError) } } diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 54bb2a2248740..f0107fd774969 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -208,7 +208,8 @@ class ReplicaAlterLogDirsThread(name: String, requestMap.put(topicPartition, new FetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(partitionFetchState.currentLeaderEpoch))) } catch { - case _: KafkaStorageException => + case e: KafkaStorageException => + info(s"Failed to build fetch for $topicPartition with $e") partitionsWithError += topicPartition } } From c3bee6ce9879c7f786ba1b47efb88cd313a062ed Mon Sep 17 00:00:00 2001 From: Bob Barrett Date: Tue, 26 Feb 2019 10:32:10 -0800 Subject: [PATCH 2/2] PR feedback and minor wording changes --- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../scala/kafka/server/AbstractFetcherThread.scala | 12 ++++++------ .../kafka/server/ReplicaAlterLogDirsThread.scala | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index fcf483655106e..cae47f724e679 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -796,7 +796,7 @@ class LogManager(logDirs: Seq[File], val sourceLog = currentLogs.get(topicPartition) val destLog = futureLogs.get(topicPartition) - info(s"Replacing current log $sourceLog with $destLog") + info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition") if (sourceLog == null) throw new KafkaStorageException(s"The current replica for $topicPartition is offline") if (destLog == null) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 2dce46cd891d8..959c2bfdd36f6 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -118,7 +118,7 @@ abstract class AbstractFetcherThread(name: String, val fetchStates = partitionStates.partitionStateMap.asScala val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(fetchStates) - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "maybeFetch") if (fetchRequestOpt.isEmpty) { trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request") @@ -134,9 +134,9 @@ abstract class AbstractFetcherThread(name: String, } // deal with partitions with errors, potentially due to leadership changes - private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) { + private def handlePartitionsWithErrors(partitions: Iterable[TopicPartition], methodName: String) { if (partitions.nonEmpty) - debug(s"Handling errors for partitions $partitions") + debug(s"Handling errors in $methodName for partitions $partitions") delayPartitions(partitions, fetchBackOffMs) } @@ -200,7 +200,7 @@ abstract class AbstractFetcherThread(name: String, } val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets) - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets") updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets) } } @@ -225,7 +225,7 @@ abstract class AbstractFetcherThread(name: String, } } - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "truncateToHighWatermark") updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets) } @@ -367,7 +367,7 @@ abstract class AbstractFetcherThread(name: String, } if (partitionsWithError.nonEmpty) { - handlePartitionsWithErrors(partitionsWithError) + handlePartitionsWithErrors(partitionsWithError, "processFetchRequest") } } diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index f0107fd774969..b5eb10d803763 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -209,7 +209,7 @@ class ReplicaAlterLogDirsThread(name: String, fetchSize, Optional.of(partitionFetchState.currentLeaderEpoch))) } catch { case e: KafkaStorageException => - info(s"Failed to build fetch for $topicPartition with $e") + debug(s"Failed to build fetch for $topicPartition", e) partitionsWithError += topicPartition } }