Replace Processing ExecutorService with QueryProcessingPool #11382

Merged
abhishekagarwal87 merged 7 commits into apache:master from abhishekagarwal87:query-processing-pool
Jul 1, 2021

Contributor

@abhishekagarwal87 abhishekagarwal87 commented Jun 24, 2021

Description

This PR refactors QueryRunnerFactory#mergeRunners to accept a new interface, QueryProcessingPool, instead of an ExecutorService for concurrent execution of query runners. The interface lets custom extensions inject their own implementation to decide which query runner to prioritize first. The default implementation behaves as it does today and takes the query's priority into account. QueryProcessingPool can also be used as a regular executor service. It has a dedicated method for accepting query execution work, so implementations can differentiate between regular async tasks and query execution tasks. This dedicated method also passes the QueryRunner object as part of the task information. This hook lets custom extensions carry any state from QuerySegmentWalker to QueryProcessingPool#mergeRunners, which is not currently possible.
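The default behavior mentioned above (taking query priority into account) can be sketched with the JDK alone. This is a minimal illustration under stated assumptions, not Druid's actual default pool: PrioritizedRunnable and PriorityOrderingDemo are hypothetical names, and a one-thread ThreadPoolExecutor backed by a PriorityBlockingQueue stands in for a priority-aware processing pool, so queued work is taken highest priority first.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: a Runnable tagged with a query priority (higher runs first).
class PrioritizedRunnable implements Runnable, Comparable<PrioritizedRunnable>
{
  final int priority;
  private final Runnable body;

  PrioritizedRunnable(int priority, Runnable body)
  {
    this.priority = priority;
    this.body = body;
  }

  @Override
  public void run()
  {
    body.run();
  }

  @Override
  public int compareTo(PrioritizedRunnable other)
  {
    // PriorityBlockingQueue hands out the smallest element first, so compare in reverse.
    return Integer.compare(other.priority, this.priority);
  }
}

class PriorityOrderingDemo
{
  // Runs one task per priority on a single worker and returns the order they completed in.
  static List<Integer> runInPriorityOrder(int... priorities) throws InterruptedException
  {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>()
    );
    List<Integer> completed = new CopyOnWriteArrayList<>();
    CountDownLatch gate = new CountDownLatch(1);

    // Occupy the only worker so every later task must wait in the priority queue.
    executor.execute(new PrioritizedRunnable(Integer.MAX_VALUE, () -> {
      try {
        gate.await();
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }));
    // Use execute() rather than submit(): submit() would wrap tasks in non-comparable FutureTasks.
    for (int p : priorities) {
      executor.execute(new PrioritizedRunnable(p, () -> completed.add(p)));
    }

    gate.countDown();
    executor.shutdown(); // already-queued tasks still run after shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return completed;
  }
}
```

With this sketch, runInPriorityOrder(1, 5, 3) should complete the tasks in the order 5, 3, 1. PriorityBlockingQueue makes no ordering promise among equal priorities, so a real pool would likely add a tie-breaker such as an insertion counter.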


Key changed/added classes in this PR
  • QueryProcessingPool
  • QueryRunnerFactory
  • ForwardingQueryProcessingPool
  • DirectQueryProcessingPool
  • PrioritizedQueryRunnerCallable
  • AbstractPrioritizedQueryRunnerCallable

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.

/**
* @return - Returns this pool as an executor service that can be used for other asynchronous operations.
*/
ListeningExecutorService asExecutorService();
Member

I think this method leaks the internals of the interface. For instance, a caller may call this method and then unintentionally pass the executor to other parts of the code. Also, if someone has to implement a QueryProcessingPool that is a composite containing multiple pools, this interface would be hard to implement.
Would it be better to keep the original ExecutorService as is and inject that executor service into DefaultProcessingPool? The ExecutorService interface is richer and more common. QueryProcessingPool can then be used where a PrioritizedQueryRunnerCallable needs to be submitted.

Contributor Author

something like this?

@ExtensionPoint
public interface QueryProcessingPool extends ListeningExecutorService
{
  /**
   * Submits the query execution unit task for asynchronous execution.
   *
   * @param task - Task to be submitted.
   * @param <T>  - Task result type
   * @param <V>  - Query runner sequence type
   * @return - Future object for tracking the task completion.
   */
  <T, V> ListenableFuture<T> submitQueryExecution(PrioritizedQueryRunnerCallable<T, V> task);
}

Member

Yes, I was thinking of something like the snippet above. (The interface could probably be called something like QueryRunnerProcessingPool, since its dedicated method currently only accepts QueryRunner tasks; submitQueryExecution could also become submit.)
If this interface looks OK, then DefaultProcessingPool's constructor signature can be: DefaultProcessingPool(@Processing ExecutorService)
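The constructor-injection idea can be sketched roughly as follows. Everything here is a simplified, hypothetical stand-in rather than Druid's code: java.util.concurrent.Future replaces Guava's ListenableFuture, QueryRunner is reduced to a no-argument run() returning a List, and ForwardingPoolSketch is an illustrative name, loosely in the spirit of the ForwardingQueryProcessingPool listed in the description. It wraps an injected ExecutorService and forwards both ordinary tasks and query execution tasks to it.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for Druid's QueryRunner.
interface QueryRunner<V>
{
  List<V> run();
}

// Hypothetical stand-in: a Callable that carries a query priority.
interface PrioritizedCallable<T> extends Callable<T>
{
  int getPriority();
}

// Same shape as in the thread: the task also exposes the runner it belongs to.
interface PrioritizedQueryRunnerCallable<T, V> extends PrioritizedCallable<T>
{
  QueryRunner<V> getRunner();
}

// JDK-only version of the proposed interface (the thread uses Guava types).
interface QueryProcessingPool extends ExecutorService
{
  <T, V> Future<T> submitQueryExecution(PrioritizedQueryRunnerCallable<T, V> task);
}

// Wraps an injected executor, mirroring the suggested
// DefaultProcessingPool(@Processing ExecutorService) constructor (annotation omitted).
class ForwardingPoolSketch extends AbstractExecutorService implements QueryProcessingPool
{
  private final ExecutorService delegate;

  ForwardingPoolSketch(ExecutorService delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public <T, V> Future<T> submitQueryExecution(PrioritizedQueryRunnerCallable<T, V> task)
  {
    // A custom pool could inspect task.getPriority() or task.getRunner() here.
    return delegate.submit(task);
  }

  @Override
  public void execute(Runnable command)
  {
    // Ordinary submit()/invokeAll() calls inherited from AbstractExecutorService land here.
    delegate.execute(command);
  }

  @Override
  public void shutdown()
  {
    delegate.shutdown();
  }

  @Override
  public List<Runnable> shutdownNow()
  {
    return delegate.shutdownNow();
  }

  @Override
  public boolean isShutdown()
  {
    return delegate.isShutdown();
  }

  @Override
  public boolean isTerminated()
  {
    return delegate.isTerminated();
  }

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
  {
    return delegate.awaitTermination(timeout, unit);
  }
}
```

For example, new ForwardingPoolSketch(Executors.newFixedThreadPool(2)) can be passed wherever a plain ExecutorService is expected (the ConcurrentGrouper case raised in this thread), while a custom implementation could change only submitQueryExecution to reorder query work.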

Contributor

I assume you want to make QueryProcessingPool compatible with ExecutorService because of ConcurrentGrouper?

Contributor

Just clarifying my stance: @rohangarg's idea sounds good to me, but I don't have a strong preference.

Contributor Author

@abhishekagarwal87 abhishekagarwal87 Jun 27, 2021

Sounds good. I do want to keep the method separate, though. Either that, or PrioritizedQueryRunnerCallable<T, V> should not extend Callable. My reasoning is that implementations then don't need instanceof checks to differentiate between query execution tasks and other async tasks.

"I assume you want to make QueryProcessingPool compatible with ExecutorService because of ConcurrentGrouper?"

Yes
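Keeping a dedicated method means an implementation can route query execution work by entry point alone. The sketch below assumes simplified, hypothetical stand-ins (a JDK Future instead of ListenableFuture, and PrioritizedQueryRunnerCallable flattened to extend Callable directly) plus a hypothetical RoutingPool that sends query tasks to one executor and all other async work to another, which also covers the composite-pool case raised earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for Druid's QueryRunner.
interface QueryRunner<V>
{
  List<V> run();
}

// Hypothetical stand-in, flattened so it extends Callable directly.
interface PrioritizedQueryRunnerCallable<T, V> extends Callable<T>
{
  int getPriority();

  QueryRunner<V> getRunner();
}

// JDK-only sketch of the interface proposed in the thread.
interface QueryProcessingPool extends ExecutorService
{
  <T, V> Future<T> submitQueryExecution(PrioritizedQueryRunnerCallable<T, V> task);
}

// Routes work by entry point, so no instanceof check is needed to spot query tasks.
class RoutingPool extends AbstractExecutorService implements QueryProcessingPool
{
  private final ExecutorService queryExecutor;
  private final ExecutorService generalExecutor;

  RoutingPool(ExecutorService queryExecutor, ExecutorService generalExecutor)
  {
    this.queryExecutor = queryExecutor;
    this.generalExecutor = generalExecutor;
  }

  @Override
  public <T, V> Future<T> submitQueryExecution(PrioritizedQueryRunnerCallable<T, V> task)
  {
    return queryExecutor.submit(task);
  }

  @Override
  public void execute(Runnable command)
  {
    // Inherited submit(), invokeAll() and invokeAny() all delegate to execute().
    generalExecutor.execute(command);
  }

  @Override
  public void shutdown()
  {
    queryExecutor.shutdown();
    generalExecutor.shutdown();
  }

  @Override
  public List<Runnable> shutdownNow()
  {
    List<Runnable> pending = new ArrayList<>(queryExecutor.shutdownNow());
    pending.addAll(generalExecutor.shutdownNow());
    return pending;
  }

  @Override
  public boolean isShutdown()
  {
    return queryExecutor.isShutdown() && generalExecutor.isShutdown();
  }

  @Override
  public boolean isTerminated()
  {
    return queryExecutor.isTerminated() && generalExecutor.isTerminated();
  }

  @Override
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
  {
    // Share one deadline across both executors.
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    boolean queryDone = queryExecutor.awaitTermination(timeout, unit);
    long remaining = Math.max(0L, deadline - System.nanoTime());
    return queryDone && generalExecutor.awaitTermination(remaining, TimeUnit.NANOSECONDS);
  }
}
```

Any Callable submitted through the inherited submit() goes to the general executor, even if it happens to be a query task; that is the trade-off of dispatching on the method rather than on the task type.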

Contributor Author

@rohangarg @jon-wei @jihoonson What do you think?

Member

LGTM.
Maybe the naming could be more generic, since this would be used in both the ingestion and the querying layers.

Contributor Author

On real-time nodes too, the pool is used for query execution.

Contributor

SGTM

public @interface Processing
public interface PrioritizedQueryRunnerCallable<T, V> extends PrioritizedCallable<T>
{
QueryRunner<V> getRunner();
Contributor

How will this method be used? I only see it used in a test in this PR.

Contributor Author

Extensions can use this method to get the runner that a given query execution task corresponds to. The runner can in turn be used to fetch any state associated with that QueryRunner, such as segment info.
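As an illustration of that use, here is a minimal sketch under stated assumptions: it orders pending query tasks by priority and then by segment, reaching the segment only through getRunner(). RunnerSegmentRegistry (a side table filled in when runners are created, keyed by runner identity), SegmentAwareOrdering and the tie-break rule (lexicographically larger segment ids first) are all hypothetical; QueryRunner, PrioritizedQueryRunnerCallable and SimpleTask are simplified stand-ins, not Druid's classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for Druid's QueryRunner.
interface QueryRunner<V>
{
  List<V> run();
}

// Hypothetical stand-in, flattened so it extends Callable directly.
interface PrioritizedQueryRunnerCallable<T, V> extends Callable<T>
{
  int getPriority();

  QueryRunner<V> getRunner();
}

// Minimal task implementation for building examples.
class SimpleTask<T, V> implements PrioritizedQueryRunnerCallable<T, V>
{
  private final int priority;
  private final QueryRunner<V> runner;
  private final Callable<T> body;

  SimpleTask(int priority, QueryRunner<V> runner, Callable<T> body)
  {
    this.priority = priority;
    this.runner = runner;
    this.body = body;
  }

  @Override
  public int getPriority()
  {
    return priority;
  }

  @Override
  public QueryRunner<V> getRunner()
  {
    return runner;
  }

  @Override
  public T call() throws Exception
  {
    return body.call();
  }
}

// Hypothetical side table mapping each runner (by identity) to its segment id.
class RunnerSegmentRegistry
{
  private final Map<QueryRunner<?>, String> segmentByRunner = new IdentityHashMap<>();

  void register(QueryRunner<?> runner, String segmentId)
  {
    segmentByRunner.put(runner, segmentId);
  }

  String segmentOf(QueryRunner<?> runner)
  {
    return segmentByRunner.getOrDefault(runner, "");
  }
}

class SegmentAwareOrdering
{
  // Higher priority first; among equal priorities, lexicographically larger segment ids first.
  static List<PrioritizedQueryRunnerCallable<?, ?>> order(
      List<PrioritizedQueryRunnerCallable<?, ?>> pending,
      RunnerSegmentRegistry registry
  )
  {
    List<PrioritizedQueryRunnerCallable<?, ?>> sorted = new ArrayList<>(pending);
    sorted.sort(
        Comparator.<PrioritizedQueryRunnerCallable<?, ?>>comparingInt(task -> task.getPriority())
                  .reversed()
                  .thenComparing(task -> registry.segmentOf(task.getRunner()), Comparator.<String>reverseOrder())
    );
    return sorted;
  }
}
```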

Contributor

I see. Can you add it in the javadoc of this method?

Contributor Author

sure thing.

package org.apache.druid.query;

/**
* An implementation of {@link PrioritizedCallable} that also let's caller get access to associated {@link QueryRunner}
Contributor

Suggested change
* An implementation of {@link PrioritizedCallable} that also let's caller get access to associated {@link QueryRunner}
* An implementation of {@link PrioritizedCallable} that also lets caller get access to associated {@link QueryRunner}

Comment on lines +25 to +26
* @param <T>
* @param <V>
Contributor

Can you please complete the javadoc for the parameters?

Contributor Author

done in next patch.

Contributor

@jihoonson jihoonson left a comment

I reviewed only the design of new interfaces, QueryProcessingPool and PrioritizedQueryRunnerCallable. Their design LGTM.

/**
* @return - Returns this pool as an executor service that can be used for other asynchronous operations.
*/
ListeningExecutorService asExecutorService();
Contributor

I think making the processing pool a kind of executor service makes sense; let's go with that if you agree.

@abhishekagarwal87 abhishekagarwal87 merged commit 03a6a6d into apache:master Jul 1, 2021
jihoonson pushed a commit to jihoonson/druid that referenced this pull request Jul 12, 2021
…1382)

This PR refactors the code for QueryRunnerFactory#mergeRunners to accept a new interface called QueryProcessingPool instead of ExecutorService for concurrent execution of query runners. This interface will let custom extensions inject their own implementation for deciding which query-runner to prioritize first. The default implementation is the same as today that takes the priority of query into account. QueryProcessingPool can also be used as a regular executor service. It has a dedicated method for accepting query execution work so implementations can differentiate between regular async tasks and query execution tasks. This dedicated method also passes the QueryRunner object as part of the task information. This hook will let custom extensions carry any state from QuerySegmentWalker to QueryProcessingPool#mergeRunners which is not possible currently.
@abhishekagarwal87 abhishekagarwal87 deleted the query-processing-pool branch July 22, 2021 10:13
@clintropolis clintropolis added this to the 0.22.0 milestone Aug 12, 2021
5 participants