diff --git a/.travis.yml b/.travis.yml index e7582d5ae716..d91d92497c13 100644 --- a/.travis.yml +++ b/.travis.yml @@ -344,7 +344,7 @@ jobs: name: "(Compile=openjdk8, Run=openjdk8) other integration test" jdk: openjdk8 services: *integration_test_services - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion' JVM_RUNTIME='-Djvm.runtime=8' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=8' script: *run_integration_test after_failure: *integration_test_diags # END - Integration tests for Compile with Java 8 and Run with Java 8 @@ -383,7 +383,7 @@ jobs: - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) other integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion' JVM_RUNTIME='-Djvm.runtime=11' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index' JVM_RUNTIME='-Djvm.runtime=11' # END - Integration tests for Compile with Java 8 and Run with Java 11 - name: "security vulnerabilities" diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index bb54e4be9cdd..e0fcc2d00730 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -137,7 +137,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon | `indexSpecForIntermediatePersists` | | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values. | no (default = same as indexSpec) | | `reportParseExceptions` | Boolean | If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. | no (default == false) | | `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. | no (default == 0) | -| `resetOffsetAutomatically` | Boolean | Controls behavior when Druid needs to read Kinesis messages that are no longer available.

If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.html#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.

If true, Druid will automatically reset to the earlier or latest sequence number available in Kinesis, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Please note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data. | no (default == false) | +| `resetOffsetAutomatically` | Boolean | Controls behavior when Druid needs to read Kinesis messages that are no longer available.

If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially using the [Reset Supervisor API](../../operations/api-reference.html#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.

If true, Druid will automatically reset to the earliest or latest sequence number available in Kinesis, based on the value of the `useEarliestSequenceNumber` property (earliest if true, latest if false). Please note that this can lead to data being _DROPPED_ (if `useEarliestSequenceNumber` is false) or _DUPLICATED_ (if `useEarliestSequenceNumber` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data. | no (default == false) | | `skipSequenceNumberAvailabilityCheck` | Boolean | Whether to enable checking if the current sequence number is still available in a particular Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`. | no (default == false) | | `workerThreads` | Integer | The number of threads that will be used by the supervisor for asynchronous operations. | no (default == min(10, taskCount)) | | `chatThreads` | Integer | The number of threads that will be used for communicating with indexing tasks. | no (default == min(10, taskCount * replicas)) | diff --git a/integration-tests/README.md b/integration-tests/README.md index ec782d2ceb7b..b8a03c7b1259 100644 --- a/integration-tests/README.md +++ b/integration-tests/README.md @@ -68,6 +68,21 @@ can either be 8 or 11. Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=. The file must contain one property per line, the key must start with `druid_` and the format should be snake case. +## Debugging Druid while running tests + +For your convenience, Druid processes running inside Docker have debugging enabled and the following ports have +been made available to attach your remote debugger (such as via IntelliJ IDEA's Remote Configuration): + +- Overlord process at port 5009 +- Middlemanager process at port 5008 +- Historical process at port 5007 +- Coordinator process at port 5006 +- Broker process at port 5005 +- Router process at port 5004 +- Router with custom check tls process at port 5003 +- Router with no client auth tls process at port 5002 +- Router with permissive tls process at port 5001 + Running Tests Using A Quickstart Cluster ------------------- @@ -152,20 +167,26 @@ The integration test that indexes from Cloud or uses Cloud as deep storage is no of the integration test run discussed above. Running these tests requires the user to provide their own Cloud. -Currently, the integration test supports Google Cloud Storage, Amazon S3, and Microsoft Azure. -These can be run by providing "gcs-deep-storage", "s3-deep-storage", or "azure-deep-storage" -to -Dgroups for Google Cloud Storage, Amazon S3, and Microsoft Azure respectively. Note that only +Currently, the integration test supports Amazon Kinesis, Google Cloud Storage, Amazon S3, and Microsoft Azure. +These can be run by providing "kinesis-index", "gcs-deep-storage", "s3-deep-storage", or "azure-deep-storage" +to -Dgroups for Amazon Kinesis, Google Cloud Storage, Amazon S3, and Microsoft Azure respectively. Note that only one group should be run per mvn command. 
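As a quick sanity check for the debugger ports listed in the new README section above, a throwaway probe along the following lines can confirm the ports were actually published on localhost before attaching an IDE. `DebugPortProbe` is illustrative only, not part of this patch:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of this patch): checks that the JDWP debug
// ports published by the Docker cluster (5001-5009, listed above) accept
// TCP connections.
public class DebugPortProbe
{
  public static void main(String[] args)
  {
    int[] debugPorts = {5009, 5008, 5007, 5006, 5005, 5004, 5003, 5002, 5001};
    for (int port : debugPorts) {
      try (Socket socket = new Socket()) {
        // Short timeout so an unpublished port fails fast.
        socket.connect(new InetSocketAddress("localhost", port), 2000);
        System.out.println("debug port " + port + " is reachable");
      }
      catch (IOException e) {
        System.out.println("debug port " + port + " is NOT reachable: " + e.getMessage());
      }
    }
  }
}
```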
-In addition to specifying the -Dgroups to mvn command, the following will need to be provided: +For all of the Cloud Integration tests, the following will also need to be provided: +1) Provide -Doverride.config.path= with your Cloud credentials/configs set. See +integration-tests/docker/environment-configs/override-examples/ directory for env vars to provide for each Cloud. + +For Amazon Kinesis, the following will also need to be provided: +1) Provide -Ddruid.test.config.streamEndpoint= with the endpoint of your stream set. +For example, kinesis.us-east-1.amazonaws.com + +For Google Cloud Storage, Amazon S3, and Microsoft Azure, the following will also need to be provided: 1) Set the bucket and path for your test data. This can be done by setting -Ddruid.test.config.cloudBucket and -Ddruid.test.config.cloudPath in the mvn command or setting "cloud_bucket" and "cloud_path" in the config file. 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json located in integration-tests/src/test/resources/data/batch_index to your Cloud storage at the location set in step 1. -3) Provide -Doverride.config.path= with your Cloud credentials/configs set. See -integration-tests/docker/environment-configs/override-examples/ directory for env vars to provide for each Cloud storage. -For running Google Cloud Storage, in addition to the above, you will also have to: +For Google Cloud Storage, in addition to the above, you will also have to: 1) Provide -Dresource.file.dir.path= with folder that contains GOOGLE_APPLICATION_CREDENTIALS file For example, to run integration test for Google Cloud Storage: diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker index b8794d4c0bd7..ae2d5611f241 100644 --- a/integration-tests/docker/environment-configs/broker +++ b/integration-tests/docker/environment-configs/broker @@ -21,7 +21,7 @@ DRUID_SERVICE=broker DRUID_LOG_PATH=/shared/logs/broker.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC +SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 # Druid configs druid_processing_buffer_sizeBytes=25000000 diff --git a/integration-tests/docker/environment-configs/coordinator b/integration-tests/docker/environment-configs/coordinator index de779f62db7d..6bd0260b8131 100644 --- a/integration-tests/docker/environment-configs/coordinator +++ b/integration-tests/docker/environment-configs/coordinator @@ -21,7 +21,7 @@ DRUID_SERVICE=coordinator DRUID_LOG_PATH=/shared/logs/coordinator.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC +SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006 # Druid configs druid_metadata_storage_type=mysql diff --git a/integration-tests/docker/environment-configs/historical b/integration-tests/docker/environment-configs/historical index 1f74b0ce2678..a2fcf33a6665 100644 --- a/integration-tests/docker/environment-configs/historical +++ b/integration-tests/docker/environment-configs/historical @@ -21,7 +21,7 @@ DRUID_SERVICE=historical DRUID_LOG_PATH=/shared/logs/historical.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC +SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m 
-XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007 # Druid configs druid_processing_buffer_sizeBytes=25000000 diff --git a/integration-tests/docker/environment-configs/middlemanager b/integration-tests/docker/environment-configs/middlemanager index c37c3fee8859..9cbe41bce8f8 100644 --- a/integration-tests/docker/environment-configs/middlemanager +++ b/integration-tests/docker/environment-configs/middlemanager @@ -21,7 +21,7 @@ DRUID_SERVICE=middleManager DRUID_LOG_PATH=/shared/logs/middlemanager.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC +SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008 # Druid configs druid_server_http_numThreads=100 diff --git a/integration-tests/docker/environment-configs/overlord b/integration-tests/docker/environment-configs/overlord index d86eb196f566..ebb3d5bf18e4 100644 --- a/integration-tests/docker/environment-configs/overlord +++ b/integration-tests/docker/environment-configs/overlord @@ -21,7 +21,7 @@ DRUID_SERVICE=overlord DRUID_LOG_PATH=/shared/logs/overlord.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC +SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009 # Druid configs druid_metadata_storage_type=mysql diff --git a/integration-tests/docker/environment-configs/override-examples/kinesis b/integration-tests/docker/environment-configs/override-examples/kinesis new file mode 100644 index 000000000000..33d4bac48ebe --- /dev/null +++ b/integration-tests/docker/environment-configs/override-examples/kinesis @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# +druid_kinesis_accessKey= +druid_kinesis_secretKey= +AWS_REGION= +druid_extensions_loadList=["druid-kinesis-indexing-service"] \ No newline at end of file diff --git a/integration-tests/docker/environment-configs/router b/integration-tests/docker/environment-configs/router index b3636b72ceb8..f25b23ee8cfd 100644 --- a/integration-tests/docker/environment-configs/router +++ b/integration-tests/docker/environment-configs/router @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseG1GC +SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5004 # Druid configs druid_auth_basic_common_cacheDirectory=/tmp/authCache/router diff --git a/integration-tests/docker/environment-configs/router-custom-check-tls b/integration-tests/docker/environment-configs/router-custom-check-tls index 07b072495a1e..ece8531d677c 100644 --- a/integration-tests/docker/environment-configs/router-custom-check-tls +++ b/integration-tests/docker/environment-configs/router-custom-check-tls @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router-custom-check-tls.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails +SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5003 # Druid configs druid_plaintextPort=8891 diff --git a/integration-tests/docker/environment-configs/router-no-client-auth-tls b/integration-tests/docker/environment-configs/router-no-client-auth-tls index bc6959cf6134..4b703bac5ee7 100644 --- a/integration-tests/docker/environment-configs/router-no-client-auth-tls +++ b/integration-tests/docker/environment-configs/router-no-client-auth-tls @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router-no-client-auth-tls.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails +SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5002 # Druid configs druid_plaintextPort=8890 diff --git a/integration-tests/docker/environment-configs/router-permissive-tls b/integration-tests/docker/environment-configs/router-permissive-tls index b4beb9fc0808..41346cb15610 100644 --- a/integration-tests/docker/environment-configs/router-permissive-tls +++ b/integration-tests/docker/environment-configs/router-permissive-tls @@ -21,7 +21,7 @@ DRUID_SERVICE=router DRUID_LOG_PATH=/shared/logs/router-permissive-tls.log # JAVA OPTS -SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails +SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5001 # Druid configs druid_plaintextPort=8889 diff --git a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh index 8f38be303a8d..6c40a0704a38 100755 --- a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh +++ b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh @@ -17,6 +17,12 @@ cd /tls +FILE_CHECK_IF_RAN=/tls/server.key +if [ -f "$FILE_CHECK_IF_RAN" ]; then + echo "Using existing certs/keys since /tls/server.key exists. Skipping generation (most likely this script was ran previously). 
To generate new certs, delete /tls/server.key" + exit +fi + rm -f cert_db.txt touch cert_db.txt diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index d9cc62d6a8c3..b86a1d41f9cb 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -32,6 +32,32 @@ + + com.amazonaws + amazon-kinesis-producer + 0.13.1 + + + com.amazonaws + aws-java-sdk-kinesis + ${aws.sdk.version} + + + + com.amazonaws + aws-java-sdk-core + + + + + com.amazonaws + aws-java-sdk-core + ${aws.sdk.version} + + + commons-codec + commons-codec + org.apache.druid druid-core @@ -49,6 +75,12 @@ ${project.parent.version} runtime + + org.apache.druid.extensions + druid-kinesis-indexing-service + ${project.parent.version} + runtime + org.apache.druid.extensions druid-azure-extensions @@ -207,6 +239,38 @@ com.google.code.findbugs jsr305 + + com.github.docker-java + docker-java + 3.2.0 + + + com.github.docker-java + docker-java-transport-jersey + + + io.netty + netty-transport-native-kqueue + + + + + com.github.docker-java + docker-java-transport-netty + 3.2.0 + + + com.github.docker-java + docker-java-api + 3.2.0 + + + io.netty + netty-transport-native-kqueue + ${netty4.version} + osx-x86_64 + runtime + diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh index 87fd8540a482..04d166df7dde 100755 --- a/integration-tests/run_cluster.sh +++ b/integration-tests/run_cluster.sh @@ -71,6 +71,9 @@ $ For druid-hdfs-storage mkdir -p $SHARED_DIR/docker/extensions/druid-hdfs-storage mv $SHARED_DIR/docker/lib/druid-hdfs-storage-* $SHARED_DIR/docker/extensions/druid-hdfs-storage + # For druid-kinesis-indexing-service + mkdir -p $SHARED_DIR/docker/extensions/druid-kinesis-indexing-service + mv $SHARED_DIR/docker/lib/druid-kinesis-indexing-service-* $SHARED_DIR/docker/extensions/druid-kinesis-indexing-service # Pull Hadoop dependency if needed if [ -n "$DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER" == true ] @@ -200,32 +203,32 @@ fi docker run -d --privileged --net druid-it-net --ip 172.172.172.2 ${COMMON_ENV} --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $SERVICE_SUPERVISORDS_DIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster # Start MYSQL - docker run -d --privileged --net druid-it-net --ip 172.172.172.3 ${COMMON_ENV} --name druid-metadata-storage -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.3 ${COMMON_ENV} --name druid-metadata-storage -p 3306:3306 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster # Start Overlord - docker run -d --privileged --net druid-it-net --ip 172.172.172.4 ${COMMON_ENV} ${OVERLORD_ENV} ${OVERRIDE_ENV} --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.4 ${COMMON_ENV} ${OVERLORD_ENV} ${OVERRIDE_ENV} --name druid-overlord -p 5009:5009 -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-metadata-storage:druid-metadata-storage --link 
druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Coordinator - docker run -d --privileged --net druid-it-net --ip 172.172.172.5 ${COMMON_ENV} ${COORDINATOR_ENV} ${OVERRIDE_ENV} --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.5 ${COMMON_ENV} ${COORDINATOR_ENV} ${OVERRIDE_ENV} --name druid-coordinator -p 5006:5006 -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Historical - docker run -d --privileged --net druid-it-net --ip 172.172.172.6 ${COMMON_ENV} ${HISTORICAL_ENV} ${OVERRIDE_ENV} --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.6 ${COMMON_ENV} ${HISTORICAL_ENV} ${OVERRIDE_ENV} --name druid-historical -p 5007:5007 -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Middlemanger - docker run -d --privileged --net druid-it-net --ip 172.172.172.7 ${COMMON_ENV} ${MIDDLEMANAGER_ENV} ${OVERRIDE_ENV} --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.7 ${COMMON_ENV} ${MIDDLEMANAGER_ENV} ${OVERRIDE_ENV} --name druid-middlemanager -p 5008:5008 -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster # Start Broker - docker run -d --privileged --net druid-it-net --ip 172.172.172.8 ${COMMON_ENV} ${BROKER_ENV} ${OVERRIDE_ENV} --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.8 ${COMMON_ENV} ${BROKER_ENV} ${OVERRIDE_ENV} --name druid-broker -p 5005:5005 -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster # Start Router - 
docker run -d --privileged --net druid-it-net --ip 172.172.172.9 ${COMMON_ENV} ${ROUTER_ENV} ${OVERRIDE_ENV} --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.9 ${COMMON_ENV} ${ROUTER_ENV} ${OVERRIDE_ENV} --name druid-router -p 8888:8888 -p 5004:5004 -p 9088:9088 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with permissive TLS settings (client auth enabled, no hostname verification, no revocation check) - docker run -d --privileged --net druid-it-net --ip 172.172.172.10 ${COMMON_ENV} ${ROUTER_PERMISSIVE_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.10 ${COMMON_ENV} ${ROUTER_PERMISSIVE_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-permissive-tls -p 5001:5001 -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with TLS but no client auth - docker run -d --privileged --net druid-it-net --ip 172.172.172.11 ${COMMON_ENV} ${ROUTER_NO_CLIENT_AUTH_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.11 ${COMMON_ENV} ${ROUTER_NO_CLIENT_AUTH_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-no-client-auth-tls -p 5002:5002 -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with custom TLS cert checkers - docker run -d --privileged --net druid-it-net --ip 172.172.172.12 ${COMMON_ENV} ${ROUTER_CUSTOM_CHECK_TLS_ENV} ${OVERRIDE_ENV} --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster + docker run -d --privileged --net druid-it-net --ip 172.172.172.12 ${COMMON_ENV} ${ROUTER_CUSTOM_CHECK_TLS_ENV} ${OVERRIDE_ENV} --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 5003:5003 -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka 
--link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster } \ No newline at end of file diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java index 769b71216114..1fec42c0e445 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java @@ -57,6 +57,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide private String password; private String cloudBucket; private String cloudPath; + private String streamEndpoint; @JsonCreator ConfigFileConfigProvider(@JsonProperty("configFile") String configFile) @@ -192,6 +193,7 @@ private void loadProperties(String configFile) cloudBucket = props.get("cloud_bucket"); cloudPath = props.get("cloud_path"); + streamEndpoint = props.get("stream_endpoint"); LOG.info("router: [%s], [%s]", routerUrl, routerTLSUrl); LOG.info("broker: [%s], [%s]", brokerUrl, brokerTLSUrl); @@ -354,6 +356,12 @@ public String getCloudPath() return cloudPath; } + @Override + public String getStreamEndpoint() + { + return streamEndpoint; + } + @Override public Map getProperties() { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java index 83d80e7870a8..e33e12188d5d 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java @@ -46,6 +46,9 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider @JsonProperty private String cloudBucket; + @JsonProperty + private String streamEndpoint; + @Override public IntegrationTestingConfig get() { @@ -229,6 +232,12 @@ public String getCloudPath() { return cloudPath; } + + @Override + public String getStreamEndpoint() + { + return streamEndpoint; + } }; } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java index d178f90dd1d4..17f2aab844a7 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java @@ -88,4 +88,6 @@ default String getKafkaInternalHost() String getCloudBucket(); String getCloudPath(); + + String getStreamEndpoint(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 6ad64116351f..20f6a76540d5 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; @@ -287,6 
+288,100 @@ public void shutdownSupervisor(String id) } } + public SupervisorStateManager.BasicState getSupervisorStatus(String id) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.GET, + new URL(StringUtils.format( + "%ssupervisor/%s/status", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), + StatusResponseHandler.getInstance() + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while getting supervisor status, response [%s %s]", + response.getStatus(), + response.getContent() + ); + } + Map responseData = jsonMapper.readValue( + response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + Map payload = jsonMapper.convertValue( + responseData.get("payload"), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + String state = (String) payload.get("state"); + LOG.info("Supervisor id[%s] has state [%s]", id, state); + return SupervisorStateManager.BasicState.valueOf(state); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void suspendSupervisor(String id) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.POST, + new URL(StringUtils.format( + "%ssupervisor/%s/suspend", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), + StatusResponseHandler.getInstance() + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while suspending supervisor, response [%s %s]", + response.getStatus(), + response.getContent() + ); + } + LOG.info("Suspended supervisor with id[%s]", id); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void resumeSupervisor(String id) + { + try { + StatusResponseHolder response = httpClient.go( + new Request( + HttpMethod.POST, + new URL(StringUtils.format( + "%ssupervisor/%s/resume", + getIndexerURL(), + StringUtils.urlEncode(id) + )) + ), + StatusResponseHandler.getInstance() + ).get(); + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while resuming supervisor, response [%s %s]", + response.getStatus(), + response.getContent() + ); + } + LOG.info("Resumed supervisor with id[%s]", id); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + private StatusResponseHolder makeRequest(HttpMethod method, String url) { try { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java new file mode 100644 index 000000000000..4c6518d535bc --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.model.Container; +import com.github.dockerjava.core.DockerClientBuilder; +import com.github.dockerjava.netty.NettyDockerCmdExecFactory; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.StatusResponseHandler; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.guice.TestClient; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.net.URL; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +public class DruidClusterAdminClient +{ + private static final Logger LOG = new Logger(DruidClusterAdminClient.class); + private static final String COORDINATOR_DOCKER_CONTAINER_NAME = "/druid-coordinator"; + private static final String HISTORICAL_DOCKER_CONTAINER_NAME = "/druid-historical"; + private static final String INDEXER_DOCKER_CONTAINER_NAME = "/druid-overlord"; + private static final String BROKERR_DOCKER_CONTAINER_NAME = "/druid-broker"; + private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router"; + private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager"; + + private final HttpClient httpClient; + private IntegrationTestingConfig config; + + @Inject + DruidClusterAdminClient( + @TestClient HttpClient httpClient, + IntegrationTestingConfig config + ) + { + this.httpClient = httpClient; + this.config = config; + } + + public void restartCoordinatorContainer() + { + restartDockerContainer(COORDINATOR_DOCKER_CONTAINER_NAME); + } + + public void restartHistoricalContainer() + { + restartDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME); + } + + public void restartIndexerContainer() + { + restartDockerContainer(INDEXER_DOCKER_CONTAINER_NAME); + } + + public void restartBrokerContainer() + { + restartDockerContainer(BROKERR_DOCKER_CONTAINER_NAME); + } + + public void restartRouterContainer() + { + restartDockerContainer(ROUTER_DOCKER_CONTAINER_NAME); + } + + public void restartMiddleManagerContainer() + { + restartDockerContainer(MIDDLEMANAGER_DOCKER_CONTAINER_NAME); + } + + public void waitUntilCoordinatorReady() + { + waitUntilInstanceReady(config.getCoordinatorUrl()); + } + + public void waitUntilHistoricalReady() + { + waitUntilInstanceReady(config.getHistoricalUrl()); + } + + public void waitUntilIndexerReady() + { + waitUntilInstanceReady(config.getIndexerUrl()); + } + + public void waitUntilBrokerReady() + { + waitUntilInstanceReady(config.getBrokerUrl()); + } + + public void waitUntilRouterReady() + { + waitUntilInstanceReady(config.getRouterUrl()); + } + + private void restartDockerContainer(String serviceName) + { + DockerClient dockerClient = DockerClientBuilder.getInstance() + .withDockerCmdExecFactory((new NettyDockerCmdExecFactory()) + .withConnectTimeout(10 * 1000)) + .build(); + List containers = dockerClient.listContainersCmd().exec(); + Optional containerName = 
containers.stream() + .filter(container -> Arrays.asList(container.getNames()).contains(serviceName)) + .findFirst() + .map(container -> container.getId()); + + if (!containerName.isPresent()) { + LOG.error("Cannot find docker container for " + serviceName); + throw new ISE("Cannot find docker container for " + serviceName); + } + dockerClient.restartContainerCmd(containerName.get()).exec(); + } + + private void waitUntilInstanceReady(final String host) + { + ITRetryUtil.retryUntilTrue( + () -> { + try { + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.GET, new URL(StringUtils.format("%s/status/health", host))), + StatusResponseHandler.getInstance() + ).get(); + + LOG.info("%s %s", response.getStatus(), response.getContent()); + return response.getStatus().equals(HttpResponseStatus.OK); + } + catch (Throwable e) { + LOG.error(e, ""); + return false; + } + }, + "Waiting for instance to be ready: [" + host + "]" + ); + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java new file mode 100644 index 000000000000..bc5ace2d1b48 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.utils; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest; +import com.amazonaws.services.kinesis.model.AddTagsToStreamResult; +import com.amazonaws.services.kinesis.model.CreateStreamResult; +import com.amazonaws.services.kinesis.model.DeleteStreamResult; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.ScalingType; +import com.amazonaws.services.kinesis.model.StreamDescription; +import com.amazonaws.services.kinesis.model.StreamStatus; +import com.amazonaws.services.kinesis.model.UpdateShardCountRequest; +import com.amazonaws.services.kinesis.model.UpdateShardCountResult; +import com.amazonaws.util.AwsHostNameUtils; +import org.apache.druid.java.util.common.ISE; + +import java.io.FileInputStream; +import java.util.Map; +import java.util.Properties; + +public class KinesisAdminClient +{ + private AmazonKinesis amazonKinesis; + + public KinesisAdminClient(String endpoint) throws Exception + { + String pathToConfigFile = System.getProperty("override.config.path"); + Properties prop = new Properties(); + prop.load(new FileInputStream(pathToConfigFile)); + + AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider( + new BasicAWSCredentials( + prop.getProperty("druid_kinesis_accessKey"), + prop.getProperty("druid_kinesis_secretKey") + ) + ); + amazonKinesis = AmazonKinesisClientBuilder.standard() + .withCredentials(credentials) + .withClientConfiguration(new ClientConfiguration()) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + endpoint, + AwsHostNameUtils.parseRegion( + endpoint, + null + ) + )).build(); + } + + public void createStream(String streamName, int shardCount, Map<String, String> tags) + { + CreateStreamResult createStreamResult = amazonKinesis.createStream(streamName, shardCount); + if (createStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) { + throw new ISE("Cannot create stream for integration test"); + } + if (tags != null && !tags.isEmpty()) { + AddTagsToStreamRequest addTagsToStreamRequest = new AddTagsToStreamRequest(); + addTagsToStreamRequest.setStreamName(streamName); + addTagsToStreamRequest.setTags(tags); + AddTagsToStreamResult addTagsToStreamResult = amazonKinesis.addTagsToStream(addTagsToStreamRequest); + if (addTagsToStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) { + throw new ISE("Cannot tag stream for integration test"); + } + } + + } + + public void deleteStream(String streamName) + { + DeleteStreamResult deleteStreamResult = amazonKinesis.deleteStream(streamName); + if (deleteStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) { + throw new ISE("Cannot delete stream for integration test"); + } + } + + /** + * Updates the shard count of {@code streamName} to a final shard count of {@code newShardCount}. + * If {@code blocksUntilStarted} is true, this method blocks until the resharding has started + * (but not necessarily finished); otherwise, it returns right after issuing the reshard command. + */ + public void updateShardCount(String streamName, int newShardCount, boolean blocksUntilStarted) + { + int 
originalShardCount = getStreamShardCount(streamName); + UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest(); + updateShardCountRequest.setStreamName(streamName); + updateShardCountRequest.setTargetShardCount(newShardCount); + updateShardCountRequest.setScalingType(ScalingType.UNIFORM_SCALING); + UpdateShardCountResult updateShardCountResult = amazonKinesis.updateShardCount(updateShardCountRequest); + if (updateShardCountResult.getSdkHttpMetadata().getHttpStatusCode() != 200) { + throw new ISE("Cannot update stream's shard count for integration test"); + } + if (blocksUntilStarted) { + // Wait until the resharding started (or finished) + ITRetryUtil.retryUntil( + () -> { + StreamDescription streamDescription = getStreamDescription(streamName); + int updatedShardCount = getStreamShardCount(streamDescription); + return verifyStreamStatus(streamDescription, StreamStatus.UPDATING) || + (verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) && updatedShardCount > originalShardCount); + }, + true, + 30, + 30, + "Kinesis stream resharding to start (or finished)" + ); + } + } + + public boolean isStreamActive(String streamName) + { + StreamDescription streamDescription = getStreamDescription(streamName); + return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE); + } + + public int getStreamShardCount(String streamName) + { + StreamDescription streamDescription = getStreamDescription(streamName); + return getStreamShardCount(streamDescription); + } + + private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck) + { + return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus()); + } + + private int getStreamShardCount(StreamDescription streamDescription) + { + return streamDescription.getShards().size(); + } + + private StreamDescription getStreamDescription(String streamName) + { + DescribeStreamResult describeStreamResult = amazonKinesis.describeStream(streamName); + if (describeStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) { + throw new ISE("Cannot get stream description for integration test"); + } + return describeStreamResult.getStreamDescription(); + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java new file mode 100644 index 000000000000..09950f132047 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.utils; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import com.amazonaws.util.AwsHostNameUtils; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.FileInputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +public class KinesisEventWriter implements StreamEventWriter +{ + private static final Logger LOG = new Logger(KinesisEventWriter.class); + + private final KinesisProducer kinesisProducer; + + public KinesisEventWriter(String endpoint, boolean aggregate) throws Exception + { + String pathToConfigFile = System.getProperty("override.config.path"); + Properties prop = new Properties(); + prop.load(new FileInputStream(pathToConfigFile)); + + AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider( + new BasicAWSCredentials( + prop.getProperty("druid_kinesis_accessKey"), + prop.getProperty("druid_kinesis_secretKey") + ) + ); + + KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration() + .setCredentialsProvider(credentials) + .setRegion(AwsHostNameUtils.parseRegion(endpoint, null)) + .setRequestTimeout(600000L) + .setConnectTimeout(300000L) + .setRecordTtl(9223372036854775807L) + .setMetricsLevel("none") + .setAggregationEnabled(aggregate); + + this.kinesisProducer = new KinesisProducer(kinesisProducerConfiguration); + } + + @Override + public void write(String streamName, String event) + { + kinesisProducer.addUserRecord( + streamName, + DigestUtils.sha1Hex(event), + ByteBuffer.wrap(event.getBytes(StandardCharsets.UTF_8)) + ); + } + + @Override + public void shutdown() + { + LOG.info("Shutting down Kinesis client"); + kinesisProducer.flushSync(); + } + + @Override + public void flush() + { + kinesisProducer.flushSync(); + ITRetryUtil.retryUntil( + () -> kinesisProducer.getOutstandingRecordsCount() == 0, + true, + 10000, + 30, + "Waiting for all Kinesis writes to be flushed" + ); + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java new file mode 100644 index 000000000000..1bfd6b675919 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.utils; + +public interface StreamEventWriter +{ + void write(String topic, String event); + + void shutdown(); + + void flush(); +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java new file mode 100644 index 000000000000..a232c59a8d65 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import org.joda.time.DateTime; + +public interface StreamGenerator +{ + void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds); + + void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime); + + void shutdown(); +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java new file mode 100644 index 000000000000..bb56c794a152 --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import org.apache.druid.java.util.common.DateTimes; +import org.joda.time.DateTime; + +import java.util.UUID; + +public class StreamVerifierEventGenerator extends SyntheticStreamGenerator +{ + public StreamVerifierEventGenerator(int eventsPerSeconds, long cyclePaddingMs) + { + super(eventsPerSeconds, cyclePaddingMs); + } + + @Override + Object getEvent(int i, DateTime timestamp) + { + return StreamVerifierSyntheticEvent.of( + UUID.randomUUID().toString(), + timestamp.getMillis(), + DateTimes.nowUtc().getMillis(), + i, + i == getEventsPerSecond() ? 
getSumOfEventSequence(getEventsPerSecond()) : null, + i == 1 + ); + } + + + /** + * Assumes the first number in the sequence is 1, incrementing by 1, until numEvents. + */ + private long getSumOfEventSequence(int numEvents) + { + return (numEvents * (1 + numEvents)) / 2; + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java new file mode 100644 index 000000000000..e8c314a6b4bd --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class StreamVerifierSyntheticEvent +{ + private String id; + private long groupingTimestamp; + private long insertionTimestamp; + private long sequenceNumber; + private Long expectedSequenceNumberSum; + private boolean firstEvent; + + public StreamVerifierSyntheticEvent( + String id, + long groupingTimestamp, + long insertionTimestamp, + long sequenceNumber, + Long expectedSequenceNumberSum, + boolean firstEvent + ) + { + this.id = id; + this.groupingTimestamp = groupingTimestamp; + this.insertionTimestamp = insertionTimestamp; + this.sequenceNumber = sequenceNumber; + this.expectedSequenceNumberSum = expectedSequenceNumberSum; + this.firstEvent = firstEvent; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public long getGroupingTimestamp() + { + return groupingTimestamp; + } + + @JsonProperty + public long getInsertionTimestamp() + { + return insertionTimestamp; + } + + @JsonProperty + public long getSequenceNumber() + { + return sequenceNumber; + } + + @JsonProperty + public Long getExpectedSequenceNumberSum() + { + return expectedSequenceNumberSum; + } + + @JsonProperty + public Integer getFirstEventFlag() + { + return firstEvent ? 
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java
new file mode 100644
index 000000000000..e8c314a6b4bd
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class StreamVerifierSyntheticEvent
+{
+  private String id;
+  private long groupingTimestamp;
+  private long insertionTimestamp;
+  private long sequenceNumber;
+  private Long expectedSequenceNumberSum;
+  private boolean firstEvent;
+
+  public StreamVerifierSyntheticEvent(
+      String id,
+      long groupingTimestamp,
+      long insertionTimestamp,
+      long sequenceNumber,
+      Long expectedSequenceNumberSum,
+      boolean firstEvent
+  )
+  {
+    this.id = id;
+    this.groupingTimestamp = groupingTimestamp;
+    this.insertionTimestamp = insertionTimestamp;
+    this.sequenceNumber = sequenceNumber;
+    this.expectedSequenceNumberSum = expectedSequenceNumberSum;
+    this.firstEvent = firstEvent;
+  }
+
+  @JsonProperty
+  public String getId()
+  {
+    return id;
+  }
+
+  @JsonProperty
+  public long getGroupingTimestamp()
+  {
+    return groupingTimestamp;
+  }
+
+  @JsonProperty
+  public long getInsertionTimestamp()
+  {
+    return insertionTimestamp;
+  }
+
+  @JsonProperty
+  public long getSequenceNumber()
+  {
+    return sequenceNumber;
+  }
+
+  @JsonProperty
+  public Long getExpectedSequenceNumberSum()
+  {
+    return expectedSequenceNumberSum;
+  }
+
+  @JsonProperty
+  public Integer getFirstEventFlag()
+  {
+    return firstEvent ? 1 : null;
+  }
+
+  public static StreamVerifierSyntheticEvent of(
+      String id,
+      long groupingTimestamp,
+      long insertionTimestamp,
+      long sequenceNumber,
+      Long expectedSequenceNumberSum,
+      boolean firstEvent
+  )
+  {
+    return new StreamVerifierSyntheticEvent(
+        id,
+        groupingTimestamp,
+        insertionTimestamp,
+        sequenceNumber,
+        expectedSequenceNumberSum,
+        firstEvent
+    );
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
new file mode 100644
index 000000000000..748a6ed2d3bc
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.DateTime;
+
+public abstract class SyntheticStreamGenerator implements StreamGenerator
+{
+  private static final Logger log = new Logger(SyntheticStreamGenerator.class);
+  static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
+  static {
+    MAPPER.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(ObjectMapper.class.getName(), MAPPER)
+    );
+    MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+  }
+
+  public int getEventsPerSecond()
+  {
+    return eventsPerSecond;
+  }
+
+  private final int eventsPerSecond;
+
+  // When calculating rates, leave this buffer to minimize overruns where we're still writing messages from the previous
+  // second. If the generator finishes sending [eventsPerSecond] events and the second is not up, it will wait for the next
+  // second to begin.
+  private final long cyclePaddingMs;
+
+  public SyntheticStreamGenerator(int eventsPerSecond, long cyclePaddingMs)
+  {
+    this.eventsPerSecond = eventsPerSecond;
+    this.cyclePaddingMs = cyclePaddingMs;
+  }
+
+  abstract Object getEvent(int row, DateTime timestamp);
+
+  @Override
+  public void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
+  {
+    start(streamTopic, streamEventWriter, totalNumberOfSeconds, null);
+  }
+
+  @Override
+  public void start(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrideFirstEventTime)
+  {
+    // The idea here is that we will send [eventsPerSecond] events that will either use [nowCeilingToSecond]
+    // or [overrideFirstEventTime] as the primary timestamp.
+    // Having a fixed number of events that use the same timestamp helps us determine if any events
+    // were dropped or duplicated. We will try to space the event generation over the remainder of the second so that it
+    // roughly completes at the top of the second, but if it doesn't complete, it will still send the remainder of the
+    // events with the original timestamp, even after wall time has moved on to the next second.
+    DateTime nowCeilingToSecond = DateTimes.nowUtc().secondOfDay().roundCeilingCopy();
+    DateTime eventTimestamp = overrideFirstEventTime == null ? nowCeilingToSecond : overrideFirstEventTime;
+    int seconds = 0;
+
+    while (true) {
+      try {
+        long sleepMillis = nowCeilingToSecond.getMillis() - DateTimes.nowUtc().getMillis();
+        if (sleepMillis > 0) {
+          log.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond);
+          Thread.sleep(sleepMillis);
+          continue;
+        }
+
+        log.info(
+            "Beginning run cycle with %s events, target completion time: %s",
+            eventsPerSecond,
+            nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
+        );
+
+        for (int i = 1; i <= eventsPerSecond; i++) {
+          streamEventWriter.write(streamTopic, MAPPER.writeValueAsString(getEvent(i, eventTimestamp)));
+
+          long sleepTime = calculateSleepTimeMs(eventsPerSecond - i, nowCeilingToSecond);
+          if ((i <= 100 && i % 10 == 0) || i % 100 == 0) {
+            log.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime);
+          }
+
+          if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+          }
+        }
+
+        nowCeilingToSecond = nowCeilingToSecond.plusSeconds(1);
+        eventTimestamp = eventTimestamp.plusSeconds(1);
+        seconds++;
+
+        log.info(
+            "Finished writing %s events, current time: %s - updating next timestamp to: %s",
+            eventsPerSecond,
+            DateTimes.nowUtc(),
+            nowCeilingToSecond
+        );
+
+        if (seconds >= totalNumberOfSeconds) {
+          streamEventWriter.flush();
+          log.info(
+              "Finished writing %s seconds",
+              seconds
+          );
+          break;
+        }
+      }
+      catch (Exception e) {
+        throw new RuntimeException("Exception in event generation loop", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown()
+  {
+  }
+
+  /**
+   * Dynamically adjust delay between messages to spread them out over the remaining time left in the second.
+   */
+  private long calculateSleepTimeMs(long eventsRemaining, DateTime secondBeingProcessed)
+  {
+    if (eventsRemaining == 0) {
+      return 0;
+    }
+
+    DateTime now = DateTimes.nowUtc();
+    DateTime nextSecondToProcessMinusBuffer = secondBeingProcessed.plusSeconds(1).minus(cyclePaddingMs);
+
+    if (nextSecondToProcessMinusBuffer.isBefore(now)) {
+      return 0; // We're late!! Write messages as fast as you can
+    }
+
+    return (nextSecondToProcessMinusBuffer.getMillis() - now.getMillis()) / eventsRemaining;
+  }
+}
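Aside: a minimal sketch of the pacing arithmetic in calculateSleepTimeMs above, restated with plain millisecond values instead of Joda-Time (all numbers hypothetical, not part of this patch):

public class PacingSketch
{
  // Mirrors calculateSleepTimeMs: spread the remaining events evenly over the time
  // left before the padded end of the current second.
  static long sleepPerEventMs(long eventsRemaining, long millisUntilPaddedCutoff)
  {
    if (eventsRemaining == 0 || millisUntilPaddedCutoff <= 0) {
      return 0; // nothing left to pace, or we are already late
    }
    return millisUntilPaddedCutoff / eventsRemaining;
  }

  public static void main(String[] args)
  {
    // 5 events still to send and 500 ms left before the cutoff -> 100 ms between events
    System.out.println(sleepPerEventMs(5, 500));
  }
}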
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
new file mode 100644
index 000000000000..4fea67d7be57
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator
+{
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+  public WikipediaStreamEventStreamGenerator(int eventsPerSecond, long cyclePaddingMs)
+  {
+    super(eventsPerSecond, cyclePaddingMs);
+  }
+
+  @Override
+  Object getEvent(int i, DateTime timestamp)
+  {
+    Map<String, Object> event = new HashMap<>();
+    event.put("page", "Gypsy Danger");
+    event.put("language", "en");
+    event.put("user", "nuclear");
+    event.put("unpatrolled", "true");
+    event.put("newPage", "true");
+    event.put("robot", "false");
+    event.put("anonymous", "false");
+    event.put("namespace", "article");
+    event.put("continent", "North America");
+    event.put("country", "United States");
+    event.put("region", "Bay Area");
+    event.put("city", "San Francisco");
+    event.put("timestamp", DATE_TIME_FORMATTER.print(timestamp));
+    event.put("added", i);
+    event.put("deleted", 0);
+    event.put("delta", i);
+    return event;
+  }
+}
diff --git a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
index d6b244813bd0..53ef663b5c33 100644
--- a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
+++ b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
@@ -20,25 +20,15 @@
 package /*CHECKSTYLE.OFF: PackageName*/org.testng/*CHECKSTYLE.ON: PackageName*/;
 
 import com.google.inject.Injector;
-import com.google.inject.Key;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.guice.TestClient;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
 import org.testng.internal.IConfiguration;
 import org.testng.internal.annotations.IAnnotationFinder;
 import org.testng.xml.XmlTest;
 
-import java.net.URL;
 import java.util.List;
 
 /**
@@ -87,14 +77,14 @@ public void run()
   {
     Injector injector = DruidTestModuleFactory.getInjector();
     IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class);
-    HttpClient client = injector.getInstance(Key.get(HttpClient.class, TestClient.class));
+    DruidClusterAdminClient druidClusterAdminClient = injector.getInstance(DruidClusterAdminClient.class);
 
-    waitUntilInstanceReady(client, config.getCoordinatorUrl());
-    waitUntilInstanceReady(client, config.getIndexerUrl());
-    waitUntilInstanceReady(client, config.getBrokerUrl());
+    druidClusterAdminClient.waitUntilCoordinatorReady();
+    druidClusterAdminClient.waitUntilIndexerReady();
+    druidClusterAdminClient.waitUntilBrokerReady();
     String routerHost = config.getRouterUrl();
     if (null != routerHost) {
-      waitUntilInstanceReady(client, config.getRouterUrl());
+      druidClusterAdminClient.waitUntilRouterReady();
     }
     Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
     try {
@@ -115,27 +105,5 @@ private void runTests()
   {
     super.run();
   }
-
-  public void waitUntilInstanceReady(final HttpClient client, final String host)
-  {
-    ITRetryUtil.retryUntilTrue(
-        () -> {
-          try {
-            StatusResponseHolder response = client.go(
-                new Request(HttpMethod.GET, new URL(StringUtils.format("%s/status/health", host))),
-                StatusResponseHandler.getInstance()
-            ).get();
-
-            LOG.info("%s %s", response.getStatus(), response.getContent());
-            return response.getStatus().equals(HttpResponseStatus.OK);
-          }
-          catch (Throwable e) {
-            LOG.error(e, "");
-            return false;
-          }
-        },
-        "Waiting for instance to be ready: [" + host + "]"
-    );
-  }
 }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index b076f7cf2d81..c0116f9149e4 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -26,12 +26,20 @@ public class TestNGGroup
 {
   public static final String BATCH_INDEX = "batch-index";
+
   public static final String HADOOP_INDEX = "hadoop-index";
+
   public static final String KAFKA_INDEX = "kafka-index";
+
   public static final String OTHER_INDEX = "other-index";
+
   public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
-  // This group can only be run individually using -Dgroups=query since it requires specific test data setup.
+
+  /**
+   * This group can only be run individually using -Dgroups=query since it requires specific test data setup.
+   */
   public static final String QUERY = "query";
+
   public static final String REALTIME_INDEX = "realtime-index";
 
   /**
@@ -88,4 +96,12 @@ public class TestNGGroup
    * get the tests to work to this project's README.
    */
   public static final String QUICKSTART_COMPATIBLE = "quickstart-compatible";
+
+  /**
+   * This group is not part of CI. To run this group, AWS Kinesis configs/credentials for your AWS account must be
+   * provided in a file. The path of the file must then be passed to mvn with -Doverride.config.path=
+   * See integration-tests/docker/environment-configs/override-examples/kinesis for env vars to provide.
+   * The Kinesis stream endpoint for a region must also be passed to mvn with -Ddruid.test.config.streamEndpoint=
+   */
+  public static final String KINESIS_INDEX = "kinesis-index";
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
index 87a2cceece20..eea0f7fb2ae5 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
@@ -55,9 +55,9 @@ abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
 {
   private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
-  protected static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/kafka_supervisor_spec_legacy_parser.json";
-  protected static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/kafka_supervisor_spec_input_format.json";
-  private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
+  protected static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  protected static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
   private static final String TOPIC_NAME = "kafka_indexing_service_topic";
 
   private static final int NUM_EVENTS_TO_SEND = 60;
@@ -137,8 +137,12 @@ void doKafkaIndexTest(String dataSourceName, String supervisorSpecPath, boolean
       spec = getResourceAsString(supervisorSpecPath);
       spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
-      spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
-      spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
+      spec = StringUtils.replace(spec, "%%STREAM_TYPE%%", "kafka");
+      spec = StringUtils.replace(spec, "%%TOPIC_KEY%%", "topic");
+      spec = StringUtils.replace(spec, "%%TOPIC_VALUE%%", TOPIC_NAME);
+      spec = StringUtils.replace(spec, "%%USE_EARLIEST_KEY%%", "useEarliestOffset");
+      spec = StringUtils.replace(spec, "%%STREAM_PROPERTIES_KEY%%", "consumerProperties");
+      spec = StringUtils.replace(spec, "%%STREAM_PROPERTIES_VALUE%%", jsonMapper.writeValueAsString(consumerProperties));
       LOG.info("supervisorSpec: [%s]\n", spec);
     }
     catch (Exception e) {
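Aside: the renamed stream_* resources act as a shared template, and the Kafka and Kinesis tests differ only in the values they substitute for the %%...%% placeholders. For reference, a sketch compiled from the substitutions in this patch (the class itself is hypothetical, not code that appears in it):

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class StreamTemplateKeys
{
  public static void main(String[] args)
  {
    Map<String, String> kafka = ImmutableMap.of(
        "%%STREAM_TYPE%%", "kafka",
        "%%TOPIC_KEY%%", "topic",
        "%%STREAM_PROPERTIES_KEY%%", "consumerProperties",
        "%%USE_EARLIEST_KEY%%", "useEarliestOffset"
    );
    Map<String, String> kinesis = ImmutableMap.of(
        "%%STREAM_TYPE%%", "kinesis",
        "%%TOPIC_KEY%%", "stream",
        "%%STREAM_PROPERTIES_KEY%%", "endpoint",
        "%%USE_EARLIEST_KEY%%", "useEarliestSequenceNumber"
    );
    System.out.println("kafka: " + kafka);
    System.out.println("kinesis: " + kinesis);
  }
}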
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
index 04c52b2f97d7..f32b82433f1a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
@@ -54,7 +54,7 @@ public void testKafka(String param)
         ? INDEXER_FILE_LEGACY_PARSER
         : INDEXER_FILE_INPUT_FORMAT;
     LOG.info("Starting test: ITKafkaIndexingServiceTransactionalTest");
-    doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, false);
+    doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, true);
   }
 
   @AfterMethod
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
new file mode 100644
index 000000000000..b539b5d547c2
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceTest extends AbstractITBatchIndexTest
+{
+  private static final Logger LOG = new Logger(ITKinesisIndexingServiceTest.class);
+  private static final int KINESIS_SHARD_COUNT = 2;
+  // Since this integration test can terminate or be killed unexpectedly, this tag is added to all streams created
+  // to make stream cleanup easier. (Normally, streams are cleaned up automatically by the teardown method.)
+  // The value of this tag is a timestamp that can be used by a lambda function to remove unused streams.
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  // format for the querying interval
+  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private static final int EVENTS_PER_SECOND = 6;
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  private String streamName;
+  private String fullDatasourceName;
+  private KinesisAdminClient kinesisAdminClient;
+  private KinesisEventWriter kinesisEventWriter;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+  private Function<String, String> kinesisIngestionPropsTransform;
+  private Function<String, String> kinesisQueryPropsTransform;
+  private String supervisorId;
+  private int secondsToGenerateRemaining;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    kinesisAdminClient = new KinesisAdminClient(config.getStreamEndpoint());
+    kinesisEventWriter = new KinesisEventWriter(config.getStreamEndpoint(), false);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+    kinesisEventWriter.shutdown();
+  }
+
+  @BeforeMethod
+  public void before()
+  {
+    streamName = "kinesis_index_test_" + UUID.randomUUID();
+    String datasource = "kinesis_indexing_service_test_" + UUID.randomUUID();
+    Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+    kinesisAdminClient.createStream(streamName, KINESIS_SHARD_COUNT, tags);
+    ITRetryUtil.retryUntil(
+        () -> kinesisAdminClient.isStreamActive(streamName),
+        true,
+        10000,
+        30,
+        "Wait for stream active"
+    );
+    secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+    fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+    kinesisIngestionPropsTransform = spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
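+    // Editorial illustration, not in the original patch: with the replacements above, the shared
+    // template's ioConfig resolves for Kinesis to roughly
+    //   "stream": "<streamName>",
+    //   "endpoint": "<config.getStreamEndpoint()>",
+    //   "useEarliestSequenceNumber": true
+    // while the Kafka tests resolve the same keys to "topic", "consumerProperties", and "useEarliestOffset".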
"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1)) + ); + spec = StringUtils.replace( + spec, + "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_START%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_QUERY_END%%", + INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2)) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_RESPONSE_TIMESTAMP%%", + TIMESTAMP_FMT.print(FIRST_EVENT_TIME) + ); + spec = StringUtils.replace( + spec, + "%%TIMESERIES_ADDED%%", + Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND) + ); + return StringUtils.replace( + spec, + "%%TIMESERIES_NUMEVENTS%%", + Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND) + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + } + + @AfterMethod + public void teardown() + { + try { + kinesisEventWriter.flush(); + indexer.shutdownSupervisor(supervisorId); + } + catch (Exception e) { + // Best effort cleanup as the supervisor may have already went Bye-Bye + } + try { + unloader(fullDatasourceName); + } + catch (Exception e) { + // Best effort cleanup as the datasource may have already went Bye-Bye + } + try { + kinesisAdminClient.deleteStream(streamName); + } + catch (Exception e) { + // Best effort cleanup as the stream may have already went Bye-Bye + } + } + + @Test + public void testKinesisIndexDataWithLegacyParserStableState() throws Exception + { + try ( + final Closeable ignored1 = unloader(fullDatasourceName) + ) { + final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + supervisorId = indexer.submitSupervisor(taskSpec); + LOG.info("Submitted supervisor"); + // Start Kinesis data generator + wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); + verifyIngestedData(supervisorId); + } + } + + @Test + public void testKinesisIndexDataWithInputFormatStableState() throws Exception + { + try ( + final Closeable ignored1 = unloader(fullDatasourceName) + ) { + final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + supervisorId = indexer.submitSupervisor(taskSpec); + LOG.info("Submitted supervisor"); + // Start Kinesis data generator + wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME); + verifyIngestedData(supervisorId); + } + } + + @Test + public void testKinesisIndexDataWithLosingCoordinator() throws Exception + { + testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady()); + } + + @Test + public void testKinesisIndexDataWithLosingOverlord() throws Exception + { + testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady()); + } + + @Test + public void testKinesisIndexDataWithLosingHistorical() throws Exception + { + testIndexWithLosingNodeHelper(() -> 
+
+  @Test
+  public void testKinesisIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  @Test
+  public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Verify supervisor is healthy before suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Suspend the supervisor
+      indexer.suspendSupervisor(supervisorId);
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(supervisorId);
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  @Test
+  public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    // Reshard the stream by splitting from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT * 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT * 2);
+  }
+
+  @Test
+  public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    // Reshard the stream by merging from KINESIS_SHARD_COUNT to KINESIS_SHARD_COUNT / 2
+    testIndexWithKinesisReshardHelper(KINESIS_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Verify supervisor is healthy before restart
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating the remaining data (after restarting)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(supervisorId);
+    }
+  }
+
+  private void testIndexWithKinesisReshardHelper(int newShardCount) throws Exception
+  {
+    try (
+        final Closeable ignored1 = unloader(fullDatasourceName)
+    ) {
+      final String taskSpec = kinesisIngestionPropsTransform.apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      supervisorId = indexer.submitSupervisor(taskSpec);
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Verify supervisor is healthy before resharding
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Reshard the stream by splitting or merging from KINESIS_SHARD_COUNT to newShardCount and wait until
+      // the resharding starts
+      kinesisAdminClient.updateShardCount(streamName, newShardCount, true);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for the Kinesis stream to finish resharding
+      ITRetryUtil.retryUntil(
+          () -> kinesisAdminClient.isStreamActive(streamName),
+          true,
+          10000,
+          30,
+          "Waiting for Kinesis stream to finish resharding"
+      );
+      // Start generating the remaining data (after resharding)
+      wikipediaStreamEventGenerator.start(streamName, kinesisEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy after resharding
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(supervisorId);
+    }
+  }
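+
+  // Editorial worked example, not in the original patch: with EVENTS_PER_SECOND = 6, each second's
+  // "added" column sums to getSumOfEventSequence(6) = 6 * 7 / 2 = 21, so the query spec expects
+  // %%TIMESERIES_ADDED%% = 21 * TOTAL_NUMBER_OF_SECOND = 210 and %%TIMESERIES_NUMEVENTS%% = 6 * 10 = 60.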
+
+  private void verifyIngestedData(String supervisorId) throws Exception
+  {
+    // Wait for supervisor to consume events
+    LOG.info("Waiting for [%s] millis for Kinesis indexing tasks to consume events", WAIT_TIME_MILLIS);
+    Thread.sleep(WAIT_TIME_MILLIS);
+    // Query data
+    final String querySpec = kinesisQueryPropsTransform.apply(getResourceAsString(QUERIES_FILE));
+    // this query will probably be answered from the indexing tasks but possibly from 2 historical segments
+    // and/or 2 indexing tasks
+    this.queryHelper.testQueriesFromString(querySpec, 2);
+    LOG.info("Shutting down supervisor");
+    indexer.shutdownSupervisor(supervisorId);
+    // wait for all Kinesis indexing tasks to finish
+    LOG.info("Waiting for all indexing tasks to finish");
+    ITRetryUtil.retryUntilTrue(
+        () -> (indexer.getPendingTasks().size()
+               + indexer.getRunningTasks().size()
+               + indexer.getWaitingTasks().size()) == 0,
+        "Waiting for Tasks Completion"
+    );
+    // wait for segments to be handed off
+    ITRetryUtil.retryUntil(
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+        true,
+        10000,
+        30,
+        "Real-time generated segments loaded"
+    );
+
+    // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
+    this.queryHelper.testQueriesFromString(querySpec, 2);
+  }
+
+  /**
+   * Assumes the first number in the sequence is 1, incrementing by 1, until numEvents.
+   */
+  private long getSumOfEventSequence(int numEvents)
+  {
+    return (numEvents * (1 + numEvents)) / 2;
+  }
+}
diff --git a/integration-tests/src/test/resources/indexer/kafka_index_queries.json b/integration-tests/src/test/resources/indexer/stream_index_queries.json
similarity index 100%
rename from integration-tests/src/test/resources/indexer/kafka_index_queries.json
rename to integration-tests/src/test/resources/indexer/stream_index_queries.json
diff --git a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_input_format.json b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
similarity index 83%
rename from integration-tests/src/test/resources/indexer/kafka_supervisor_spec_input_format.json
rename to integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
index 4ba59afdcac7..ce9bedc84431 100644
--- a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_input_format.json
+++ b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
@@ -1,5 +1,5 @@
 {
-  "type": "kafka",
+  "type": "%%STREAM_TYPE%%",
   "dataSchema": {
     "dataSource": "%%DATASOURCE%%",
     "timestampSpec": {
@@ -39,18 +39,18 @@
     }
   },
   "tuningConfig": {
-    "type": "kafka",
+    "type": "%%STREAM_TYPE%%",
     "intermediatePersistPeriod": "PT30S",
     "maxRowsPerSegment": 5000000,
     "maxRowsInMemory": 500000
   },
   "ioConfig": {
-    "topic": "%%TOPIC%%",
-    "consumerProperties": %%CONSUMER_PROPERTIES%%,
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
     "taskCount": 2,
     "replicas": 1,
-    "taskDuration": "PT2M",
-    "useEarliestOffset": true,
+    "taskDuration": "PT5M",
+    "%%USE_EARLIEST_KEY%%": true,
     "inputFormat" : {
       "type" : "json"
     }
diff --git a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_legacy_parser.json b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
similarity index 84%
rename from integration-tests/src/test/resources/indexer/kafka_supervisor_spec_legacy_parser.json
rename to integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
index 511b65dcffc8..623aadf6583b 100644
--- a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_legacy_parser.json
+++ b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
@@ -1,5 +1,5 @@
 {
-  "type": "kafka",
+  "type": "%%STREAM_TYPE%%",
   "dataSchema": {
     "dataSource": "%%DATASOURCE%%",
     "parser": {
@@ -45,17 +45,17 @@
     }
   },
   "tuningConfig": {
-    "type": "kafka",
+    "type": "%%STREAM_TYPE%%",
     "intermediatePersistPeriod": "PT30S",
     "maxRowsPerSegment": 5000000,
     "maxRowsInMemory": 500000
   },
   "ioConfig": {
-    "topic": "%%TOPIC%%",
-    "consumerProperties": %%CONSUMER_PROPERTIES%%,
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
     "taskCount": 2,
     "replicas": 1,
-    "taskDuration": "PT2M",
-    "useEarliestOffset": true
+    "taskDuration": "PT5M",
+    "%%USE_EARLIEST_KEY%%": true
   }
 }
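Aside: to make the templating above concrete, a sketch of how the resolved Kinesis ioConfig keys fall out of StringUtils.replace (the stream name is hypothetical; the tests derive it from a UUID):

import org.apache.druid.java.util.common.StringUtils;

public class ResolveIoConfigSketch
{
  public static void main(String[] args)
  {
    String spec = "\"%%TOPIC_KEY%%\": \"%%TOPIC_VALUE%%\", \"%%USE_EARLIEST_KEY%%\": true";
    spec = StringUtils.replace(spec, "%%TOPIC_KEY%%", "stream");
    spec = StringUtils.replace(spec, "%%TOPIC_VALUE%%", "kinesis_index_test_00000000-0000-0000-0000-000000000000");
    spec = StringUtils.replace(spec, "%%USE_EARLIEST_KEY%%", "useEarliestSequenceNumber");
    // Prints: "stream": "kinesis_index_test_...", "useEarliestSequenceNumber": true
    System.out.println(spec);
  }
}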