
remove unnecessary synchronization overhead from complex Aggregators #8031

@himanshug


Motivation

Many complex [Buffer]Aggregator implementations synchronize access to their internal data structures because, during realtime indexing, they are used in a single-writer-multiple-reader fashion: they are queried concurrently while being updated. That synchronization is unnecessary everywhere else, yet we pay its price anyway, for example on historical nodes while querying and in batch indexing tasks. Most recently this came up in apache/datasketches-java#263.

Proposed changes

I haven't really done a prototype yet but I "think" these changes should be doable.

Add the following methods (with default implementations) to AggregatorFactory.

  public Aggregator factorize(ColumnSelectorFactory metricFactory, boolean isConcurrent)
  {
    return factorize(metricFactory);
  }

  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, boolean isConcurrent)
  {
    return factorizeBuffered(metricFactory);
  }

And replace all calls inside Druid code to AggregatorFactory.factorize[Buffered](ColumnSelectorFactory) with calls to AggregatorFactory.factorize[Buffered](ColumnSelectorFactory, boolean isConcurrent), passing the right value for isConcurrent.
IncrementalIndex would be made aware of its concurrency context (by renaming the existing variable concurrentEventAdd to isConcurrent and specifying it correctly everywhere an IncrementalIndex instance is created) so that it can pass the right value for isConcurrent when calling factorize[Buffered](..).
Relevant complex aggregators such as thetaSketch can then override the newly added methods to add synchronization only where it is really needed.
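To make the override idea concrete, here is a minimal sketch of how an extension might use the new overload. The interfaces below are simplified, hypothetical stand-ins (not Druid's real signatures), and UnsafeSumAggregator, SynchronizedAggregator and SketchLikeAggregatorFactory are made-up names for illustration only.

```java
// Hypothetical, simplified stand-ins for Druid's types; the real interfaces have more methods.
interface ColumnSelectorFactory {}

interface Aggregator {
    void aggregate(long value);
    long get();
}

/** Plain aggregator with no locking; only safe when a single thread touches it. */
class UnsafeSumAggregator implements Aggregator {
    private long sum;

    @Override
    public void aggregate(long value) { sum += value; }

    @Override
    public long get() { return sum; }
}

/** Wrapper that guards every call with the same monitor, for single-writer-multiple-reader use. */
class SynchronizedAggregator implements Aggregator {
    private final Aggregator delegate;

    SynchronizedAggregator(Aggregator delegate) { this.delegate = delegate; }

    @Override
    public synchronized void aggregate(long value) { delegate.aggregate(value); }

    @Override
    public synchronized long get() { return delegate.get(); }
}

abstract class AggregatorFactory {
    public abstract Aggregator factorize(ColumnSelectorFactory metricFactory);

    // Proposed default: ignore the flag and keep today's behavior.
    public Aggregator factorize(ColumnSelectorFactory metricFactory, boolean isConcurrent) {
        return factorize(metricFactory);
    }
}

/** Hypothetical complex aggregator factory that pays for locking only when asked to. */
class SketchLikeAggregatorFactory extends AggregatorFactory {
    @Override
    public Aggregator factorize(ColumnSelectorFactory metricFactory) {
        // Assumption: callers that do not state their context get the safe, synchronized variant.
        return factorize(metricFactory, true);
    }

    @Override
    public Aggregator factorize(ColumnSelectorFactory metricFactory, boolean isConcurrent) {
        Aggregator plain = new UnsafeSumAggregator();
        return isConcurrent ? new SynchronizedAggregator(plain) : plain;
    }
}
```

With this shape, a caller such as a batch indexing task would pass false and get the lock-free aggregator, while realtime indexing would pass true. Existing factories that do not override the new method keep their current behavior.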

Rationale

One other option would be for aggregator implementors to get additional contextual information (e.g. the nodeType they are running on) and enable or disable synchronization based on that. However, the proposed approach is simpler for extension writers to use and takes away the guessing game.
I also contemplated adding an enum like

enum ConcurrencyContext {
  NONE,
  MULTI_WRITE,
  SINGLE_WRITE_MULTI_READ,
  // ...
}

and using it instead of boolean isConcurrent in the newly introduced method arguments, but couldn't see any significant advantage in doing that for now.
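As a rough illustration of why the enum adds little today, the sketch below assumes an aggregator would only ever ask "do I need to synchronize?", in which case every constant collapses to the same boolean. The isConcurrent() helper is hypothetical, and only the three constants named above are included.

```java
// Sketch only: the three constants come from the proposal; isConcurrent() is a hypothetical helper.
enum ConcurrencyContext {
    NONE,
    MULTI_WRITE,
    SINGLE_WRITE_MULTI_READ;

    /** Under the stated assumption, anything other than NONE simply means "synchronize". */
    boolean isConcurrent() {
        return this != NONE;
    }
}
```

The enum would start to pay off only if some aggregator could pick a cheaper strategy for SINGLE_WRITE_MULTI_READ than for MULTI_WRITE, which is beyond what this proposal covers.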

Operational impact

None

Test plan (optional)

Existing unit/integration tests would cover the changes introduced.

Future work (optional)

Adjust relevant complex aggregator implementations to take advantage of the newly added methods.
