diff --git a/core/src/main/java/org/apache/druid/guice/LifecycleModule.java b/core/src/main/java/org/apache/druid/guice/LifecycleModule.java index 695b5ee3c260..66cb8b2cde00 100644 --- a/core/src/main/java/org/apache/druid/guice/LifecycleModule.java +++ b/core/src/main/java/org/apache/druid/guice/LifecycleModule.java @@ -61,7 +61,7 @@ public class LifecycleModule implements Module * is materialized and injected, meaning that objects are not actually instantiated in dependency order. * Registering with the LifecyceModule, on the other hand, will instantiate the objects after the normal object * graph has already been instantiated, meaning that objects will be created in dependency order and this will - * only actually instantiate something that wasn't actually dependend upon. + * only actually instantiate something that wasn't actually depended upon. * * @param clazz the class to instantiate * @return this, for chaining. @@ -85,7 +85,7 @@ public static void register(Binder binder, Class clazz) * is materialized and injected, meaning that objects are not actually instantiated in dependency order. * Registering with the LifecyceModule, on the other hand, will instantiate the objects after the normal object * graph has already been instantiated, meaning that objects will be created in dependency order and this will - * only actually instantiate something that wasn't actually dependend upon. + * only actually instantiate something that wasn't actually depended upon. * * @param clazz the class to instantiate * @param annotation The annotation class to register with Guice @@ -110,7 +110,7 @@ public static void register(Binder binder, Class clazz, Class iterFromMake) * {@link MergeCombineAction} do a final merge combine of all the parallel computed results, again pushing * {@link ResultBatch} into a {@link BlockingQueue} with a {@link QueuePusher}. 
*/ - @SuppressWarnings("serial") private static class MergeCombinePartitioningAction extends RecursiveAction { private final List> sequences; @@ -502,7 +501,6 @@ private int computeNumTasks() * how many times a task has continued executing, and utilized to compute a cumulative moving average of task run time * per amount yielded in order to 'smooth' out the continual adjustment. */ - @SuppressWarnings("serial") private static class MergeCombineAction extends RecursiveAction { private final PriorityQueue> pQueue; @@ -685,7 +683,6 @@ protected void compute() * majority of its time will be spent managed blocking until results are ready for each cursor, or will be incredibly * short lived if all inputs are already available. */ - @SuppressWarnings("serial") private static class PrepareMergeCombineInputsAction extends RecursiveAction { private final List> partition; diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index f80342b02dbb..2d1f490d4af0 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -128,39 +128,39 @@ Returns the serialized JSON of segments to load and drop for each Historical pro #### Segment Loading by Datasource Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28). -Also note that these APIs only guarantees that the segments are available at the time of the call. +Also note that these APIs only guarantees that the segments are available at the time of the call. Segments can still become missing because of historical process failures or any other reasons afterward. ##### GET * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` -Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given -datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. 
-Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store -(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms +Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given +datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status) -Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. If no used segments are found for the given inputs, this API returns `204 No Content` * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` -Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set. -Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store -(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. 
This can be a heavy operation in terms +Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource +over the given interval (or last 2 weeks if interval is not given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status) -Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. -If no used segments are found for the given inputs, this API returns `204 No Content` +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +If no used segments are found for the given inputs, this API returns `204 No Content` * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` -Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This includes segment replication counts. `forceMetadataRefresh` is required to be set. -Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store -(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. 
This can be a heavy operation in terms +Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource +over the given interval (or last 2 weeks if interval is not given). This includes segment replication counts. `forceMetadataRefresh` is required to be set. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status) -Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. You can pass the optional query parameter `computeUsingClusterView` to factor in the available cluster services when calculating the segments left to load. See [Coordinator Segment Loading](#coordinator-segment-loading) for details. If no used segments are found for the given inputs, this API returns `204 No Content` @@ -464,7 +464,7 @@ Update overlord dynamic worker configuration. * `/druid/coordinator/v1/compaction/progress?dataSource={dataSource}` -Returns the total size of segments awaiting compaction for the given dataSource. +Returns the total size of segments awaiting compaction for the given dataSource. The specified dataSource must have [automatic compaction](../ingestion/automatic-compaction.md) enabled. ##### GET @@ -490,7 +490,7 @@ The `latestStatus` object has the following keys: * `/druid/coordinator/v1/compaction/status?dataSource={dataSource}` -Similar to the API `/druid/coordinator/v1/compaction/status` above but filters response to only return information for the {dataSource} given. 
+Similar to the API `/druid/coordinator/v1/compaction/status` above but filters response to only return information for the {dataSource} given. Note that {dataSource} given must have/had auto-compaction enabled. #### Automatic compaction configuration @@ -935,11 +935,21 @@ Returns segment information lists including server locations for the given query #### GET +* `/druid/v1/router/cluster` + +Returns a list of the servers registered within the cluster. Similar to +`/druid/coordinator/v1/cluster`, but visible on the Router to allow discovery of Druid +servers if all you have is the Router endpoint. + > Note: Much of this information is available in a simpler, easier-to-use form through the Druid SQL > [`INFORMATION_SCHEMA.TABLES`](../querying/sql-metadata-tables.md#tables-table), > [`INFORMATION_SCHEMA.COLUMNS`](../querying/sql-metadata-tables.md#columns-table), and > [`sys.segments`](../querying/sql-metadata-tables.md#segments-table) tables. +This API is primarily for debugging when setting up a cluster and things are broken +enough that SQL doesn't work: this API gives a direct view of the nodes registered +in ZooKeeper. + * `/druid/v2/datasources` Returns a list of queryable datasources. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index c521dbf3b899..a222807960ec 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -1202,12 +1202,12 @@ public CompactionTask build() } /** - * Compcation Task Tuning Config. + * Compaction Task Tuning Config. * * An extension of ParallelIndexTuningConfig. As of now, all this TuningConfig * does is fail if the TuningConfig contains * `awaitSegmentAvailabilityTimeoutMillis` that is != 0 since it is not - * supported for Compcation Tasks. 
+ * supported for Compaction Tasks. */ public static class CompactionTuningConfig extends ParallelIndexTuningConfig { diff --git a/integration-tests/README.md b/integration-tests/README.md index e80151d92f6d..6723b42b74b7 100644 --- a/integration-tests/README.md +++ b/integration-tests/README.md @@ -23,13 +23,13 @@ Integration Testing To run integration tests, you have to specify the druid cluster the tests should use. -Druid comes with the mvn profile integration-tests -for setting up druid running in docker containers, and using that +Druid comes with the Maven profile integration-tests +for setting up druid running in Docker containers, and using that cluster to run the integration tests. -To use a druid cluster that is already running, use the -mvn profile int-tests-config-file, which uses a configuration file -describing the cluster. +To use a Druid cluster that is already running, use the +Maven profile `int-tests-config-file`, which uses a configuration file +to describe the cluster. Integration Testing Using Docker ------------------- @@ -40,7 +40,7 @@ have at least 4GiB of memory allocated to the docker engine. (You can verify it under Preferences > Resources > Advanced.) Also set the `DOCKER_IP` -environment variable to localhost on your system, as follows: +environment variable to localhost, as follows: ```bash export DOCKER_IP=127.0.0.1 @@ -52,73 +52,92 @@ Optionally, you can also set `APACHE_ARCHIVE_MIRROR_HOST` to override `https://a export APACHE_ARCHIVE_MIRROR_HOST=https://example.com/remote-generic-repo ``` -## Running tests againt auto brought up Docker containers +## Running Tests With Test-Run Docker Containers -> NOTE: This section describes how to start integration tests against docker containers which will be brought up automatically by following commands. -If you want to buid docker images and run tests separately, see the next section. 
+> NOTE: This section describes how to start integration tests against Docker containers which will be brought up automatically by following commands. If you want to build Docker images and run tests separately, see the next section. + +To run all tests from a test group using Docker and Maven run the following command: -To run all tests from a test group using docker and mvn run the following command: -(list of test groups can be found at `integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java`) ```bash mvn verify -P integration-tests -Dgroups= ``` -To run only a single test using mvn run the following command: +To run only a single test using Maven: + ```bash mvn verify -P integration-tests -Dgroups= -Dit.test= ``` + The test group should always be set, as certain test setup and cleanup tasks are based on the test group. You can find -the test group for a given test as an annotation in the respective test class. +the test group for a given test as an annotation in the respective test class. A list of test groups can be found at +`integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java`. The annotation uses a string +constant defined in `TestNGGroup.java`, be sure to use the constant value, not its name. For example, if your test has the annotation: -Add `-rf :druid-integration-tests` when running integration tests for the second time or later without changing +```java +@Test(groups = TestNGGroup.BATCH_INDEX) +``` + +then use the argument `-Dgroups=batch-index`. + +Add `-pl :druid-integration-tests` when running integration tests for the second time or later without changing the code of core modules in between to skip up-to-date checks for the whole module dependency tree. -Integration tests can also be run with either Java 8 or Java 11 by adding `-Djvm.runtime=#` to mvn command, where `#` +Sometimes the full test build can fail for reasons unrelated to your code. 
If that happens, a workaround +is to do a normal build, then do the `-pl` option above to run the integration tests. + +Integration tests can also be run with either Java 8 or Java 11 by adding `-Djvm.runtime=#` to the Maven command, where `#` can either be 8 or 11. -Druid's configuration (using Docker) can be overrided by providing `-Doverride.config.path=`. +Druid's configuration (using Docker) can be overridden by providing `-Doverride.config.path=`. The file must contain one property per line, the key must start with `druid_` and the format should be snake case. -Note that when bringing up docker containers through mvn and -Doverride.config.path is provided, additional +Note that when bringing up Docker containers through Maven and `-Doverride.config.path` is provided, additional Druid routers for security group integration test (permissive tls, no client auth tls, custom check tls) will not be started. -## Running tests against mannually brought up Docker containers +## Running Tests Against Docker Containers Started Manually 1. Build docker images. - From root module run maven command, run the following command: + From root module, run the following command: + ```bash mvn clean install -pl integration-tests -P integration-tests -Ddocker.run.skip=true -Dmaven.test.skip=true -Ddocker.build.hadoop=true ``` > **NOTE**: `-Ddocker.build.hadoop=true` is optional if you don't run tests against Hadoop. -2. Choose a docker-compose file to start containers. +2. Choose a `docker-compose` file to start containers. - There are a few different Docker compose yamls located in "docker" folder that could be used to start containers for different tests. + There are a few different Docker compose YAML files located in the `docker` folder that can be used to start containers for different tests. 
- To start basic Druid cluster (skip this if running Druid cluster with override configs): + ```bash docker-compose -f integration-tests/docker/docker-compose.yml up ``` - - To start Druid cluster with override configs + - To start Druid cluster with override configs: + ```bash OVERRIDE_ENV= docker-compose -f docker-compose.yml up ``` - - To start tests against Hadoop + - To start tests against Hadoop: + ```bash docker-compose -f docker-compose.druid-hadoop.yml up ``` - - To start tests againt security group + - To start tests against security group: + ```bash docker-compose -f docker-compose.yml -f docker-compose.security.yml up ``` 3. Run tests. - Execute the following command from root module, where `` is the class name of a test, such as ITIndexerTest. + Execute the following command from root module, where `` is the class name of a test, such as `ITIndexerTest`: + ```bash mvn verify -P integration-tests -pl integration-tests -Ddocker.build.skip=true -Ddocker.run.skip=true -Dit.test= ``` @@ -146,11 +165,11 @@ The values shown above are for the default docker compose cluster. For other clu ## Docker Compose files -- docker-compose.base.yml +- `docker-compose.base.yml` Base file that defines all containers for integration testing + Base file that defines all containers for integration testing. -- docker-compose.yml +- `docker-compose.yml` Defines a Druid cluster with default configuration that is used for running integration tests. @@ -168,7 +187,7 @@ The values shown above are for the default docker compose cluster. For other clu OVERRIDE_ENV=./environment-configs/test-groups/prepopulated-data DRUID_INTEGRATION_TEST_GROUP=query docker-compose -f docker-compose.yml up ``` -- docker-compose.security.yml +- `docker-compose.security.yml` Defines three additional Druid router services with permissive tls, no client auth tls, and custom check tls respectively. 
This is meant to be used together with docker-compose.yml and is only needed for the "security" group integration test. @@ -177,7 +196,7 @@ The values shown above are for the default docker compose cluster. For other clu docker-compose -f docker-compose.yml -f docker-compose.security.yml up ``` -- docker-compose.druid-hadoop.yml +- `docker-compose.druid-hadoop.yml` For starting Apache Hadoop 2.8.5 cluster with the same setup as the Druid tutorial. @@ -186,20 +205,20 @@ The values shown above are for the default docker compose cluster. For other clu ``` -## Tips & tricks for debugging and developing integration tests +## Tips & tricks for Debugging and Developing Integration Tests -### Useful mvn command flags +### Useful Maven command flags | Flag | Description | -|:---|---| -| -Ddocker.build.skip=true | Skip building the containers.

If you do not apply any change to Druid then you skip rebuilding the containers. This can save ~4 minutes. You need to build druid containers only once, after you can skip docker build step. | -| -Ddocker.run.skip=true | Skip starting docker containers.

This can save ~3 minutes by skipping building and bringing up the docker containers (Druid, Kafka, Hadoop, MYSQL, zookeeper, etc).
Please make sure that you actually do have these containers already running if using this flag.

Additionally, please make sure that the running containers are in the same state that the setup script (run_cluster.sh) would have brought it up in. | -| -Ddocker.build.hadoop=true | Build the hadoop image when either running integration tests or when building the integration test docker images without running the tests. | -| -Dstart.hadoop.docker=true | Start hadoop container when you need to run IT tests that utilize local hadoop docker. | +| :--- | --- | +| `-Ddocker.build.skip=true` | Skip building the containers.

If you do not apply any change to Druid then you can skip rebuilding the containers. This can save ~4 minutes. You need to build Druid containers only once; after that you can skip the Docker build step. | `-Ddocker.run.skip=true` | Skip starting Docker containers.

This can save ~3 minutes by skipping building and bringing up the Docker containers (Druid, Kafka, Hadoop, MySQL, ZooKeeper, etc).
Please make sure that you actually do have these containers already running if using this flag.

Additionally, please make sure that the running containers are in the same state that the setup script (run_cluster.sh) would have brought it up in. | +| `-Ddocker.build.hadoop=true` | Build the Hadoop image when either running integration tests or when building the integration test docker images without running the tests. | +| `-Dstart.hadoop.docker=true` | Start a Hadoop container when you need to run IT tests that utilize a local Hadoop Docker. | -### Debugging Druid while running tests +### Debugging Druid While Running Tests -For your convenience, Druid processes running inside Docker have been debugging enabled at following debugging ports: +For your convenience, Druid processes running inside Docker have debugging enabled at following debugging ports: | Process | Remote Debugging Port | | --- | :---: | @@ -213,7 +232,7 @@ For your convenience, Druid processes running inside Docker have been debugging | Middlemanager | 5008 | | Overlord | 5009 | -You can use remote debugger(such as via IntelliJ IDEA's Remote Configuration) to debug the corresponding Druid process at above port. +You can use a remote debugger (such as via IntelliJ IDEA's Remote Configuration) to debug the corresponding Druid process at above port. Running Tests Using A Quickstart Cluster ------------------- @@ -226,11 +245,11 @@ machine. > **NOTE**: Not all features run by default on a quickstart cluster, so it may not make sense to run the entire test suite against this configuration. > -> Quickstart does not run with ssl, so to trick the integration tests we specify the `*_tls_url` in the config to be the same as the http url. +> Quickstart does not run with ssl, so to trick the integration tests we specify the `*_tls_url` in the config to be the same as the HTTP url. Make sure you have at least 6GiB of memory available before you run the tests. 
-The tests rely on files in the test/resources folder to exist under the path /resources, +The tests rely on files in the `test/resources` folder to exist under the path `/resources`, so create a symlink to make them available: ```bash @@ -238,17 +257,20 @@ ln -s ${DRUID_HOME}/integration-tests/src/test/resources /resources ``` Set the cluster config file environment variable to the quickstart config: + ```bash export CONFIG_FILE=${DRUID_HOME}/integration-tests/quickstart-it.json ``` The test group `quickstart-compatible` has tests that have been verified to work against the quickstart cluster. There may be more tests that work, if you find that they do, please mark it as quickstart-compatible -(TestNGGroup#QUICKSTART_COMPATIBLE) and open a PR. -If you find some integration tests do not work, look at the docker files to see what setup they do. You may need to +(`TestNGGroup#QUICKSTART_COMPATIBLE`) and open a PR. + +If you find some integration tests do not work, look at the Docker files to see what setup they do. You may need to do similar steps to get the test to work. Then run the tests using a command similar to: + ```bash mvn verify -P int-tests-config-file -Dit.test= # Run all integration tests that have been verified to work against a quickstart cluster. @@ -260,7 +282,7 @@ Running Tests Using A Configuration File for Any Cluster Make sure that you have at least 6GiB of memory available before you run the tests. 
-To run tests on any druid cluster that is already running, create a configuration file: +To run tests on any Druid cluster that is already running, create a configuration file: { "broker_host": "", @@ -278,109 +300,122 @@ To run tests on any druid cluster that is already running, create a configuratio } Set the environment variable `CONFIG_FILE` to the name of the configuration file: + ``` export CONFIG_FILE= ``` -To run all tests from a test group using mvn run the following command: -(list of test groups can be found at integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java) +To run all tests from a test group using Maven run the following command: + ```bash mvn verify -P int-tests-config-file -Dgroups= ``` -To run only a single test using mvn run the following command: +To run only a single test using Maven run the following command: + ```bash mvn verify -P int-tests-config-file -Dit.test= ``` -Running a Test That Uses Cloud +Running a Test That Uses the Cloud ------------------- -The integration test that indexes from Cloud or uses Cloud as deep storage is not run as part +The integration test that indexes from the cloud or uses the cloud as deep storage is not run as part of the integration test run discussed above. Running these tests requires the user to provide -their own Cloud. +their own cloud. Currently, the integration test supports Amazon Kinesis, Google Cloud Storage, Amazon S3, and Microsoft Azure. -These can be run by providing "kinesis-index", "kinesis-data-format", "gcs-deep-storage", "s3-deep-storage", or "azure-deep-storage" -to -Dgroups for Amazon Kinesis, Google Cloud Storage, Amazon S3, and Microsoft Azure respectively. Note that only -one group should be run per mvn command. +These can be run by providing `kinesis-index`, `kinesis-data-format`, `gcs-deep-storage`, `s3-deep-storage`, or `azure-deep-storage` +to `-Dgroups` for Amazon Kinesis, Google Cloud Storage, Amazon S3, and Microsoft Azure respectively. 
Note that only +one group should be run per Maven command. + +For all the cloud Integration tests, the following will also need to be provided: -For all the Cloud Integration tests, the following will also need to be provided: -1) Provide -Doverride.config.path= with your Cloud credentials/configs set. See -integration-tests/docker/environment-configs/override-examples/ directory for env vars to provide for each Cloud. +1) Provide `-Doverride.config.path=` with your Cloud credentials/configs set. See +`integration-tests/docker/environment-configs/override-examples/` directory for env vars to provide for each cloud. For Amazon Kinesis, the following will also need to be provided: -1) Provide -Ddruid.test.config.streamEndpoint= with the endpoint of your stream set. -For example, kinesis.us-east-1.amazonaws.com + +1) Provide `-Ddruid.test.config.streamEndpoint=` with the endpoint of your stream set. +For example, `kinesis.us-east-1.amazonaws.com` For Google Cloud Storage, Amazon S3, and Microsoft Azure, the following will also need to be provided: -1) Set the bucket and path for your test data. This can be done by setting -Ddruid.test.config.cloudBucket and --Ddruid.test.config.cloudPath in the mvn command or setting "cloud_bucket" and "cloud_path" in the config file. -2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json -located in integration-tests/src/test/resources/data/batch_index/json to your Cloud storage at the location set in step 1. + +1) Set the bucket and path for your test data. This can be done by setting `-Ddruid.test.config.cloudBucket` and +`-Ddruid.test.config.cloudPath` in the Maven command or setting `cloud_bucket` and `cloud_path` in the config file. +2) Copy `wikipedia_index_data1.json`, `wikipedia_index_data2.json`, and `wikipedia_index_data3.json ` +located in `integration-tests/src/test/resources/data/batch_index/json` to your cloud storage at the location set in step 1. 
For Google Cloud Storage, in addition to the above, you will also have to: -1) Provide -Dresource.file.dir.path= with folder that contains GOOGLE_APPLICATION_CREDENTIALS file + +1) Provide `-Dresource.file.dir.path=` with folder that contains `GOOGLE_APPLICATION_CREDENTIALS` file For example, to run integration test for Google Cloud Storage: + ```bash mvn verify -P integration-tests -Dgroups=gcs-deep-storage -Doverride.config.path= -Dresource.file.dir.path= -Ddruid.test.config.cloudBucket=test-bucket -Ddruid.test.config.cloudPath=test-data-folder/ ``` - Running a Test That Uses Hadoop ------------------- -The integration test that indexes from hadoop is not run as part -of the integration test run discussed above. This is because druid -test clusters might not, in general, have access to hadoop. +The integration test that indexes from Hadoop is not run as part +of the integration test run discussed above. This is because Druid +test clusters might not, in general, have access to Hadoop. This also applies to integration test that uses Hadoop HDFS as an inputSource or as a deep storage. To run integration test that uses Hadoop, you will have to run a Hadoop cluster. This can be done in two ways: -1) Run Druid Docker test clusters with Hadoop container by passing -Dstart.hadoop.docker=true to the mvn command. If you have not already built the hadoop image, you will also need to add -Ddocker.build.hadoop=true to the mvn command. +1) Run Druid Docker test clusters with Hadoop container by passing `-Dstart.hadoop.docker=true` to the Maven command. +If you have not already built the hadoop image, you will also need to add -Ddocker.build.hadoop=true to the mvn command. 2) Run your own Druid + Hadoop cluster and specified Hadoop configs in the configuration file (CONFIG_FILE). -Currently, hdfs-deep-storage and other -deep-storage integration test groups can only be run with -Druid Docker test clusters by passing -Dstart.hadoop.docker=true to start Hadoop container. 
-You will also have to provide -Doverride.config.path= with your Druid's Hadoop configs set. -See integration-tests/docker/environment-configs/override-examples/hdfs directory for example. +Currently, `hdfs-deep-storage` and other `-deep-storage` integration test groups can only be run with +Druid Docker test clusters by passing `-Dstart.hadoop.docker=true` to start Hadoop container. +You will also have to provide `-Doverride.config.path=` with your Druid's Hadoop configs set. +See `integration-tests/docker/environment-configs/override-examples/hdfs` directory for example. Note that if the integration test you are running also uses other cloud extension (S3, Azure, GCS), additional credentials/configs may need to be set in the same file as your Druid's Hadoop configs set. -If you are running ITHadoopIndexTest with your own Druid + Hadoop cluster, please follow the below steps: -- Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json - located in integration-tests/src/test/resources/data/batch_index/json to your HDFS at /batch_index/json/ -- Copy batch_hadoop.data located in integration-tests/src/test/resources/data/batch_index/hadoop_tsv to your HDFS - at /batch_index/hadoop_tsv/ +If you are running `ITHadoopIndexTest` with your own Druid + Hadoop cluster, please follow the below steps: + +- Copy `wikipedia_index_data1.json`, `wikipedia_index_data2.json`, and `wikipedia_index_data3.json` + located in `integration-tests/src/test/resources/data/batch_index/json` to your HDFS at `/batch_index/json/` +- Copy `batch_hadoop.data` located in `integration-tests/src/test/resources/data/batch_index/hadoop_tsv` to your HDFS + at `/batch_index/hadoop_tsv/` + If using the Docker-based Hadoop container, the steps above are automatically done by the integration tests. -When running the Hadoop tests, you must set `-Dextra.datasource.name.suffix=''`, due to https://github.com/apache/druid/issues/9788. 
+When running the Hadoop tests, you must set `-Dextra.datasource.name.suffix=''`, due to `https://github.com/apache/druid/issues/9788`. + +Option 1: Run the test using Maven (using the bundled Docker-based Hadoop cluster and building docker images at runtime): -Option 1: Run the test using mvn (using the bundled Docker-based Hadoop cluster and building docker images at runtime): ```bash mvn verify -P integration-tests -Dit.test=ITHadoopIndexTest -Dstart.hadoop.docker=true -Ddocker.build.hadoop=true -Doverride.config.path=docker/environment-configs/override-examples/hdfs -Dextra.datasource.name.suffix='' ``` -Option 2: Run the test using mvn (using the bundled Docker-based hadoop cluster and not building images at runtime): +Option 2: Run the test using Maven (using the bundled Docker-based Hadoop cluster and not building images at runtime): + ```bash mvn verify -P integration-tests -Dit.test=ITHadoopIndexTest -Dstart.hadoop.docker=true -Ddocker.build.skip=true -Doverride.config.path=docker/environment-configs/override-examples/hdfs -Dextra.datasource.name.suffix='' ``` -Option 3: Run the test using mvn (using the bundled Docker-based hadoop cluster and when you have already started all containers) +Option 3: Run the test using Maven (using the bundled Docker-based Hadoop cluster and when you have already started all containers): + ```bash mvn verify -P integration-tests -Dit.test=ITHadoopIndexTest -Ddocker.run.skip=true -Ddocker.build.skip=true -Doverride.config.path=docker/environment-configs/override-examples/hdfs -Dextra.datasource.name.suffix='' ``` -Option 4: Run the test using mvn (using config file for existing Hadoop cluster): +Option 4: Run the test using Maven (using config file for existing Hadoop cluster): + ```bash mvn verify -P int-tests-config-file -Dit.test=ITHadoopIndexTest -Dextra.datasource.name.suffix='' ``` In some test environments, the machine where the tests need to be executed -cannot access the outside internet, so mvn cannot be run. 
In that case, -do the following instead of running the tests using mvn: +cannot access the outside internet, so Maven cannot be run. In that case, +do the following instead of running the tests using Maven: ### Compile druid and the integration tests -On a machine that can do mvn builds: +On a machine that can do Maven builds: ```bash cd druid @@ -391,7 +426,7 @@ mvn dependency:copy-dependencies package ### Put the compiled test code into your test cluster -Copy the integration-tests directory to the test cluster. +Copy the `integration-tests` directory to the test cluster. ### Set CLASSPATH @@ -410,70 +445,75 @@ java -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.test.config.type=configFi Writing a New Test ------------------- -## What should we cover in integration tests +## What to Cover in Integration Tests For every end-user functionality provided by druid we should have an integration-test verifying the correctness. -## Rules to be followed while writing a new integration test +## Rules While Writing a new Integration Test -### Every Integration Test must follow these rules: +### Every Integration Test Must Follow these Rules: -1) Name of the test must start with a prefix "IT" -2) A test should be independent of other tests -3) Tests are to be written in TestNG style ([http://testng.org/doc/documentation-main.html#methods](http://testng.org/doc/documentation-main.html#methods)) -4) If a test loads some data it is the responsibility of the test to clean up the data from the cluster +1) Name of the test must start with a prefix "IT". +2) A test should be independent of other tests. +3) Tests are to be written in TestNG style ([http://testng.org/doc/documentation-main.html#methods](http://testng.org/doc/documentation-main.html#methods)). +4) If a test loads some data it is the responsibility of the test to clean up the data from the cluster. 
-### How to use Guice Dependency Injection in a test +### How to Use Guice Dependency Injection in a Test -A test can access different helper and utility classes provided by test-framework in order to access Coordinator,Broker etc.. +A test can access different helper and utility classes provided by test-framework in order to access the Coordinator, Broker etc. To mark a test be able to use Guice Dependency Injection - Annotate the test class with the below annotation ```java @Guice(moduleFactory = DruidTestModuleFactory.class) ``` + This will tell the test framework that the test class needs to be constructed using guice. -### Helper Classes provided +### Helper Classes Provided -1) IntegrationTestingConfig - configuration of the test -2) CoordinatorResourceTestClient - httpclient for coordinator endpoints -3) OverlordResourceTestClient - httpclient for indexer endpoints -4) QueryResourceTestClient - httpclient for broker endpoints +1) `IntegrationTestingConfig` - configuration of the test +2) `CoordinatorResourceTestClient` - HTTP client for coordinator endpoints +3) `OverlordResourceTestClient` - HTTP client for indexer endpoints +4) `QueryResourceTestClient` - HTTP client for broker endpoints -### Static Utility classes +### Static Utility Classes -1) RetryUtil - provides methods to retry an operation until it succeeds for configurable no. of times -2) FromFileTestQueryHelper - reads queries with expected results from file and executes them and verifies the results using ResultVerifier +1) `RetryUtil` - provides methods to retry an operation until it succeeds for configurable no. of times. +2) `FromFileTestQueryHelper` - reads queries with expected results from file and executes them and verifies the results using `ResultVerifier`. -Refer ITIndexerTest as an example on how to use dependency Injection +Refer to `ITIndexerTest` as an example on how to use dependency injection. 
-### Running test methods in parallel +### Running Test Methods in Parallel By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding -the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests" -test tag section in integration-tests/src/test/resources/testng.xml. TestNG uses two parameters, i.e., +the given class/package from the `AllSerializedTests` test tag section and including it in the `AllParallelizedTests` +test tag section in `integration-tests/src/test/resources/testng.xml`. TestNG uses two parameters, i.e., `thread-count` and `data-provider-thread-count`, for parallel test execution, which are both set to 2 for Druid integration tests. -For test using parallel execution with data provider, you will also need to set `@DataProvider(parallel = true)` -on your data provider method in your test class. Note that for test using parallel execution with data provider, the test +For tests using parallel execution with a data provider, you will also need to set `@DataProvider(parallel = true)` +on your data provider method in your test class. Note that for tests using parallel execution with a data provider, the test class does not need to be in the "AllParallelizedTests" test tag section and if it is in the "AllParallelizedTests" -test tag section it will actually be run with `thread-count` times `data-provider-thread-count` threads. +test tag section it will actually be run with `thread-count` times `data-provider-thread-count` threads. You may want to modify those values for faster execution. -See https://testng.org/doc/documentation-main.html#parallel-running and https://testng.org/doc/documentation-main.html#parameters-dataproviders for details. 
+See [Parallel Running](https://testng.org/doc/documentation-main.html#parallel-running) and +[Data Provider Parameters](https://testng.org/doc/documentation-main.html#parameters-dataproviders) for details. -Please be mindful when adding tests to the "AllParallelizedTests" test tag that the tests can run in parallel with -other tests from the same class at the same time. i.e. test does not modify/restart/stop the druid cluster or other dependency containers, -test does not use excessive memory starving other concurent task, test does not modify and/or use other task, -supervisor, datasource it did not create. +Please be mindful when adding tests to the `AllParallelizedTests` test tag that the tests can run in parallel with +other tests from the same class at the same time. i.e. -### Limitation of Druid cluster in Travis environment +* The test does not modify/restart/stop the druid cluster or other dependency containers, +* The test does not use excessive memory starving other concurrent tasks, and +* The test does not modify and/or use other task, supervisor, datasource it did not create. -By default, integration tests are run in Travis environment on commits made to open PR. These integration test jobs are -required to pass for a PR to be elligible to be merged. Here are known issues and limitations to the Druid docker cluster +### Limitation of Druid cluster in the Travis environment + +By default, integration tests are run in the Travis environment on commits made to open PR. These integration test jobs are +required to pass for a PR to be eligible to be merged. Here are known issues and limitations to the Druid docker cluster running in Travis machine that may cause the tests to fail: -- Number of concurrent running tasks. Although the default Druid cluster config sets the maximum number of tasks (druid.worker.capacity) to 10, + +- Number of concurrent running tasks. 
Although the default Druid cluster config sets the maximum number of tasks (`druid.worker.capacity`) to 10, the actual maximum can be lowered depending on the type of the tasks. For example, running 2 range partitioning compaction tasks with 2 subtasks each (for a total of 6 tasks) concurrently can cause the cluster to intermittently fail. This can cause the Travis job to become stuck until it timeouts (50 minutes) and/or terminates after 10 mins of not receiving new output. diff --git a/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java b/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java index be8a5be50941..5542bbea95e0 100644 --- a/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java +++ b/integration-tests/src/main/java/org/apache/druid/cli/CliCustomNodeRole.java @@ -66,6 +66,7 @@ public class CliCustomNodeRole extends ServerRunnable public static final String SERVICE_NAME = "custom-node-role"; public static final int PORT = 9301; public static final int TLS_PORT = 9501; + public static final NodeRole NODE_ROLE = new NodeRole(CliCustomNodeRole.SERVICE_NAME); public CliCustomNodeRole() { @@ -75,7 +76,7 @@ public CliCustomNodeRole() @Override protected Set getNodeRoles(Properties properties) { - return ImmutableSet.of(new NodeRole(CliCustomNodeRole.SERVICE_NAME)); + return ImmutableSet.of(NODE_ROLE); } @Override diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/CustomNodeRoleClientModule.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/CustomNodeRoleClientModule.java new file mode 100644 index 000000000000..363727600f2e --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/CustomNodeRoleClientModule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.guice; + +import com.fasterxml.jackson.databind.Module; +import com.google.inject.Binder; +import org.apache.druid.cli.CliCustomNodeRole; +import org.apache.druid.discovery.NodeRoles; +import org.apache.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +/** + * Super-simple "client" for the custom node role which defines + * the node role so that REST APIs and the system tables are + * aware of this role. + */ +public class CustomNodeRoleClientModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + NodeRoles.bindRole(binder, CliCustomNodeRole.NODE_ROLE); + } + + @Override + public List getJacksonModules() + { + return Collections.emptyList(); + } +} diff --git a/integration-tests/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/integration-tests/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule index fa9b4c128301..701f6ff010f0 100644 --- a/integration-tests/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ b/integration-tests/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -14,3 +14,4 @@ # limitations under the License. 
org.apache.druid.testing.guice.ITTLSCertificateCheckerModule +org.apache.druid.testing.guice.CustomNodeRoleClientModule diff --git a/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java b/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java index 78e071b90a74..49a083409100 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/leadership/ITHighAvailabilityTest.java @@ -31,6 +31,7 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; @@ -55,6 +56,7 @@ import java.net.URL; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -180,6 +182,44 @@ public void testCustomDiscovery() ); } + @Test + public void testRouterCluster() + { + ITRetryUtil.retryUntil( + () -> { + try { + return verifyRouterCluster(); + } + catch (Throwable t) { + return false; + } + }, + true, + RETRY_DELAY, + NUM_RETRIES, + "router cluster API validated" + ); + } + + @Test + public void testCoordinatorCluster() + { + ITRetryUtil.retryUntil( + () -> { + try { + return verifyCoordinatorCluster(); + } + catch (Throwable t) { + return false; + } + }, + true, + RETRY_DELAY, + NUM_RETRIES, + "coordinator cluster API validated" + ); + } + private int testSelfDiscovery(Collection nodes) throws MalformedURLException, ExecutionException, InterruptedException { @@ -262,18 +302,21 @@ private static String fillTemplate(IntegrationTestingConfig config, String templ 
{"host":"%%BROKER%%","server_type":"broker", "is_leader": %%NON_LEADER%%}, {"host":"%%COORDINATOR_ONE%%","server_type":"coordinator", "is_leader": %%COORDINATOR_ONE_LEADER%%}, {"host":"%%COORDINATOR_TWO%%","server_type":"coordinator", "is_leader": %%COORDINATOR_TWO_LEADER%%}, + {"host":"%%CUSTOM_ROLE%%","server_type":"custom-node-role", "is_leader": %%NON_LEADER%%}, {"host":"%%OVERLORD_ONE%%","server_type":"overlord", "is_leader": %%OVERLORD_ONE_LEADER%%}, {"host":"%%OVERLORD_TWO%%","server_type":"overlord", "is_leader": %%OVERLORD_TWO_LEADER%%}, {"host":"%%ROUTER%%","server_type":"router", "is_leader": %%NON_LEADER%%} */ String working = template; + String customNode = "druid-custom-node-role"; working = StringUtils.replace(working, "%%OVERLORD_ONE%%", config.getOverlordInternalHost()); working = StringUtils.replace(working, "%%OVERLORD_TWO%%", config.getOverlordTwoInternalHost()); working = StringUtils.replace(working, "%%COORDINATOR_ONE%%", config.getCoordinatorInternalHost()); working = StringUtils.replace(working, "%%COORDINATOR_TWO%%", config.getCoordinatorTwoInternalHost()); working = StringUtils.replace(working, "%%BROKER%%", config.getBrokerInternalHost()); working = StringUtils.replace(working, "%%ROUTER%%", config.getRouterInternalHost()); + working = StringUtils.replace(working, "%%CUSTOM_ROLE%%", customNode); if (isOverlordOneLeader(config, overlordLeader)) { working = StringUtils.replace(working, "%%OVERLORD_ONE_LEADER%%", "1"); working = StringUtils.replace(working, "%%OVERLORD_TWO_LEADER%%", "0"); @@ -310,4 +353,86 @@ private static String transformHost(String host) { return StringUtils.format("%s:", host); } + + private boolean verifyCoordinatorCluster() + { + Map results; + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(StringUtils.format( + "%s/druid/coordinator/v1/cluster", + config.getCoordinatorUrl() + )) + ), + StatusResponseHandler.getInstance() + ).get(); + + if 
(!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching cluster members from [%s] status [%s] content [%s]", + config.getCoordinatorUrl(), + response.getStatus(), + response.getContent() + ); + } + results = jsonMapper.readValue(response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // Verify the basics: 5 service types, one of which is the custom node role. + // One of the two-node services has a size of 2. + // This endpoint includes an entry for historicals, even if none are running. + Assert.assertEquals(6, results.size(), "Wrong result count"); + Assert.assertNotNull(results.get(CliCustomNodeRole.SERVICE_NAME), "Custom node role missing"); + @SuppressWarnings("unchecked") + List coordNodes = (List) results.get(NodeRole.COORDINATOR.getJsonName()); + Assert.assertEquals(2, coordNodes.size(), "Expected two coordinators"); + @SuppressWarnings("unchecked") + List histNodes = (List) results.get(NodeRole.HISTORICAL.getJsonName()); + Assert.assertTrue(histNodes.isEmpty(), "Expected no historical nodes"); + return true; + } + + private boolean verifyRouterCluster() + { + Map results; + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(StringUtils.format( + "%s/druid/router/v1/cluster", + config.getRouterUrl() + )) + ), + StatusResponseHandler.getInstance() + ).get(); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while fetching cluster members from [%s] status [%s] content [%s]", + config.getRouterUrl(), + response.getStatus(), + response.getContent() + ); + } + results = jsonMapper.readValue(response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // Verify the basics: 5 service types, one of which is the custom node role. + // One of the two-node services has a size of 2. 
+ Assert.assertEquals(5, results.size(), "Wrong result count"); + Assert.assertNotNull(results.get(CliCustomNodeRole.SERVICE_NAME), "Custom node role missing"); + @SuppressWarnings("unchecked") + List coordNodes = (List) results.get(NodeRole.COORDINATOR.getJsonName()); + Assert.assertEquals(2, coordNodes.size(), "Expected two coordinators"); + return true; + } } diff --git a/integration-tests/src/test/resources/queries/high_availability_sys.json b/integration-tests/src/test/resources/queries/high_availability_sys.json index d5d60d4f2979..5a2dcb5d383a 100644 --- a/integration-tests/src/test/resources/queries/high_availability_sys.json +++ b/integration-tests/src/test/resources/queries/high_availability_sys.json @@ -8,6 +8,7 @@ {"host":"%%BROKER%%","server_type":"broker", "is_leader": %%NON_LEADER%%}, {"host":"%%COORDINATOR_ONE%%","server_type":"coordinator", "is_leader": %%COORDINATOR_ONE_LEADER%%}, {"host":"%%COORDINATOR_TWO%%","server_type":"coordinator", "is_leader": %%COORDINATOR_TWO_LEADER%%}, + {"host":"%%CUSTOM_ROLE%%","server_type":"custom-node-role", "is_leader": %%NON_LEADER%%}, {"host":"%%OVERLORD_ONE%%","server_type":"overlord", "is_leader": %%OVERLORD_ONE_LEADER%%}, {"host":"%%OVERLORD_TWO%%","server_type":"overlord", "is_leader": %%OVERLORD_TWO_LEADER%%}, {"host":"%%ROUTER%%","server_type":"router", "is_leader": %%NON_LEADER%%} diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index b5bc6f5fe529..ba95d9ae237c 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -42,7 +42,6 @@ /** * */ -@SuppressWarnings("serial") public class DruidDefaultSerializersModule extends SimpleModule { @SuppressWarnings("rawtypes") diff --git 
a/processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java b/processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java index 0a631175be74..e8414d70fa39 100644 --- a/processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java +++ b/processing/src/main/java/org/apache/druid/query/context/ResponseContextDeserializer.java @@ -31,7 +31,6 @@ * (That is, it is non-concurrent.) Clients of this code should convert the * context to concurrent if it will be used across threads. */ -@SuppressWarnings("serial") public class ResponseContextDeserializer extends StdDeserializer { public ResponseContextDeserializer() diff --git a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java index c614a6aeaed1..bce9c562f6b0 100644 --- a/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java +++ b/server/src/main/java/org/apache/druid/client/FilteredHttpServerInventoryViewProvider.java @@ -36,20 +36,20 @@ public class FilteredHttpServerInventoryViewProvider implements FilteredServerIn @JacksonInject @NotNull @EscalatedClient - HttpClient httpClient = null; + HttpClient httpClient; @JacksonInject @NotNull @Smile - ObjectMapper smileMapper = null; + ObjectMapper smileMapper; @JacksonInject @NotNull - HttpServerInventoryViewConfig config = null; + HttpServerInventoryViewConfig config; @JacksonInject @NotNull - private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = null; + private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; @Override public HttpServerInventoryView get() diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index decab1f7cc71..f9a3212ee2c5 100644 --- 
a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -126,7 +126,6 @@ public HttpServerInventoryView( this.execNamePrefix = execNamePrefix; } - @LifecycleStart public void start() { diff --git a/server/src/main/java/org/apache/druid/discovery/LocalDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/LocalDruidNodeDiscoveryProvider.java new file mode 100644 index 000000000000..64f6eefccca7 --- /dev/null +++ b/server/src/main/java/org/apache/druid/discovery/LocalDruidNodeDiscoveryProvider.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.discovery; + +import org.apache.druid.server.DruidNode; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BooleanSupplier; + +/** + * Local node discovery. In this mode, Druid may run a number of + * logical servers within a single process, so that node discovery + * is all within the same process, but matches a true distributed + * node discovery. + *

+ * Concurrency is at the level of the entire provider, which is + * necessary to coordinate the active state with ongoing node + * discovery and listener registrations. Such a "global" lock + * is fine because the actions don't occur frequently, nor do the + * actions take much time. For this same reason, we use plain old + * Java synchronization rather than a fancier locking mechanism. + *

+ * At present, this class is used in unit tests so that it is + * possible to simulate cluster membership changes without actually running + * ZK, etc. The class can become the primary provider if/when Druid + * can run all services in a single process: in that case, we won't + * need ZK to tell the in-process services about each other. + */ +public class LocalDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvider implements DruidNodeAnnouncer +{ + private class RoleEntry implements DruidNodeDiscovery + { + private final Map nodes = new HashMap<>(); + private final List listeners = new ArrayList<>(); + + private void register(DiscoveryDruidNode node) + { + List targets; + synchronized (LocalDruidNodeDiscoveryProvider.this) { + DiscoveryDruidNode prev = nodes.put(node.getDruidNode(), node); + if (prev != null) { + return; + } + targets = new ArrayList<>(listeners); + } + for (Listener listener : targets) { + listener.nodesAdded(Collections.singletonList(node)); + } + } + + private void deregister(DiscoveryDruidNode node) + { + List targets; + synchronized (LocalDruidNodeDiscoveryProvider.this) { + DiscoveryDruidNode prev = nodes.remove(node.getDruidNode()); + if (prev == null) { + return; + } + targets = new ArrayList<>(listeners); + } + for (Listener listener : targets) { + listener.nodesRemoved(Collections.singletonList(node)); + } + } + + @Override + public Collection getAllNodes() + { + synchronized (LocalDruidNodeDiscoveryProvider.this) { + return new ArrayList<>(nodes.values()); + } + } + + @Override + public void registerListener(Listener listener) + { + synchronized (LocalDruidNodeDiscoveryProvider.this) { + listeners.add(listener); + if (!active) { + return; + } + } + listener.nodeViewInitialized(); + } + + private boolean contains(DruidNode node) + { + synchronized (LocalDruidNodeDiscoveryProvider.this) { + return nodes.containsKey(node); + } + } + } + + private boolean active; + private final Map roles = new HashMap<>(); + + public void 
initialized() + { + List targets = new ArrayList<>(); + synchronized (this) { + if (active) { + return; + } + active = true; + for (RoleEntry value : roles.values()) { + targets.addAll(value.listeners); + } + } + for (DruidNodeDiscovery.Listener target : targets) { + target.nodeViewInitialized(); + } + } + + @Override + public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole) + { + return () -> entry(nodeRole).contains(node); + } + + @Override + public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole) + { + return entry(nodeRole); + } + + public RoleEntry entry(NodeRole nodeRole) + { + return roles.computeIfAbsent(nodeRole, role -> new RoleEntry()); + } + + @Override + public void announce(DiscoveryDruidNode node) + { + entry(node.getNodeRole()).register(node); + } + + @Override + public void unannounce(DiscoveryDruidNode node) + { + entry(node.getNodeRole()).deregister(node); + } +} diff --git a/server/src/main/java/org/apache/druid/discovery/NodeRole.java b/server/src/main/java/org/apache/druid/discovery/NodeRole.java index c24fb7ba9e28..88fe06cecc5a 100644 --- a/server/src/main/java/org/apache/druid/discovery/NodeRole.java +++ b/server/src/main/java/org/apache/druid/discovery/NodeRole.java @@ -48,6 +48,12 @@ * * These abstractions can all potentially be merged when Druid updates to Jackson 2.9 that supports JsonAliases, * see https://github.com/apache/druid/issues/7152. + * + * Extensions can define a custom node role. By default, such roles are not visible from the + * REST APIs or in the system tables. To make a custom role visible, register it using + * {@link NodeRoles#bindRole(com.google.inject.Binder, NodeRole)}. The extension must be loaded + * in the Broker, Coordinator and Router for the API code to see the custom node role defined + * above. 
*/ public class NodeRole { diff --git a/server/src/main/java/org/apache/druid/discovery/NodeRoles.java b/server/src/main/java/org/apache/druid/discovery/NodeRoles.java new file mode 100644 index 000000000000..9bfff1d6b507 --- /dev/null +++ b/server/src/main/java/org/apache/druid/discovery/NodeRoles.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.discovery; + +import com.google.common.collect.Collections2; +import com.google.inject.Binder; +import com.google.inject.multibindings.Multibinder; +import org.apache.druid.guice.annotations.Global; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.Node; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +public class NodeRoles +{ + private static final Logger log = new Logger(NodeRoles.class); + + /** + * Simulate the Guice binding of all node roles, but using just + * the known roles. Primarily for testing. 
+ */ + public static Set knownRoles() + { + return new HashSet<>(Arrays.asList(NodeRole.values())); + } + + public static void bindKnownRoles(Binder binder) + { + Multibinder roleBinder = binder(binder); + for (NodeRole role : NodeRole.values()) { + roleBinder.addBinding().toInstance(role); + } + } + + /** + * Bind a node role for an extension service. + */ + public static void bindRole(Binder binder, NodeRole role) + { + log.debug("Adding node role: %s", role.getJsonName()); + binder(binder) + .addBinding() + .toInstance(role); + } + + public static Multibinder binder(Binder binder) + { + return Multibinder.newSetBinder(binder, NodeRole.class, Global.class); + } + + public static Collection getNodes( + DruidNodeDiscoveryProvider provider, + NodeRole nodeRole, + boolean full) + { + if (full) { + return getDiscoveryNodesForRole(provider, nodeRole); + } else { + return getNodesForRole(provider, nodeRole); + } + } + + public static Collection getDiscoveryNodesForRole( + DruidNodeDiscoveryProvider provider, + NodeRole nodeRole) + { + return provider + .getForNodeRole(nodeRole) + .getAllNodes(); + } + + public static Collection getNodesForRole( + DruidNodeDiscoveryProvider provider, + NodeRole nodeRole) + { + return Collections2.transform( + getDiscoveryNodesForRole(provider, nodeRole), + (discoveryDruidNode) -> new Node(discoveryDruidNode.getDruidNode())); + } +} diff --git a/server/src/main/java/org/apache/druid/initialization/Initialization.java b/server/src/main/java/org/apache/druid/initialization/Initialization.java index 2db099325924..b001a0ef6a1c 100644 --- a/server/src/main/java/org/apache/druid/initialization/Initialization.java +++ b/server/src/main/java/org/apache/druid/initialization/Initialization.java @@ -141,6 +141,7 @@ static Map getLoadersMap() * elements in the returned collection is not specified and not guaranteed to be the same for different calls to * getFromExtensions(). 
*/ + @SuppressWarnings("unchecked") public static Collection getFromExtensions(ExtensionsConfig config, Class serviceClass) { // It's not clear whether we should recompute modules even if they have been computed already for the serviceClass, @@ -495,9 +496,9 @@ public void addModule(Object input) if (!checkModuleClass((Class) input)) { return; } - if (DruidModule.class.isAssignableFrom((Class) input)) { + if (DruidModule.class.isAssignableFrom((Class) input)) { modules.add(registerJacksonModules(baseInjector.getInstance((Class) input))); - } else if (Module.class.isAssignableFrom((Class) input)) { + } else if (Module.class.isAssignableFrom((Class) input)) { modules.add(baseInjector.getInstance((Class) input)); return; } else { @@ -518,7 +519,13 @@ private boolean shouldLoadOnCurrentNodeType(Object object) Set rolesPredicate = Arrays.stream(loadScope.roles()) .map(NodeRole::fromJsonName) .collect(Collectors.toSet()); - return rolesPredicate.stream().anyMatch(nodeRoles::contains); + boolean shouldLoad = rolesPredicate.stream().anyMatch(nodeRoles::contains); + if (!shouldLoad) { + log.info( + "Not loading module [%s] - excluded per LoadScope", + object.getClass().getName()); + } + return shouldLoad; } private boolean checkModuleClass(Class moduleClass) diff --git a/server/src/main/java/org/apache/druid/server/DruidNode.java b/server/src/main/java/org/apache/druid/server/DruidNode.java index 1a3603936490..cf112a8564e7 100644 --- a/server/src/main/java/org/apache/druid/server/DruidNode.java +++ b/server/src/main/java/org/apache/druid/server/DruidNode.java @@ -55,7 +55,7 @@ public class DruidNode * Default is false, which means binding to all interfaces. */ @JsonProperty - private boolean bindOnHost = false; + private boolean bindOnHost; /** * This property is now deprecated, this is present just so that JsonConfigurator does not fail if this is set. 
@@ -78,7 +78,7 @@ public class DruidNode private int tlsPort = -1; @JsonProperty - private boolean enableTlsPort = false; + private boolean enableTlsPort; public DruidNode( String serviceName, diff --git a/server/src/main/java/org/apache/druid/server/Node.java b/server/src/main/java/org/apache/druid/server/Node.java new file mode 100644 index 000000000000..876a26f53c84 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/Node.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * This class exists only to support two REST endpoints: + *

+ * <ul>
+ * <li>{@code /druid/coordinator/v1/cluster}</li>
+ * <li>{@code /druid/router/v1/cluster}</li>
+ * </ul>
+ *
+ * The endpoints present similar information, as encapsulated here.
+ * <p>
+ * {@code Node} is similar to {@code DruidNode}, + * but serializes to the specific form expected by those REST + * endpoints. This version omits {@code bindOnHost}, uses the name + * {@code service} where DruidNode uses {@code serviceName}, and + * omits the TLS or Plaintext port rather than using -1 and flags + * as in {@code DruidNode}. Think of this as a purpose-built facade + * onto a {@code DruidNode}. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class Node +{ + private final DruidNode node; + + @JsonCreator + public Node(DruidNode node) + { + this.node = node; + } + + @JsonProperty + public String getHost() + { + return node.getHost(); + } + + @JsonProperty + public String getService() + { + return node.getServiceName(); + } + + @JsonProperty + public Integer getPlaintextPort() + { + return node.isEnablePlaintextPort() ? node.getPlaintextPort() : null; + } + + @JsonProperty + public Integer getTlsPort() + { + return node.isEnableTlsPort() ? node.getTlsPort() : null; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || !(o instanceof Node)) { + return false; + } + return node.equals(((Node) o).node); + } + + @Override + public int hashCode() + { + return node.hashCode(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java index eabf51ec115e..fadec399dc67 100644 --- a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java +++ b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java @@ -19,18 +19,14 @@ package org.apache.druid.server.http; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import 
com.sun.jersey.spi.container.ResourceFilters; -import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; import org.apache.druid.guice.LazySingleton; -import org.apache.druid.server.DruidNode; +import org.apache.druid.guice.annotations.Global; import org.apache.druid.server.http.security.StateResourceFilter; import javax.ws.rs.GET; @@ -40,8 +36,12 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; + import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** */ @@ -51,40 +51,60 @@ public class ClusterResource { private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final Set nodeRoles; @Inject - public ClusterResource(DruidNodeDiscoveryProvider discoveryProvider) + public ClusterResource( + final DruidNodeDiscoveryProvider discoveryProvider, + final @Global Set allNodeRoles) { this.druidNodeDiscoveryProvider = discoveryProvider; + this.nodeRoles = allNodeRoles; } @GET @Produces(MediaType.APPLICATION_JSON) public Response getClusterServers(@QueryParam("full") boolean full) { - ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); + // Add Druid-defined roles in a defined order. + List definedOrder = Arrays.asList( + NodeRole.COORDINATOR, + NodeRole.OVERLORD, + NodeRole.BROKER, + NodeRole.HISTORICAL, + NodeRole.MIDDLE_MANAGER, + NodeRole.INDEXER, + NodeRole.ROUTER + ); + + // Omit role groups for optional services. 
+ Set omitIfEmpty = new HashSet<>(Arrays.asList( + NodeRole.MIDDLE_MANAGER, + NodeRole.INDEXER, + NodeRole.ROUTER + )); - entityBuilder.put(NodeRole.COORDINATOR, getNodes(NodeRole.COORDINATOR, full)); - entityBuilder.put(NodeRole.OVERLORD, getNodes(NodeRole.OVERLORD, full)); - entityBuilder.put(NodeRole.BROKER, getNodes(NodeRole.BROKER, full)); - entityBuilder.put(NodeRole.HISTORICAL, getNodes(NodeRole.HISTORICAL, full)); - - Collection mmNodes = getNodes(NodeRole.MIDDLE_MANAGER, full); - if (!mmNodes.isEmpty()) { - entityBuilder.put(NodeRole.MIDDLE_MANAGER, mmNodes); - } - - Collection indexerNodes = getNodes(NodeRole.INDEXER, full); - if (!indexerNodes.isEmpty()) { - entityBuilder.put(NodeRole.INDEXER, indexerNodes); + ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); + for (NodeRole role : definedOrder) { + Collection services = NodeRoles.getNodes(druidNodeDiscoveryProvider, role, full); + if (!omitIfEmpty.contains(role) || !services.isEmpty()) { + entityBuilder.put(role, services); + } } - Collection routerNodes = getNodes(NodeRole.ROUTER, full); - if (!routerNodes.isEmpty()) { - entityBuilder.put(NodeRole.ROUTER, routerNodes); + // Add any extension node roles, but only if service instances exist. + Set stockRoles = new HashSet<>(definedOrder); + for (NodeRole role : nodeRoles) { + if (stockRoles.contains(role)) { + continue; + } + Collection services = NodeRoles.getNodes(druidNodeDiscoveryProvider, role, full); + if (!services.isEmpty()) { + entityBuilder.put(role, services); + } } - return Response.status(Response.Status.OK).entity(entityBuilder.build()).build(); + return Response.ok().entity(entityBuilder.build()).build(); } @GET @@ -95,76 +115,17 @@ public Response getClusterServers(@PathParam("nodeRole") NodeRole nodeRole, @Que if (nodeRole == null) { return Response.serverError() .status(Response.Status.BAD_REQUEST) - .entity("Invalid nodeRole of null. 
Valid node roles are " + Arrays.toString(NodeRole.values())) + .entity( + ImmutableMap.of( + "error", + "Invalid nodeRole of null. Valid node roles are " + + Arrays.toString(NodeRole.values()))) .build(); - } else { - return Response.status(Response.Status.OK).entity(getNodes(nodeRole, full)).build(); - } - } - - private Collection getNodes(NodeRole nodeRole, boolean full) - { - Collection discoveryDruidNodes = druidNodeDiscoveryProvider.getForNodeRole(nodeRole) - .getAllNodes(); - if (full) { - return (Collection) discoveryDruidNodes; - } else { - return Collections2.transform( - discoveryDruidNodes, - (discoveryDruidNode) -> Node.from(discoveryDruidNode.getDruidNode()) - ); - } - } - - @JsonInclude(JsonInclude.Include.NON_NULL) - private static class Node - { - private final String host; - private final String service; - private final Integer plaintextPort; - private final Integer tlsPort; - - @JsonCreator - public Node(String host, String service, Integer plaintextPort, Integer tlsPort) - { - this.host = host; - this.service = service; - this.plaintextPort = plaintextPort; - this.tlsPort = tlsPort; - } - - @JsonProperty - public String getHost() - { - return host; - } - - @JsonProperty - public String getService() - { - return service; - } - - @JsonProperty - public Integer getPlaintextPort() - { - return plaintextPort; - } - - @JsonProperty - public Integer getTlsPort() - { - return tlsPort; - } - - public static Node from(DruidNode druidNode) - { - return new Node( - druidNode.getHost(), - druidNode.getServiceName(), - druidNode.getPlaintextPort() > 0 ? druidNode.getPlaintextPort() : null, - druidNode.getTlsPort() > 0 ? 
druidNode.getTlsPort() : null - ); } + return Response + .ok() + .entity( + NodeRoles.getNodes(druidNodeDiscoveryProvider, nodeRole, full)) + .build(); } } diff --git a/server/src/test/java/org/apache/druid/curator/discovery/NodeRolesTest.java b/server/src/test/java/org/apache/druid/curator/discovery/NodeRolesTest.java new file mode 100644 index 000000000000..c1249a0083d2 --- /dev/null +++ b/server/src/test/java/org/apache/druid/curator/discovery/NodeRolesTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.curator.discovery; + +import com.google.common.collect.Iterators; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.TypeLiteral; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.LocalDruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; +import org.apache.druid.guice.annotations.Global; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.Node; +import org.junit.Test; + +import java.util.Collection; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Verifies that the NodeRoles class allows injecting a set of roles + * which includes custom roles defined by an extension. + */ +public class NodeRolesTest +{ + @Test + public void testNodeRoles() + { + Set knownRoles = NodeRoles.knownRoles(); + assertEquals(NodeRole.values().length, knownRoles.size()); + + NodeRole customRole = new NodeRole("custom"); + Injector injector = Guice.createInjector( + binder -> { + NodeRoles.bindKnownRoles(binder); + NodeRoles.bindRole(binder, customRole); + }); + Set roles = injector.getInstance( + Key.get(new TypeLiteral>(){}, Global.class)); + assertEquals(NodeRole.values().length + 1, roles.size()); + assertTrue(roles.contains(customRole)); + } + + @Test + public void testDiscovery() + { + // Provider with one node + LocalDruidNodeDiscoveryProvider provider = new LocalDruidNodeDiscoveryProvider(); + DruidNode druidNode = new DruidNode("broker", "localhost", false, 1000, -1, true, false); + DiscoveryDruidNode dn = new DiscoveryDruidNode(druidNode, NodeRole.BROKER, null); + provider.announce(dn); + + Collection discoveryNodes = NodeRoles.getDiscoveryNodesForRole(provider, NodeRole.BROKER); + assertEquals(1, discoveryNodes.size()); + assertEquals(dn, 
Iterators.getOnlyElement(discoveryNodes.iterator())); + assertEquals(discoveryNodes, NodeRoles.getNodes(provider, NodeRole.BROKER, true)); + + discoveryNodes = NodeRoles.getDiscoveryNodesForRole(provider, NodeRole.OVERLORD); + assertTrue(discoveryNodes.isEmpty()); + + Collection nodes = NodeRoles.getNodesForRole(provider, NodeRole.BROKER); + assertEquals(1, nodes.size()); + Node node = Iterators.getOnlyElement(nodes.iterator()); + assertEquals(node, new Node(druidNode)); + assertEquals(node, Iterators.getOnlyElement( + NodeRoles.getNodes(provider, NodeRole.BROKER, false).iterator())); + + nodes = NodeRoles.getNodesForRole(provider, NodeRole.OVERLORD); + assertTrue(nodes.isEmpty()); + } +} diff --git a/server/src/test/java/org/apache/druid/discovery/LocalDruidNodeDiscoveryTest.java b/server/src/test/java/org/apache/druid/discovery/LocalDruidNodeDiscoveryTest.java new file mode 100644 index 000000000000..e96e51142cdd --- /dev/null +++ b/server/src/test/java/org/apache/druid/discovery/LocalDruidNodeDiscoveryTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.discovery; + +import org.apache.druid.server.DruidNode; +import org.junit.Test; + +import java.util.Collection; +import java.util.function.BooleanSupplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class LocalDruidNodeDiscoveryTest +{ + public static class TestListener implements DruidNodeDiscovery.Listener + { + int netCount; + + @Override + public void nodesAdded(Collection nodes) + { + netCount++; + } + + @Override + public void nodesRemoved(Collection nodes) + { + netCount--; + } + } + + @Test + public void testBasics() + { + LocalDruidNodeDiscoveryProvider p = new LocalDruidNodeDiscoveryProvider(); + TestListener tl = new TestListener(); + p.getForNodeRole(NodeRole.BROKER).registerListener(tl); + + DruidNode node = new DruidNode("broker", "localhost", false, 1000, -1, true, false); + BooleanSupplier isPresent = p.getForNode(node, NodeRole.BROKER); + assertFalse(isPresent.getAsBoolean()); + + DiscoveryDruidNode dn = new DiscoveryDruidNode(node, NodeRole.BROKER, null); + p.announce(dn); + assertTrue(isPresent.getAsBoolean()); + assertEquals(1, tl.netCount); + p.unannounce(dn); + assertEquals(0, tl.netCount); + assertFalse(isPresent.getAsBoolean()); + } +} diff --git a/server/src/test/java/org/apache/druid/server/NodeTest.java b/server/src/test/java/org/apache/druid/server/NodeTest.java new file mode 100644 index 000000000000..da81bf75dfe8 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/NodeTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; + +public class NodeTest +{ + @Test + public void testNode() + { + DruidNode druidNode = new DruidNode("service", "host", true, 80, 81, true, true); + Node node = new Node(druidNode); + assertEquals("host", node.getHost()); + assertEquals("service", node.getService()); + assertEquals((Integer) 80, node.getPlaintextPort()); + assertEquals((Integer) 81, node.getTlsPort()); + assertEquals(node, node); + + DruidNode druidNode2 = new DruidNode("service", "host", true, 80, 81, true, false); + Node node2 = new Node(druidNode2); + assertEquals((Integer) 80, node2.getPlaintextPort()); + assertNull(node2.getTlsPort()); + assertNotEquals(node, node2); + + DruidNode druidNode3 = new DruidNode("service", "host", true, 80, 81, false, true); + Node node3 = new Node(druidNode3); + assertNull(node3.getPlaintextPort()); + assertEquals((Integer) 81, node3.getTlsPort()); + assertNotEquals(node, node3); + } +} diff --git a/server/src/test/java/org/apache/druid/server/http/ClusterResourceTest.java b/server/src/test/java/org/apache/druid/server/http/ClusterResourceTest.java new file mode 100644 index 000000000000..94af8abe0f5f --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/http/ClusterResourceTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.google.common.collect.Iterators; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.LocalDruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.Node; +import org.junit.Test; + +import javax.ws.rs.core.Response; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class ClusterResourceTest +{ + private void assertOk(Response resp) + { + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + } + + @SuppressWarnings("unchecked") + private Map> mapResult(Response resp) + { + assertOk(resp); + return (Map>) resp.getEntity(); + } + + @Test + public void testNoNodes() + { + LocalDruidNodeDiscoveryProvider provider = new LocalDruidNodeDiscoveryProvider(); + ClusterResource resource = new ClusterResource(provider, NodeRoles.knownRoles()); + + int emptyCount = 4; + { + Response resp = 
resource.getClusterServers(true); + Map> results = mapResult(resp); + assertEquals(emptyCount, results.size()); + assertTrue(results.containsKey(NodeRole.COORDINATOR)); + assertTrue(results.containsKey(NodeRole.OVERLORD)); + assertTrue(results.containsKey(NodeRole.BROKER)); + assertTrue(results.containsKey(NodeRole.HISTORICAL)); + } + + { + Response resp = resource.getClusterServers(false); + Map> results = mapResult(resp); + assertEquals(emptyCount, results.size()); + } + + { + Response resp = resource.getClusterServers(NodeRole.BROKER, true); + assertOk(resp); + @SuppressWarnings("unchecked") + Collection results = (Collection) resp.getEntity(); + assertTrue(results.isEmpty()); + } + + { + Response resp = resource.getClusterServers(NodeRole.BROKER, false); + assertOk(resp); + @SuppressWarnings("unchecked") + Collection results = (Collection) resp.getEntity(); + assertTrue(results.isEmpty()); + } + } + + @Test + public void testNullNodeRole() + { + LocalDruidNodeDiscoveryProvider provider = new LocalDruidNodeDiscoveryProvider(); + ClusterResource resource = new ClusterResource(provider, NodeRoles.knownRoles()); + + Response resp = resource.getClusterServers(null, true); + assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus()); + @SuppressWarnings("unchecked") + Map results = (Map) resp.getEntity(); + assertNotNull(results.get("error")); + } + + private void announce(LocalDruidNodeDiscoveryProvider provider, NodeRole role, int count) + { + String service = StringUtils.toLowerCase(role.getJsonName()); + for (int i = 1; i < count + 1; i++) { + DruidNode druidNode = new DruidNode(service, service + i, false, 1000, -1, true, false); + DiscoveryDruidNode dn = new DiscoveryDruidNode(druidNode, role, null); + provider.announce(dn); + } + } + + @Test + public void testAllTypes() + { + LocalDruidNodeDiscoveryProvider provider = new LocalDruidNodeDiscoveryProvider(); + announce(provider, NodeRole.BROKER, 2); + announce(provider, NodeRole.ROUTER, 3); + 
announce(provider, NodeRole.COORDINATOR, 4); + announce(provider, NodeRole.OVERLORD, 5); + announce(provider, NodeRole.HISTORICAL, 6); + announce(provider, NodeRole.MIDDLE_MANAGER, 7); + announce(provider, NodeRole.INDEXER, 8); + ClusterResource resource = new ClusterResource(provider, NodeRoles.knownRoles()); + + { + Response resp = resource.getClusterServers(true); + Map> results = mapResult(resp); + assertEquals(7, results.size()); + assertEquals(2, results.get(NodeRole.BROKER).size()); + assertEquals(3, results.get(NodeRole.ROUTER).size()); + assertEquals(4, results.get(NodeRole.COORDINATOR).size()); + assertEquals(5, results.get(NodeRole.OVERLORD).size()); + assertEquals(6, results.get(NodeRole.HISTORICAL).size()); + assertEquals(7, results.get(NodeRole.MIDDLE_MANAGER).size()); + assertEquals(8, results.get(NodeRole.INDEXER).size()); + } + + { + Response resp = resource.getClusterServers(NodeRole.BROKER, false); + assertOk(resp); + @SuppressWarnings("unchecked") + Collection results = (Collection) resp.getEntity(); + assertEquals(2, results.size()); + } + } + + @Test + public void testExtensionService() + { + LocalDruidNodeDiscoveryProvider provider = new LocalDruidNodeDiscoveryProvider(); + announce(provider, NodeRole.BROKER, 1); + announce(provider, NodeRole.COORDINATOR, 1); + announce(provider, NodeRole.OVERLORD, 1); + NodeRole customRole = new NodeRole("custom"); + announce(provider, customRole, 1); + Set roles = new HashSet<>(NodeRoles.knownRoles()); + roles.add(customRole); + + ClusterResource resource = new ClusterResource(provider, roles); + + { + Response resp = resource.getClusterServers(true); + Map> results = mapResult(resp); + // 4 standard + custom + assertEquals(5, results.size()); + assertEquals(1, results.get(NodeRole.BROKER).size()); + assertEquals(1, results.get(customRole).size()); + } + + { + Response resp = resource.getClusterServers(customRole, false); + assertOk(resp); + @SuppressWarnings("unchecked") + Collection results = (Collection) 
resp.getEntity(); + assertEquals(1, results.size()); + } + + { + // Unknown role is the same as a known role with no instances + NodeRole bogus = new NodeRole("gogus"); + Response resp = resource.getClusterServers(bogus, false); + assertOk(resp); + @SuppressWarnings("unchecked") + Collection results = (Collection) resp.getEntity(); + assertTrue(results.isEmpty()); + } + } + + @Test + public void testFormat() + { + LocalDruidNodeDiscoveryProvider provider = new LocalDruidNodeDiscoveryProvider(); + announce(provider, NodeRole.BROKER, 1); + ClusterResource resource = new ClusterResource(provider, NodeRoles.knownRoles()); + + { + Response resp = resource.getClusterServers(true); + Map> results = mapResult(resp); + assertTrue(Iterators.getOnlyElement(results.get(NodeRole.BROKER).iterator()) instanceof DiscoveryDruidNode); + } + + { + Response resp = resource.getClusterServers(false); + Map> results = mapResult(resp); + assertTrue(Iterators.getOnlyElement(results.get(NodeRole.BROKER).iterator()) instanceof Node); + } + + { + Response resp = resource.getClusterServers(NodeRole.BROKER, true); + assertOk(resp); + @SuppressWarnings("unchecked") + Collection results = (Collection) resp.getEntity(); + assertTrue(Iterators.getOnlyElement(results.iterator()) instanceof DiscoveryDruidNode); + } + + { + Response resp = resource.getClusterServers(NodeRole.BROKER, false); + assertOk(resp); + @SuppressWarnings("unchecked") + Collection results = (Collection) resp.getEntity(); + assertTrue(Iterators.getOnlyElement(results.iterator()) instanceof Node); + } + } +} diff --git a/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java b/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java index 30e4bdb281df..7a069514bd59 100644 --- a/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java +++ b/services/src/main/java/org/apache/druid/cli/GuiceRunnable.java @@ -32,6 +32,7 @@ import com.google.inject.multibindings.Multibinder; import 
org.apache.druid.discovery.DruidService; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; import org.apache.druid.guice.annotations.Self; import org.apache.druid.initialization.Initialization; import org.apache.druid.java.util.common.StringUtils; @@ -119,6 +120,8 @@ static Module registerNodeRoleModule(Set nodeRoles) new TypeLiteral(){}, new TypeLiteral>>(){} ); + + NodeRoles.bindKnownRoles(binder); }; } } diff --git a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java index 3668543bf496..3ca3f6459576 100644 --- a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java +++ b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java @@ -245,7 +245,7 @@ protected void service(HttpServletRequest request, HttpServletResponse response) } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) { // query request try { - Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); + Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class); if (inputQuery != null) { targetServer = hostFinder.pickServer(inputQuery); if (inputQuery.getId() == null) { @@ -396,7 +396,7 @@ protected void sendProxyRequest( proxyRequest.content(new BytesContentProvider(avaticaQuery)); } - final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE); + final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE); final SqlQuery sqlQuery = (SqlQuery) clientRequest.getAttribute(SQL_QUERY_ATTRIBUTE); if (query != null) { setProxyRequestContent(proxyRequest, clientRequest, query); @@ -449,7 +449,7 @@ private void setProxyRequestContent(Request proxyRequest, HttpServletRequest cli @Override protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response) { - final Query query = (Query) 
request.getAttribute(QUERY_ATTRIBUTE); + final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE); if (query != null) { return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime()); } else { @@ -500,7 +500,7 @@ protected HttpClient createHttpClient() throws ServletException private Response.Listener newMetricsEmittingProxyResponseListener( HttpServletRequest request, HttpServletResponse response, - Query query, + Query query, long startNs ) { @@ -544,17 +544,18 @@ protected void onServerResponseHeaders( super.onServerResponseHeaders(clientRequest, proxyResponse, serverResponse); } + @SuppressWarnings("unchecked") @VisibleForTesting static String getAvaticaConnectionId(Map requestMap) { - // avatica commands always have a 'connectionId'. If commands are not part of a prepared statement, this appears at + // Avatica commands always have a 'connectionId'. If commands are not part of a prepared statement, this appears at // the top level of the request, but if it is part of a statement, then it will be nested in the 'statementHandle'. 
// see https://calcite.apache.org/avatica/docs/json_reference.html#requests for more details Object connectionIdObj = requestMap.get(AVATICA_CONNECTION_ID); if (connectionIdObj == null) { Object statementHandle = requestMap.get(AVATICA_STATEMENT_HANDLE); if (statementHandle != null && statementHandle instanceof Map) { - connectionIdObj = ((Map) statementHandle).get(AVATICA_CONNECTION_ID); + connectionIdObj = ((Map) statementHandle).get(AVATICA_CONNECTION_ID); } } @@ -660,7 +661,6 @@ static String getAvaticaProtobufConnectionId(Service.Request request) private class MetricsEmittingProxyResponseListener extends ProxyResponseListener { private final HttpServletRequest req; - private final HttpServletResponse res; private final Query query; private final long startNs; @@ -674,7 +674,6 @@ public MetricsEmittingProxyResponseListener( super(request, response); this.req = request; - this.res = response; this.query = query; this.startNs = startNs; } diff --git a/services/src/main/java/org/apache/druid/server/http/RouterResource.java b/services/src/main/java/org/apache/druid/server/http/RouterResource.java index 994d8b5e9b98..7813362ea95b 100644 --- a/services/src/main/java/org/apache/druid/server/http/RouterResource.java +++ b/services/src/main/java/org/apache/druid/server/http/RouterResource.java @@ -19,19 +19,31 @@ package org.apache.druid.server.http; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.client.selector.Server; +import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; +import org.apache.druid.guice.annotations.Global; import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.server.router.TieredBrokerHostSelector; import javax.ws.rs.GET; import javax.ws.rs.Path; import 
javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.stream.Collectors; /** @@ -40,11 +52,18 @@ public class RouterResource { private final TieredBrokerHostSelector tieredBrokerHostSelector; + private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; + private final Set nodeRoles; @Inject - public RouterResource(TieredBrokerHostSelector tieredBrokerHostSelector) + public RouterResource( + final TieredBrokerHostSelector tieredBrokerHostSelector, + final DruidNodeDiscoveryProvider discoveryProvider, + final @Global Set allNodeRoles) { this.tieredBrokerHostSelector = tieredBrokerHostSelector; + this.druidNodeDiscoveryProvider = discoveryProvider; + this.nodeRoles = allNodeRoles; } @GET @@ -63,4 +82,34 @@ public Map> getBrokers() return brokersMap; } + + /** + * Returns a map of all services in the cluster, including extension services. + * Returns the map sorted by name so that output is stable. Visible on the + * router so clients can get a full cluster view, even if one of the services + * is down (assuming the router survives). + * + * See {@code /druid/coordinator/v1/cluster} for a similar API. 
+ */ + @GET + @Path("/cluster") + @ResourceFilters(StateResourceFilter.class) + @Produces(MediaType.APPLICATION_JSON) + public Response getCluster(@QueryParam("full") boolean full) + { + SortedSet roles = new TreeSet<>( + (r1, r2) -> r1.getJsonName().compareTo(r2.getJsonName())); + roles.addAll(nodeRoles); + ImmutableMap.Builder entityBuilder = new ImmutableMap.Builder<>(); + for (NodeRole role : roles) { + Collection services = NodeRoles.getNodes(druidNodeDiscoveryProvider, role, full); + if (!services.isEmpty()) { + entityBuilder.put(role, services); + } + } + return Response + .ok() + .entity(entityBuilder.build()) + .build(); + } } diff --git a/services/src/test/java/org/apache/druid/server/http/RouterResourceTest.java b/services/src/test/java/org/apache/druid/server/http/RouterResourceTest.java new file mode 100644 index 000000000000..0e522f033410 --- /dev/null +++ b/services/src/test/java/org/apache/druid/server/http/RouterResourceTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.google.common.collect.Iterators; +import org.apache.druid.discovery.DiscoveryDruidNode; +import org.apache.druid.discovery.LocalDruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.Node; +import org.junit.Test; + +import javax.ws.rs.core.Response; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RouterResourceTest +{ + private void announce(LocalDruidNodeDiscoveryProvider provider, NodeRole role, int count) + { + String service = StringUtils.toLowerCase(role.getJsonName()); + for (int i = 1; i < count + 1; i++) { + DruidNode druidNode = new DruidNode(service, service + i, false, 1000, -1, true, false); + DiscoveryDruidNode dn = new DiscoveryDruidNode(druidNode, role, null); + provider.announce(dn); + } + } + + /** + * Verify that the /cluster endpoint works, and includes extension roles. 
+ */ + @Test + public void testCluster() + { + LocalDruidNodeDiscoveryProvider provider = new LocalDruidNodeDiscoveryProvider(); + announce(provider, NodeRole.BROKER, 1); + announce(provider, NodeRole.COORDINATOR, 1); + announce(provider, NodeRole.OVERLORD, 1); + NodeRole customRole = new NodeRole("custom"); + announce(provider, customRole, 1); + Set roles = new HashSet<>(NodeRoles.knownRoles()); + roles.add(customRole); + RouterResource resource = new RouterResource(null, provider, roles); + + { + Response resp = resource.getCluster(true); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + @SuppressWarnings("unchecked") + Map> map = (Map>) resp.getEntity(); + assertEquals(4, map.size()); + assertTrue(Iterators.getOnlyElement(map.get(customRole).iterator()) instanceof DiscoveryDruidNode); + assertEquals(1, map.get(NodeRole.BROKER).size()); + assertEquals(1, map.get(NodeRole.COORDINATOR).size()); + assertEquals(1, map.get(NodeRole.OVERLORD).size()); + } + + { + Response resp = resource.getCluster(false); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + @SuppressWarnings("unchecked") + Map> map = (Map>) resp.getEntity(); + assertEquals(4, map.size()); + assertTrue(Iterators.getOnlyElement(map.get(customRole).iterator()) instanceof Node); + assertEquals(1, map.get(NodeRole.BROKER).size()); + assertEquals(1, map.get(NodeRole.COORDINATOR).size()); + assertEquals(1, map.get(NodeRole.OVERLORD).size()); + } + } +} diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 2b58169c37d6..2b815f1e2dd9 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -58,6 +58,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.DruidService; import 
org.apache.druid.discovery.NodeRole; +import org.apache.druid.guice.annotations.Global; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.ISE; @@ -87,10 +88,12 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletResponse; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -214,13 +217,20 @@ public SystemSchema( final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient, final @IndexingService DruidLeaderClient overlordDruidLeaderClient, final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, + final @Global Set allNodeRoles, final ObjectMapper jsonMapper ) { Preconditions.checkNotNull(serverView, "serverView"); this.tableMap = ImmutableMap.of( SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, jsonMapper, authorizerMapper), - SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper, overlordDruidLeaderClient, coordinatorDruidLeaderClient), + SERVERS_TABLE, new ServersTable( + druidNodeDiscoveryProvider, + serverInventoryView, + authorizerMapper, + overlordDruidLeaderClient, + coordinatorDruidLeaderClient, + allNodeRoles), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper), SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, authorizerMapper) @@ -491,13 +501,15 @@ static class ServersTable extends AbstractTable implements ScannableTable private final FilteredServerInventoryView serverInventoryView; private final DruidLeaderClient overlordLeaderClient; private final DruidLeaderClient coordinatorLeaderClient; + private final Set allNodeRoles; public ServersTable( 
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, FilteredServerInventoryView serverInventoryView, AuthorizerMapper authorizerMapper, DruidLeaderClient overlordLeaderClient, - DruidLeaderClient coordinatorLeaderClient + DruidLeaderClient coordinatorLeaderClient, + Set allNodeRoles ) { this.authorizerMapper = authorizerMapper; @@ -505,6 +517,7 @@ public ServersTable( this.serverInventoryView = serverInventoryView; this.overlordLeaderClient = overlordLeaderClient; this.coordinatorLeaderClient = coordinatorLeaderClient; + this.allNodeRoles = allNodeRoles; } @Override @@ -682,13 +695,37 @@ private static DruidServer toDruidServer(DiscoveryDruidNode discoveryDruidNode) } } - private static Iterator getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider) + private Iterator getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider) { - return Arrays.stream(NodeRole.values()) + return orderedNodeRoles().stream() .flatMap(nodeRole -> druidNodeDiscoveryProvider.getForNodeRole(nodeRole).getAllNodes().stream()) .collect(Collectors.toList()) .iterator(); } + + /** + * Preserve backward compatibility while adding custom node roles. Prior versions + * listed roles in the order defined in the NodeRole enum, now a class. We add + * custom node roles at the end. We choose alphabetical order so that the custom + * roles, if more than one, return in a stable, if arbitrary, order. + */ + private List orderedNodeRoles() + { + // Add known roles in the order defined by the enum. Copy into a mutable list: + // Arrays.asList alone is fixed-size and would make the addAll below throw. + List roles = new ArrayList<>(Arrays.asList(NodeRole.values())); + + // Compute the set of roles which are not known in the enum. + Set customRoles = new HashSet<>(allNodeRoles); + customRoles.removeAll(roles); + + // Sort the "extra" (extension) roles. + List extras = new ArrayList<>(customRoles); + extras.sort((o1, o2) -> o1.getJsonName().compareTo(o2.getJsonName())); + + // Extension roles go after the known roles. 
+ roles.addAll(extras); + return roles; + } } /** diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index 7387fbe2463b..1c3e78be5d0b 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -34,6 +34,7 @@ import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRoles; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.guice.annotations.Json; @@ -123,6 +124,7 @@ public void setUp() binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(objectMapper); binder.bindScope(LazySingleton.class, Scopes.SINGLETON); binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupReferencesManager); + NodeRoles.bindKnownRoles(binder); }, new LifecycleModule(), target); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 4b76fbf8e21a..1e9f4e403db8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -48,6 +48,7 @@ import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -269,6 +270,7 @@ public void setUp() throws 
Exception client, client, druidNodeDiscoveryProvider, + NodeRoles.knownRoles(), mapper ); } @@ -730,7 +732,8 @@ public void testServersTable() serverInventoryView, authMapper, overlordClient, - coordinatorClient + coordinatorClient, + NodeRoles.knownRoles() ) .createMock(); EasyMock.replay(serversTable); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 54d3aea6f565..08b6a206bb4a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -56,6 +56,7 @@ import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeRole; +import org.apache.druid.discovery.NodeRoles; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.DateTimes; @@ -1179,6 +1180,7 @@ public String findCurrentLeader() druidLeaderClient, overlordLeaderClient, provider, + NodeRoles.knownRoles(), getJsonMapper() ); } diff --git a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java index 4da433e46e6a..a7122e446088 100644 --- a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java @@ -34,6 +34,7 @@ import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; +import org.apache.druid.discovery.NodeRoles; import org.apache.druid.guice.DruidGuiceExtensions; import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.LazySingleton; @@ -148,7 +149,7 @@ public void testDefaultViewManagerBind() Assert.assertNotNull(viewManager); 
Assert.assertTrue(viewManager instanceof NoopViewManager); } - + @Test public void testNonDefaultViewManagerBind() { @@ -198,6 +199,7 @@ private Injector makeInjectorWithProperties(final Properties props) binder.bind(QueryScheduler.class) .toProvider(QuerySchedulerProvider.class) .in(LazySingleton.class); + NodeRoles.bindKnownRoles(binder); }, new SqlModule(props), new TestViewManagerModule()