Skip to content

MINOR: Fix regression in MM2 task forwarding introduced by KAFKA-14783#13548

Merged
C0urante merged 1 commit intoapache:trunkfrom
gharris1727:kafka-14783-fix-forwarding
Apr 13, 2023
Merged

MINOR: Fix regression in MM2 task forwarding introduced by KAFKA-14783#13548
C0urante merged 1 commit intoapache:trunkfrom
gharris1727:kafka-14783-fix-forwarding

Conversation

@gharris1727
Copy link
Copy Markdown
Contributor

The DistributedHerder was computing the forwarded URL for publishing task configs incorrectly leading to 404s in MM2 distributed mode.

This regression appears in #13424 and presently only exists on trunk. This manifests as a return to pre-KIP-710 behavior, and causes the DedicatedMirrorIntegrationTest to fail whenever forwarding happens, making the test flake in >50% of runs.

It appears to be just a typo and not an intended change, and was hidden by the github diff when this function was split into two parts.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

The DistributedHerder was computing the forwarded URL for publishing task
configs incorrectly leading to 404s in MM2 distributed mode.

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Copy link
Copy Markdown
Contributor

@C0urante C0urante left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Greg! LGTM

@C0urante C0urante merged commit f252c75 into apache:trunk Apr 13, 2023
@C0urante C0urante deleted the kafka-14783-fix-forwarding branch April 13, 2023 13:50
@C0urante C0urante restored the kafka-14783-fix-forwarding branch April 13, 2023 15:16
@ijuma
Copy link
Copy Markdown
Member

ijuma commented Apr 14, 2023

The PR build has 95 failures, many of them in a related area (at least on the surface):

Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin()
Build / JDK 17 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testAuthorizeWithPrefixedResource(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.shouldRestoreState()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin()
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures()
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.executionError
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerWithLegacyMessageFormatIntegrationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.DelegationTokenEndToEndAuthorizationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.EndToEndClusterIdTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.GroupCoordinatorIntegrationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.LogAppendTimeTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.PlaintextConsumerTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.PlaintextEndToEndAuthorizationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.ProducerFailureHandlingTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.ProducerSendWhileDeletionTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.SaslClientsWithInvalidCredentialsTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.SaslGssapiSslEndToEndAuthorizationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.SaslOAuthBearerSslEndToEndAuthorizationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.SaslPlaintextConsumerTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.SaslSslConsumerTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.SslConsumerTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.SslProducerSendTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsBounceTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.UserClientIdQuotaTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.api.test.ProducerCompressionTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.integration.MinIsrConfigTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.metrics.MetricsTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.network.DynamicConnectionQuotaTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.security.authorizer.AclAuthorizerWithZkSaslTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.security.authorizer.AuthorizerTest.testDeleteAclOnPrefixedResource(String).quorum=kraft
Build / JDK 8 and Scala 2.12 / kafka.security.authorizer.AuthorizerTest.testAuthorizeByResourceTypeNoAclFoundOverride(String).quorum=kraft
Build / JDK 8 and Scala 2.12 / kafka.server.AdvertiseBrokerTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.AlterReplicaLogDirsRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.AlterUserScramCredentialsRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.CreateTopicsRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.DelegationTokenRequestsOnPlainTextTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.DelegationTokenRequestsWithDisableTokenFeatureTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.DeleteTopicsRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.DescribeClusterRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.DescribeLogDirsRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.DescribeUserScramCredentialsRequestNotAuthorizedTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.DynamicBrokerReconfigurationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.EdgeCaseRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.FetchRequestDownConversionConfigTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.FetchRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.FetchRequestWithLegacyMessageFormatTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.FinalizedFeatureChangeListenerTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.GssapiAuthenticationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.KafkaMetricReporterClusterIdTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.KafkaMetricReporterExceptionHandlingTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.KafkaMetricsReporterTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.LeaderElectionTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestWithRemoteStoreTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.LogOffsetTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.MetadataRequestBetweenDifferentIbpTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.MultipleListenersWithDefaultJaasContextTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.OffsetsForLeaderEpochRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.ProduceRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.ReplicaFetchTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.ReplicationQuotasTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.RequestQuotaTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.ServerGenerateBrokerIdTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.ServerStartupTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.StopReplicaRequestTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.TopicIdWithOldInterBrokerProtocolTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.epoch.EpochDrivenReplicationProtocolAcceptanceTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.server.epoch.LeaderEpochIntegrationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.tools.GetOffsetShellTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.tools.MirrorMakerIntegrationTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.utils.ReplicationUtilsTest.initializationError
Build / JDK 8 and Scala 2.12 / kafka.zookeeper.ZooKeeperClientTest.initializationError
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testSingleNodeCluster()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.DedicatedMirrorIntegrationTest.testMultiNodeCluster()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicationIsCreatingTopicsUsingProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testSyncTopicConfigUseProvidedForwardingAdmin()
Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()

How come we merged this without mentioning why it's ok to do so?

@gharris1727
Copy link
Copy Markdown
Contributor Author

@ijuma The scope of this PR was small, addressing just one issue. We certainly have several flakey and failing tests that need to be addressed during this test stabilization period. Here are the relevant tickets i've found:

And outside of the connect/mirrormaker area:

  • AuthorizerTest failures are covered under https://issues.apache.org/jira/browse/KAFKA-14900
  • the initializationError tests all appear to be failing due to a leaked thread, I could not find a ticket for these.
    The other tests appear unrelated as this is a connect-specific change, and could be flakey tests that have not been identified yet.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Apr 14, 2023

But how do we even know that this is helping vs making it worse with so many failures?

@gharris1727
Copy link
Copy Markdown
Contributor Author

@ijuma I proposed this PR in the form I did because it restored a piece of code that was unintentionally changed. From an end user-perspective, the unintentional change was certainly bad, and reverting that change (in this PR) is certainly good. I'm sorry if the PR description did not make that clear, and I'll try to improve in the future.

I have looked into the other changes and identified their causes to be something distinct from this regression:

  • DedicatedMirrorIntegrationTest has been flakey since before this regression was committed, and affects both testSingleNodeCluster and testMultiNodeCluster (as seen in your list of test failures). The regression that this PR addresses only ever caused a failure in the MultiNode case, as this code path is only active in multi node clusters.

  • ExactlyOnceSourceIntegrationTest appears to be failing due to an error from the kafka TransactionManager, and surfaces with a different error than this regression would cause.

  • I have not yet completed my investigation on the ForwardingAdmin test failures, but those are consistently failing (not flakey like this regression), started failing on a different PR than the one which introduced this regression, and only affect the forwarding admin tests. Note: 'forwarding admin' is a feature unrelated to 'connect herder request forwarding', despite sharing the word 'forwarding'.

  • AuthorizerTest is failing due to an NPE in a completely unrelated part of the code base.

  • The initializationError failures are due to 3 leaked threads, which would surprise me if it was related to this URL formatting logic.

All in all, I saw an obvious typo in a previous PR, and opened a follow-up to fix it. I did not want to issue a "Fix every Connect test failure" as I think that would be harder to review and less modular if reverts needed to be made in the future.
How would you approach this differently?

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Apr 14, 2023

@gharris1727 I think what you did was fine. :) My concern is more on how we decided that it was ok to merge. I looked a bit more and I think the issues potentially started around here for a bunch of the tests:

#13373 (comment)

And it looks like a similar pattern happened there where we merged without a detailed analysis of the failures.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants