Add shuffle metrics for parallel indexing #10359
Conversation
    .computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength);
}

public Map<String, PerDatasourceShuffleMetrics> snapshot()
Should this be renamed to snapshotAndReset(), or maybe just reset()?
public Map<String, PerDatasourceShuffleMetrics> reset()
{
return Collections.unmodifiableMap(datasourceMetrics.getAndSet(new ConcurrentHashMap<>()));
}
Sounds good. Changed to snapshotAndReset() since it sounds more intuitive to me.
{
  datasourceMetrics
      .get()
      .computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics()).accumulate(fileLength);
It's still possible to miss an update in reporting because of a race condition, right? The reference could be reset while the accumulation is happening.
The race condition exists, but it should be fine because the missing update should be included in the next call to snapshotAndReset(). I added javadocs explaining why.
I think this needs to use something like AtomicReference.getAndUpdate so that it isn't racy with the monitor/emitter? Though I'm not sure getAndUpdate or the similar methods are actually appropriate since they are supposed to be side-effect free, so I'm not really sure how exactly to resolve this.
Like, the potentially problematic scenario I'm thinking of is where shuffleRequested is called "before" snapshotAndReset. It seems like once AtomicReference.get has completed, snapshotAndReset can proceed, so now the shuffle monitor has the same concurrent map we are still actively updating, and it is preparing to build the metrics to emit. It seems super unlikely that it would be a problem, but unless I'm missing something it does seem possible.
Ah, you guys are right. Will fix it.
The problem is that any updates on the reference to datasourceMetrics should be synchronized with any updates on the map itself and its values. I could use ConcurrentHashMap.compute() if I didn't have to reset the reference to the map when a snapshot is taken, but I think it's needed since the map can keep growing over time otherwise. I'm not sure if there is any other way than using a big lock. I made this change, let me know if you have a better idea.
the lock should suffice. shuffleRequested doesn't need to be a high throughput call.
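To make the trade-off concrete, here is a minimal, self-contained sketch of the lock-based approach discussed above. Class and method names mirror the PR discussion, but this is an illustration of the technique, not the exact merged code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch: a single lock guards both the map reference and the per-datasource counters.
class ShuffleMetrics
{
  static class PerDatasourceShuffleMetrics
  {
    private long shuffleBytes;
    private int shuffleRequests;

    private void accumulate(long fileLength)
    {
      shuffleBytes += fileLength;
      shuffleRequests++;
    }

    long getShuffleBytes()
    {
      return shuffleBytes;
    }

    int getShuffleRequests()
    {
      return shuffleRequests;
    }
  }

  private final Object lock = new Object();

  // All reads and writes, including updates to the per-datasource values, happen
  // under the same lock, so a snapshot can never observe a half-applied update.
  private Map<String, PerDatasourceShuffleMetrics> datasourceMetrics = new HashMap<>();

  void shuffleRequested(String supervisorTaskId, long fileLength)
  {
    synchronized (lock) {
      datasourceMetrics
          .computeIfAbsent(supervisorTaskId, k -> new PerDatasourceShuffleMetrics())
          .accumulate(fileLength);
    }
  }

  Map<String, PerDatasourceShuffleMetrics> snapshotAndReset()
  {
    synchronized (lock) {
      final Map<String, PerDatasourceShuffleMetrics> snapshot = datasourceMetrics;
      datasourceMetrics = new HashMap<>();
      return Collections.unmodifiableMap(snapshot);
    }
  }
}
```

Because both methods hold the same lock, an in-flight accumulate() either completes before the snapshot swaps the map out or lands entirely in the fresh map, which resolves the race discussed above.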
/**
 * This method is called whenever the monitoring thread takes a snapshot of the current metrics. The map inside
 * AtomicReference will be reset to an empty map after this call. This is to return the snapshot metrics collected
This comment needs an update after the latest changes.
Good catch. Fixed.
suneet-s left a comment
Overall, this looks very nice! Just one ask about a feature flag. I don't have a strong opinion on the name of the metric, but I would love to know your thoughts.
 */
public void shuffleRequested(String supervisorTaskId, long fileLength)
{
  synchronized (lock) {
Since there is a risk of the locking introducing a slowdown here because of contention, can we update this to include a feature flag check?
This way, if there are some unforeseen issues with locking, we can disable metric computation and reporting. I think a static feature flag - like a system property would be good enough for this use case.
I don't think this locking would introduce any noticeable slowdown, but a feature flag sounds good. Now, ShuffleMetrics and ShuffleMonitor work only when ShuffleMonitor is defined in druid.monitoring.monitors. Added some docs for that too.
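For reference, enabling the monitor statically might look like the following middleManager runtime.properties fragment. This is a sketch: the fully qualified class name is assumed from the PR's module layout, and the emission period shown is Druid's documented default rather than anything specific to this change.

```properties
# Register ShuffleMonitor statically; shuffle metrics are collected only when it is present.
druid.monitoring.monitors=["org.apache.druid.indexing.worker.shuffle.ShuffleMonitor"]
# How often registered monitors emit their metrics (Druid's default is PT1M).
druid.monitoring.emissionPeriod=PT1M
```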
I like this approach a lot 🤘
 * whenever a snapshot is taken since the map can keep growing over time otherwise. For concurrent access pattern,
 * see {@link #shuffleRequested} and {@link #snapshotAndReset()}.
 */
@GuardedBy("lock")
Just curious - why did you choose to use the guarded by pattern instead of a ConcurrentMap?
There was some prior discussion about it. It was mainly because not only updates to the datasourceMetrics map but also updates to each PerDatasourceShuffleMetrics must be synchronized. For example, if a PerDatasourceShuffleMetrics is being updated when snapshotAndReset() is called, the update must be guaranteed to complete before snapshotAndReset() proceeds.
Ah - that makes sense. Thanks for the explanation
{
  private static final String SUPERVISOR_TASK_ID_DIMENSION = "supervisorTaskId";
  private static final String SHUFFLE_BYTES_KEY = "shuffle/bytes";
  private static final String SHUFFLE_REQUESTS_KEY = "shuffle/requests";
Other ingestion-related metrics start with "ingest/". Any thoughts on whether these metrics fall under the ingestion metrics category?
I was thinking about where the metrics would live in the docs which is why I was asking this question. I thought maybe it belonged here https://druid.apache.org/docs/latest/operations/metrics.html#ingestion-metrics-realtime-process ?
Good question. The new metrics don't seem to belong to any existing section, so I added a new one. But our current doc doesn't seem organized well (for example, the metrics in the above link are not only for realtime processes, but for all task types as well), maybe we need to tidy up at some point after #10352 is done.
Also, I modified the metrics to start with ingest/ similar to other ingestion metrics.
{
  // ShuffleMonitor cannot be registered dynamically, but only via the static configuration (MonitorsConfig).
  // As a result, it is safe to check only one time if it is registered in MonitorScheduler.
  final Optional<ShuffleMonitor> maybeMonitor = monitorScheduler.findMonitor(ShuffleMonitor.class);
I see that MonitorScheduler has a removeMonitor method, and ShuffleMetrics is provided as a Singleton. Can someone remove the ShuffleMonitor while Druid is running? If they do that, how would it impact ShuffleMetrics being reported?
Currently, a monitor can be removed when 1) the monitor() method returns false or 2) tasks de-register task-specific monitors such as TaskRealtimeMetricsMonitor which is used in the deprecated Tranquility. So, ShuffleMonitor cannot be removed once a node is started.
In the future, I think we may want to dynamically register and remove monitors (because it's cool). In that case, we probably need to check all monitor implementations we have if they have any issues to do that. We can come back to ShuffleMonitor later to handle the case you mentioned.
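The one-time check described above can be sketched as follows. The interface and class bodies here are minimal stand-ins for the Druid types involved (assumptions for illustration, not the real APIs); the point is that creating ShuffleMetrics only when ShuffleMonitor was statically registered is what makes monitor registration act as the feature flag.

```java
import java.util.Optional;

// Minimal stand-in for Druid's MonitorScheduler lookup (assumption, not the real API).
interface MonitorScheduler
{
  <T> Optional<T> findMonitor(Class<T> monitorClass);
}

class ShuffleMetrics
{
}

class ShuffleMonitor
{
  private ShuffleMetrics shuffleMetrics;

  void setShuffleMetrics(ShuffleMetrics metrics)
  {
    this.shuffleMetrics = metrics;
  }
}

class ShuffleModuleSketch
{
  // ShuffleMetrics is created only when ShuffleMonitor was statically registered via
  // druid.monitoring.monitors. Since monitors are neither added nor removed at runtime,
  // this single check at startup is safe and doubles as the feature flag.
  static Optional<ShuffleMetrics> getShuffleMetrics(MonitorScheduler scheduler)
  {
    return scheduler.findMonitor(ShuffleMonitor.class).map(monitor -> {
      final ShuffleMetrics metrics = new ShuffleMetrics();
      monitor.setShuffleMetrics(metrics);
      return metrics;
    });
  }
}
```

If the monitor is absent, Optional.empty() is bound and callers skip metric collection entirely, so no lock is ever taken.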
suneet-s left a comment
LGTM with some asks for unit tests
@Override
public void configure(Binder binder)
{
  Jerseys.addResource(binder, ShuffleResource.class);
Can you add a module test that validates that ShuffleResource and Optional<ShuffleMetrics> are injectable? I think AuthorizerMapperModuleTest, which I've written, would be a similar example.
      emitter.emit(metricBuilder.build(SHUFFLE_REQUESTS_KEY, perDatasourceShuffleMetrics.getShuffleRequests()));
    });
  }
  return true;
Should we add unit tests for this function?
Oops, I thought I added one already. Added now.
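A unit test for the emitting logic could be sketched along these lines. The event and emitter types below are stand-ins for Druid's ServiceMetricEvent and ServiceEmitter, and the doMonitor helper is an assumed simplification of the monitor's emit loop, not the real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stub event standing in for Druid's ServiceMetricEvent.
class MetricEvent
{
  final String metric;
  final String supervisorTaskId;
  final long value;

  MetricEvent(String metric, String supervisorTaskId, long value)
  {
    this.metric = metric;
    this.supervisorTaskId = supervisorTaskId;
    this.value = value;
  }
}

// Stub emitter that records events instead of sending them anywhere.
class CollectingEmitter
{
  final List<MetricEvent> events = new ArrayList<>();

  void emit(MetricEvent event)
  {
    events.add(event);
  }
}

class ShuffleMonitorSketch
{
  static final String SHUFFLE_BYTES_KEY = "ingest/shuffle/bytes";
  static final String SHUFFLE_REQUESTS_KEY = "ingest/shuffle/requests";

  // Emits two events per supervisor task from a snapshot that maps
  // supervisorTaskId -> {shuffleBytes, shuffleRequests}.
  static boolean doMonitor(Map<String, long[]> snapshot, CollectingEmitter emitter)
  {
    snapshot.forEach((supervisorTaskId, counters) -> {
      emitter.emit(new MetricEvent(SHUFFLE_BYTES_KEY, supervisorTaskId, counters[0]));
      emitter.emit(new MetricEvent(SHUFFLE_REQUESTS_KEY, supervisorTaskId, counters[1]));
    });
    return true;
  }
}
```

A test then feeds a known snapshot through doMonitor and asserts on the collected events, which is the shape of test the comment above asks for.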
{
  final ShuffleMonitor shuffleMonitor = new ShuffleMonitor();
  final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
  Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
nit:
- Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
+ Mockito.when(monitorScheduler.findMonitor(ShuffleMonitor.class))
suneet-s left a comment
Looks like the analyzeDependencies job is failing
[WARNING] Unused declared dependencies found:
[WARNING] org.checkerframework:checker-qual:jar:2.5.7:compile
final ShuffleMonitor shuffleMonitor = new ShuffleMonitor();
final MonitorScheduler monitorScheduler = Mockito.mock(MonitorScheduler.class);
Mockito.when(monitorScheduler.findMonitor(ArgumentMatchers.eq(ShuffleMonitor.class)))
       .thenReturn(Optional.of(shuffleMonitor));
injector = Guice.createInjector(
    binder -> {
      binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
      binder.bind(MonitorScheduler.class).toInstance(monitorScheduler);
      binder.bind(IntermediaryDataManager.class).toInstance(Mockito.mock(IntermediaryDataManager.class));
    },
    shuffleModule
);
nit: you can move this into a @Before method
As monitorScheduler behaves differently in each test, I think it's better to create it in each test. I extracted the other common code into a util method.
Thanks, fixed now.
* Add shuffle metrics for parallel indexing
* javadoc and concurrency test
* concurrency
* fix javadoc
* Feature flag
* doc
* fix doc and add a test
* checkstyle
* add tests
* fix build and address comments
Description
Part of #10352. This PR adds these metrics for middleManagers. These metrics have the supervisorTaskId as their dimension.

ingest/shuffle/bytes: Number of bytes shuffled per emission period.
ingest/shuffle/requests: Number of shuffle requests per emission period.

I haven't updated the documentation yet; I will add it together with the missing shuffle configurations in a follow-up PR.
This PR has: