KAFKA-13874: Avoid synchronization in SocketServer metrics (#13285)
chia7712 wants to merge 10 commits into apache:trunk
Conversation
Hello @chia7712. The purpose of this lock on the entire SocketServer object is to ensure that the processors are not mutated while metrics are being computed from them. If we remove this lock, a metric read can race with a concurrent mutation of the processors. Perhaps a better way to solve this is to 1) reduce the granularity of the lock, since we don't want a lock on the entire SocketServer object but just on the processors, and 2) use shared and exclusive locks, which allow concurrent reads to happen while ensuring that writes happen in isolation.
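To illustrate the shared/exclusive-lock idea mentioned above, here is a minimal, self-contained Java sketch (class and method names are illustrative, not from the PR): readers take the shared lock so metric reads can proceed concurrently, while writers take the exclusive lock so mutations happen in isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SharedExclusiveDemo {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<String> processors = new ArrayList<>();

    // Writers acquire the exclusive (write) lock: mutations run in isolation.
    void addProcessor(String id) {
        lock.writeLock().lock();
        try {
            processors.add(id);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers acquire the shared (read) lock: many metric reads may run at once,
    // but never concurrently with a writer.
    int processorCount() {
        lock.readLock().lock();
        try {
            return processors.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        SharedExclusiveDemo server = new SharedExclusiveDemo();
        server.addProcessor("p0");
        server.addProcessor("p1");
        System.out.println(server.processorCount()); // prints 2
    }
}
```

The tradeoff relative to a lock-free structure is that a writer still blocks all readers for the duration of the mutation.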
@divijvaidya thanks for the feedback!
Pardon me; the mutable ArrayBuffer won't be modified after it is created, so it seems to me that its mutability won't hurt us here.
All we need to update metrics is the …
Hey @chia7712,
I have a different view of this. Please correct me if I am wrong. Consider the following scenario, which would go wrong with the changes in this PR:
@divijvaidya thanks for the nice explanation. You are right. I will adopt solution #1 (reduce the granularity of the lock). PTAL, and thanks!
I am afraid the latest approach will also not work, because there is still a possibility of a race. When I mentioned option 1, fine-grained locking, I actually implied locking on the processors object instead of locking on the entire SocketServer object. If we go down this path, we will have to change other places in the file to acquire this processor lock during mutation, and we also have to ensure that a deadlock doesn't occur when trying to acquire the SocketServer lock and the processors lock together. Hence, my suggestion would be to opt for a lock-free, concurrently accessible data structure for storing processors. Here are our requirements for such a data structure:
Based on the above, we can choose either a ConcurrentHashMap or a CopyOnWriteArrayList for storing processors.
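A minimal Java sketch of why CopyOnWriteArrayList fits this access pattern (names here are illustrative): its iterator works over an immutable snapshot of the backing array, so a metric read can iterate without any lock even while another thread adds a processor, and it never throws ConcurrentModificationException.

```java
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotIterationDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> processors = new CopyOnWriteArrayList<>();
        processors.add("processor-0");
        processors.add("processor-1");

        int seen = 0;
        // The for-each iterator binds to a snapshot taken at creation time,
        // so the adds below (simulating a concurrent writer) are not visible
        // to this loop and no ConcurrentModificationException is thrown.
        for (String p : processors) {
            processors.add("processor-added-during-iteration");
            seen++;
        }
        System.out.println(seen);              // 2: only the snapshot was iterated
        System.out.println(processors.size()); // 4: both concurrent adds took effect
    }
}
```

The write path copies the whole array on every mutation, which is cheap here because processors change only during startup, reconfiguration, and shutdown, while reads (metrics) dominate.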
@divijvaidya thanks for the great feedback!
That is interesting; the mutation of processors is locked by …
I will use CopyOnWriteArrayList.
  }
- private[network] val processors = new ArrayBuffer[Processor]()
+ private[network] val processors = new CopyOnWriteArrayList[Processor]()
It would be nice if you could add a comment here on why we chose this data structure. Folks who look at this code in the future will then have a clear explanation of the choices and tradeoffs we made.
- newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
-   val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
+ newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => {
+   val dataPlaneProcessors = dataPlaneAcceptors.values.asScala.flatMap(a => a.processors.asScala)
We cannot convert processors to Scala since it transforms it into a mutable ArrayBuffer.
We probably don't need Scala transformations here. Could you please try something like this:
dataPlaneAcceptors.values.stream.flatMap(a => a.processors.stream)
(same comment for other places such as line 119)
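A self-contained Java sketch of the stream-based flattening suggested above (the Acceptor stand-in and listener names are hypothetical, not from the PR): Java streams flatten the nested thread-safe collections directly, without ever materializing a mutable Scala ArrayBuffer.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

public class StreamFlattenDemo {
    // Hypothetical stand-in for an Acceptor that owns a thread-safe list of processor ids.
    static class Acceptor {
        final CopyOnWriteArrayList<String> processors = new CopyOnWriteArrayList<>();
        Acceptor(String... ids) { for (String id : ids) processors.add(id); }
    }

    public static void main(String[] args) {
        Map<String, Acceptor> dataPlaneAcceptors = new ConcurrentHashMap<>();
        dataPlaneAcceptors.put("listener-a", new Acceptor("p0", "p1"));
        dataPlaneAcceptors.put("listener-b", new Acceptor("p2"));

        // Flatten all processors across acceptors with Java streams only:
        // dataPlaneAcceptors.values.stream.flatMap(a => a.processors.stream)
        // (sorted here just to make the demo output deterministic, since
        // ConcurrentHashMap does not guarantee iteration order).
        List<String> all = dataPlaneAcceptors.values().stream()
                .flatMap(a -> a.processors.stream())
                .sorted()
                .collect(Collectors.toList());
        System.out.println(all); // prints [p0, p1, p2]
    }
}
```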
> We cannot convert processors to Scala since it transforms it into a mutable ArrayBuffer.

Pardon me, why can't we use a "mutable ArrayBuffer" here?
    metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
- }
+ }.toArray
  if (dataPlaneProcessors.isEmpty) {
This leads to object creation on every call to this metric (which is going to happen frequently). Do we really want this?
If I understand correctly, your motivation is to guard against scenarios where the number of processors changes between the time we calculate ioWaitRatioMetricNames and the time we calculate dataPlaneProcessors.size. To mitigate it, we could store the size in an int beforehand; then we don't have to convert ioWaitRatioMetricNames to an array.
> To mitigate it, we can store the size in an int before this and then, we don't have to convert ioWaitRatioMetricNames to an array.

ioWaitRatioMetricNames might reflect modifications to processors, so the actual size of processors may change while we calculate the average. For example, we might cache size = 5, but the processors could grow to 6 by the time we summarize all processors.
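To make that size-vs-contents race concrete, here is a hedged Java sketch (variable names and values are illustrative, not from the PR): taking a single snapshot via toArray guarantees that the element count and the summed elements refer to the same set of processors, which a separately cached int would not.

```java
import java.util.concurrent.CopyOnWriteArrayList;

public class ConsistentAverageDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<Double> ioWaitRatios = new CopyOnWriteArrayList<>();
        ioWaitRatios.add(0.25);
        ioWaitRatios.add(0.75);

        // Snapshot once: length and contents are guaranteed to be consistent
        // with each other, even if a writer mutates the list right afterwards.
        Object[] snapshot = ioWaitRatios.toArray();
        ioWaitRatios.add(0.90); // simulated concurrent mutation by another thread

        double sum = 0.0;
        for (Object v : snapshot) sum += (Double) v;
        // Divide by snapshot.length, never by the (now stale) live size,
        // so the average always matches the elements that were summed.
        System.out.println(sum / snapshot.length); // prints 0.5
    }
}
```

This is the consistency property the `.toArray` in the diff buys, at the cost of an allocation per metric read.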
divijvaidya left a comment:
Thank you for making the changes @chia7712. The code looks good to me!
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
The collections used for generating metrics are already thread-safe, so the synchronization is unnecessary.