
MINOR: Refactor replica log dir fetching for improved logging #6313

Merged
hachikuji merged 5 commits into apache:trunk from hachikuji:alter-log-dirs-stick-to-partition
Feb 26, 2019

Conversation

@hachikuji
Contributor

In order to debug problems with log directory reassignments, it is helpful to know when the fetcher thread begins moving a particular partition. This patch refactors the fetch logic so that we stick to a selected partition as long as it is available and log a message when a different partition is selected.
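The "stick to a selected partition" behavior described above can be sketched in miniature. This is only an illustrative model, not the actual patch: `Tp` and `PartitionSelector` are hypothetical stand-ins for Kafka's `TopicPartition` and the fetcher's selection logic.

```scala
// Hypothetical sketch: keep fetching the previously selected partition
// while it is still available; otherwise pick the next one in
// (topic, partition) order and log that a new partition was selected.
final case class Tp(topic: String, partition: Int)

object PartitionSelector {
  implicit val tpOrdering: Ordering[Tp] =
    Ordering.by((tp: Tp) => (tp.topic, tp.partition))

  def select(available: Set[Tp], current: Option[Tp]): Option[Tp] =
    current.filter(available.contains) match {
      case some @ Some(_) => some // stick to the current partition
      case None =>
        if (available.isEmpty) None
        else {
          val next = available.min
          // Log the switch (illustrative message, not the patch's wording)
          println(s"Beginning/resuming fetch for $next")
          Some(next)
        }
    }
}
```

Under this model, a partition keeps being fetched until it drops out of `available`, at which point the smallest remaining one is chosen and a log line records the switch.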

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji
Contributor Author

cc @lindong28 or @junrao

Let me know if this seems reasonable.

@lindong28
Member

lindong28 commented Feb 23, 2019

Hey @hachikuji, the motivation makes sense. I personally find that the code looks a bit complicated now that the original method is split into four separate methods. Other than that, the patch LGTM. I will leave it to Jun or others to decide on the style.

@hachikuji
Contributor Author

@lindong28 Thanks for the comment. The most important thing is whether the change itself makes sense.

I agree that style leaves a lot of room for subjectivity. I personally find that smaller functions which are understandable in isolation lead to simpler and clearer composition of logic (and often better testing). For example, compare the original here:

  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
    // Only include replica in the fetch request if it is not throttled.
    val maxPartitionOpt = partitionMap.filter { case (_, partitionFetchState) =>
      partitionFetchState.isReadyForFetch && !quota.isQuotaExceeded
    }.reduceLeftOption { (left, right) =>
      if ((left._1.topic > right._1.topic()) || (left._1.topic == right._1.topic() && left._1.partition() >= right._1.partition()))
        left
      else
        right
    }

    // Only move one replica at a time to increase its catch-up rate and thus reduce the time spent on moving any given replica
    // Replicas are ordered by their TopicPartition
    val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
    val partitionsWithError = mutable.Set[TopicPartition]()

    if (maxPartitionOpt.nonEmpty) {
      val (topicPartition, partitionFetchState) = maxPartitionOpt.get
      try {
        val logStartOffset = replicaMgr.futureLocalReplicaOrException(topicPartition).logStartOffset
        requestMap.put(topicPartition, new FetchRequest.PartitionData(partitionFetchState.fetchOffset, logStartOffset,
          fetchSize, Optional.of(partitionFetchState.currentLeaderEpoch)))
      } catch {
        case _: KafkaStorageException =>
          partitionsWithError += topicPartition
      }
    }

    val fetchRequestOpt = if (requestMap.isEmpty) {
      None
    } else {
      // Set maxWait and minBytes to 0 because the response should return immediately if
      // the future log has caught up with the current log of the partition
      Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
        .setMaxBytes(maxBytes))
    }
    ResultWithPartitions(fetchRequestOpt, partitionsWithError)
  }

with the update:

  def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
    // Only include replica in the fetch request if it is not throttled.
    if (quota.isQuotaExceeded) {
      ResultWithPartitions(None, Set.empty)
    } else {
      selectPartitionToFetch(partitionMap) match {
        case Some((tp, fetchState)) =>
          buildFetchForPartition(tp, fetchState)
        case None =>
          ResultWithPartitions(None, Set.empty)
      }
    }
  }

I didn't have too much trouble understanding the logic from the original version, but I think this version is much more approachable for contributors who haven't spent as much time working with Kafka as you and I.
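As a side note on the selection step itself, the hand-rolled `reduceLeftOption` comparison in the original can be expressed with a standard tuple `Ordering`. The sketch below is illustrative only: `Tp` stands in for Kafka's `TopicPartition`, and while the name `selectPartitionToFetch` is borrowed from the refactor, this body is a guess at the idea, not the actual implementation.

```scala
// Illustrative only: expressing the partition comparison with a standard
// tuple Ordering rather than a manual reduceLeftOption.
final case class Tp(topic: String, partition: Int)

object FetchSelection {
  // (topic, partition) ascending: topics lexicographically, then partition id.
  val tpOrdering: Ordering[Tp] = Ordering.by((tp: Tp) => (tp.topic, tp.partition))

  // The original reduceLeftOption keeps the largest ready partition; the
  // refactored selection is in ascending order, so take the smallest here.
  def selectPartitionToFetch(ready: Iterable[Tp]): Option[Tp] =
    if (ready.isEmpty) None else Some(ready.min(tpOrdering))
}
```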

Contributor

@viktorsomogyi left a comment


Reviewed it, LGTM.

@lindong28
Member

lindong28 commented Feb 25, 2019

@hachikuji Yeah, I don't feel strongly about this preference. Just my personal opinion: most developers who do not have to change the implementation of buildFetch(...) only need to read the Javadoc of the method, so for them there is no difference between the two implementations.

If the developer needs to understand or change the implementation of this method, then they need to understand how the input is actually used to derive the output. In that case, even if buildFetch(...) itself is much shorter thanks to three more private methods, the developer still needs to jump to the definition of each method to understand its implementation, which seems less convenient than having everything in the same method. If a private method is short and not reused more than once, then in many cases I find it simpler to inline the implementation in the original method with clear comments. This was my experience when reviewing this patch, since I cannot simply read buildFetch(...) itself to be sure the change is correct.

This can be just my personal preference regarding the style. The change itself looks good to me.

Contributor

@junrao left a comment


@hachikuji : Thanks for the PR. LGTM. Just a couple of minor comments below.

// Only move one partition at a time to increase its catch-up rate and thus reduce the time spent on
// moving any given replica. Replicas are selected in ascending order (lexicographically for topics) from the
// partitions that are ready to fetch. Once selected, we will continue fetching the same partition until it
// becomes unavailable.

unavailable => unavailable or is removed ?

assertEquals(1, fetchInfos.length)
assertEquals("Expected fetch request for largest partition", t1p1, fetchInfos.head._1)
assertEquals(160, fetchInfos.head._2.fetchOffset)
assertEquals("Expected fetch request for largest partition", t1p0, fetchInfos.head._1)

for largest partition => for first partition ?

@hachikuji hachikuji merged commit 66a6fc7 into apache:trunk Feb 26, 2019
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* AK/trunk: (36 commits)
  KAFKA-7962: Avoid NPE for StickyAssignor (apache#6308)
  Address flakiness of CustomQuotaCallbackTest#testCustomQuotaCallback (apache#6330)
  KAFKA-7918: Inline generic parameters Pt. II: RocksDB Bytes Store and Memory LRU Caches (apache#6327)
  MINOR: fix parameter naming (apache#6316)
  KAFKA-7956 In ShutdownableThread, immediately complete the shutdown if the thread has not been started (apache#6218)
  MINOR: Refactor replica log dir fetching for improved logging (apache#6313)
  [TRIVIAL] Remove unused StreamsGraphNode#repartitionRequired (apache#6227)
  MINOR: Increase produce timeout to 120 seconds (apache#6326)
  KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store (apache#6293)
  MINOR: Fix line break issue in upgrade notes (apache#6320)
  KAFKA-7972: Use automatic RPC generation in SaslHandshake
  MINOR: Enable capture of full stack trace in StreamTask#process (apache#6310)
  KAFKA-7938: Fix test flakiness in DeleteConsumerGroupsTest (apache#6312)
  KAFKA-7937: Fix Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup (apache#6311)
  MINOR: Update docs to say 2.2 (apache#6315)
  KAFKA-7672 : force write checkpoint during StreamTask #suspend (apache#6115)
  KAFKA-7961; Ignore assignment for un-subscribed partitions (apache#6304)
  KAFKA-7672: Restoring tasks need to be closed upon task suspension (apache#6113)
  KAFKA-7864; validate partitions are 0-based (apache#6246)
  KAFKA-7492 : Updated javadocs for aggregate and reduce methods returning null behavior. (apache#6285)
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…#6313)

In order to debug problems with log directory reassignments, it is helpful to know when the fetcher thread begins moving a particular partition. This patch refactors the fetch logic so that we stick to a selected partition as long as it is available and log a message when a different partition is selected.

Reviewers: Viktor Somogyi-Vass <viktorsomogyi@gmail.com>, Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>
