MINOR: extract jointly owned parts of BrokerServer and ControllerServer #12837
cmccabe merged 12 commits into apache:trunk from
We also need the controller to be available in order to process controlled shutdown.
Good point. I'll rework this comment a bit.
Shut down jointServer here if broker is null? (Same thing for controller below.)
Currently, if broker is null, then jointServer cannot have been started.
However, just to make it simple to understand what is going on, I'll add an unconditional call to jointServer#stopForBroker here.
nit: it's a little clearer if you provide the argument names, at least for the fatal arg. Same for other factories below.
fatal = config.processRoles.contains(ControllerRole)
Pre-existing issue, but I think we also need to close the Metrics instance.
I will add this to JointServer#stop.
It was previously being done in BrokerServer#shutdown, which handles broker-only and combined mode. But I guess in the case of testing ControllerServer, we leaked this resource.
nit: extra \ at the end of the line intended?
That is weird. I think the IDE inserted it?
It does nothing here. I will remove it.
We end up doing a bunch of stuff that KafkaRaftServer is already doing. Seems like it would be simpler to use KafkaRaftServer directly and get rid of all this logic to manage the lower-level components. That would also get us more consistent shutdown behavior.
It's not simple at all to use KafkaRaftServer in most of our tests. Let me give an example. If someone shuts down a broker in a test by calling BrokerServer#shutdown, and the broker was a standalone broker, you have to somehow shut down the associated KafkaRaftServer, the associated snapshot generator, and the associated metadata loader. And clear the associated dynamic metadata.
If you maintain KafkaRaftServer more or less the way it is, where it owns a BrokerServer, ControllerServer, and some other stuff, and those owned objects don't have any pointers back to it, this is not really possible. You would have to either rewrite the tests in terms of KafkaRaftServer, which is not really feasible in the time we have available, or just accept that BrokerServer#shutdown is not going to clean up everything. I don't think either course of action really works here.
In general we haven't tested combined mode very much, so we've been able to handwave some of this. Or just accept resource leaks in the tests. But to do it correctly, we should acknowledge that in combined mode there is some joint state. Hence, JointServer.
I think this PR greatly simplifies the test code (and will do so for the other test harnesses we have). We cannot have each test harness manually managing the joint state; it is just too much (and grows over time). This is a clean way to do that: standalone mode = your own JointServer, combined mode = shared JointServer.
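To make the ownership rule above concrete, here is a minimal Java sketch of "shut down the shared state when the last component using it shuts down", tracked with two booleans rather than a reference count, as the PR's class comment describes. `SharedStateSketch`, its `events` list, and the start methods are hypothetical names for illustration; only `stopForBroker` and `stopForController` are named in this conversation, and the real class may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the jointly owned state; not the real Kafka class.
final class SharedStateSketch {
    private boolean usedByBroker = false;
    private boolean usedByController = false;
    private boolean started = false;
    private boolean stopped = false;
    // Records lifecycle transitions so the behaviour is easy to observe.
    final List<String> events = new ArrayList<>();

    synchronized void startForBroker() {
        usedByBroker = true;
        startIfNeeded();
    }

    synchronized void startForController() {
        usedByController = true;
        startIfNeeded();
    }

    // Safe to call even if the broker never started: shared resources are still released.
    synchronized void stopForBroker() {
        usedByBroker = false;
        stopIfUnused();
    }

    synchronized void stopForController() {
        usedByController = false;
        stopIfUnused();
    }

    private void startIfNeeded() {
        if (stopped) throw new IllegalStateException("already stopped");
        if (!started) {
            started = true;
            events.add("start");
        }
    }

    private void stopIfUnused() {
        // Stop only when neither role still uses the shared state, and only once.
        if (usedByBroker || usedByController || stopped) return;
        stopped = true;
        events.add("stop");
    }
}
```

In standalone mode each server would hold its own instance, so its single stop call releases everything; in combined mode both servers hold the same instance and only the second stop call has an effect.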
Do not use colocation in PlaintextAdminIntegrationTest.testCreateTopicsReturnsConfigs since QuorumTestHarness does not really support colocation
Allow node configurations to be set for controllers
There were a few test flakes that don't look related. Will merge in trunk and get another test run, I guess.
val config = jointServer.config
val time = jointServer.time
val metrics = jointServer.metrics
def raftManager: KafkaRaftManager[ApiMessageAndVersion] = jointServer.raftManager
nit: why not make this a val also?
        JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq());
} catch (Throwable e) {
    log.error("Error creating broker {}", node.id(), e);
    if (broker != null) broker.shutdown();
nit: similar comment as before. I think JointServer still has resources that need to be cleaned up even if it doesn't get started (e.g. Metrics). Same thing for the controller above.
I think Metrics is the only one. It looks like its constructor starts a thread and so on.
But yes, we should shut down all that. Fixed.
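The failure path discussed here can be sketched in Java as follows. This is an illustration under assumptions, not the test kit's code: `MetricsSketch` and `BrokerFactorySketch` are hypothetical stand-ins, and the point is only that a resource allocated before the broker exists must be closed even when the null-guarded `broker.shutdown()` has nothing to do.

```java
// Hypothetical stand-in for a resource whose constructor already holds something
// that must be released (the review notes that Metrics starts a thread).
final class MetricsSketch implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

final class BrokerFactorySketch {
    // Returns the metrics object so callers can check whether it was closed.
    static MetricsSketch tryCreate(boolean failDuringConstruction) {
        MetricsSketch metrics = new MetricsSketch(); // allocated before the broker exists
        Object broker = null;
        try {
            if (failDuringConstruction) {
                throw new IllegalStateException("simulated constructor failure");
            }
            broker = new Object();
        } catch (RuntimeException e) {
            // broker is still null here, so a null-guarded broker shutdown would do
            // nothing; release the shared resource unconditionally instead.
            metrics.close();
        }
        return metrics;
    }
}
```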
if (kafkaRaftMetrics != null) {
    kafkaRaftMetrics.close();
}
if (memoryPool instanceof BatchMemoryPool) {
Perhaps we can let MemoryPool implement Closeable?
By the way, was this a leak? I think the only reference to the pool is in KafkaRaftClient, so does that mean we were leaking KafkaRaftClient references?
I have been playing whack-a-mole with cases where we leak KafkaRaftClient instances in tests. Most of them seem to be related to the metrics closures dragging in giant objects, and the metrics not getting deleted properly. These are existing issues not caused by this PR.
This is a bit ugly but it stabilizes the build greatly so I think we should leave it in. I didn't want to make it a close() function since I didn't want to start thinking about dealing with a closed state in all of the memory pools.
Can we leave this for now and open a JIRA?
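As a rough Java illustration of the trade-off in this thread: the client's close path checks the concrete pool type and releases retained buffers, so the pool interface does not need a `close()` method or a "closed" state. All types below are hypothetical sketches, and `releaseRetained` is an assumed method name, not a confirmed Kafka API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical pool interface, deliberately without close().
interface PoolSketch {
    ByteBuffer tryAllocate(int size);
    void release(ByteBuffer buffer);
}

// Hypothetical pool that retains released buffers for reuse.
final class BatchPoolSketch implements PoolSketch {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private final int batchSize;

    BatchPoolSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public ByteBuffer tryAllocate(int size) {
        if (size > batchSize) return null;
        ByteBuffer buffer = free.poll();
        return buffer != null ? buffer : ByteBuffer.allocate(batchSize);
    }

    @Override
    public void release(ByteBuffer buffer) {
        buffer.clear();
        free.push(buffer);
    }

    // Assumed name: drop retained buffers so they can be garbage collected.
    void releaseRetained() {
        free.clear();
    }

    int retainedCount() {
        return free.size();
    }
}

final class RaftClientSketch {
    private final PoolSketch pool;

    RaftClientSketch(PoolSketch pool) {
        this.pool = pool;
    }

    void close() {
        // Type check instead of a Closeable pool: other pool implementations are untouched.
        if (pool instanceof BatchPoolSketch) {
            ((BatchPoolSketch) pool).releaseRetained();
        }
    }
}
```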
Test failures not related
The test failures in the latest build are showing an NPE:
 * their reference. We opted to use two booleans here rather than a reference count in order to
 * make debugging easier and reduce the chance of resource leaks.
 */
class JointServer(
Would SharedServer be a better name?
Yes, I suppose SharedServer might be better.
Yes. raftManager is initialized in
- BrokerServer#raftManager must be a def, not val
- rename jointserver -> sharedserver
- fix one more failure case in a test harness where we needed to call stopForController to clean up the metrics object
val config = sharedServer.config
val time = sharedServer.time
val metrics = sharedServer.metrics
def raftManager: KafkaRaftManager[ApiMessageAndVersion] = sharedServer.raftManager
Can you add a comment here why this needs to be a def?
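One plausible reason, sketched in Java since the explanation in this thread is cut off: if the shared object assigns its field only after the wrapper has been constructed (an assumption here), a value captured at construction time stays null, while an accessor re-reads the current value on every call. A Scala `val` behaves like the captured field and a `def` like the accessor. All names below are hypothetical.

```java
// Hypothetical shared object whose field is assigned by start(), not the constructor.
final class SharedHolderSketch {
    volatile String raftManager = null;

    void start() {
        raftManager = "raft-manager";
    }
}

final class BrokerViewSketch {
    private final SharedHolderSketch shared;

    // Like a Scala val: evaluated once, when this object is constructed.
    final String raftManagerSnapshot;

    BrokerViewSketch(SharedHolderSketch shared) {
        this.shared = shared;
        this.raftManagerSnapshot = shared.raftManager;
    }

    // Like a Scala def: re-reads the shared field on each call.
    String raftManager() {
        return shared.raftManager;
    }
}
```

If the view is built before `start()` runs, `raftManagerSnapshot` remains null while `raftManager()` returns the started value, which is also a candidate source for an NPE of the kind mentioned earlier in the conversation.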
…er (apache#12837)
Extract jointly owned parts of BrokerServer and ControllerServer into SharedServer. Shut down SharedServer when the last component using it shuts down. But make sure to stop the raft manager before closing the ControllerServer's sockets.
This PR also fixes a memory leak where ReplicaManager was not removing some topic metric callbacks during shutdown. Finally, we now release memory from the BatchMemoryPool in KafkaRaftClient#close. These changes should reduce memory consumption while running junit tests.
Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
Extract jointly owned parts of BrokerServer and ControllerServer into JointServer. Shut down JointServer when the last component using it shuts down. (But make sure to stop the raft manager before closing the ControllerServer's sockets.)