diff --git a/.github/scripts/display_dependency_analysis_error_message.sh b/.github/scripts/analyze_dependencies_script.sh similarity index 92% rename from .github/scripts/display_dependency_analysis_error_message.sh rename to .github/scripts/analyze_dependencies_script.sh index 3899d5917b15..c92d90030f71 100755 --- a/.github/scripts/display_dependency_analysis_error_message.sh +++ b/.github/scripts/analyze_dependencies_script.sh @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +#!bin/bash + +${MVN} ${MAVEN_SKIP} dependency:analyze -DoutputXML=true -DignoreNonCompile=true -DfailOnWarning=true ${HADOOP_PROFILE} || { echo " The dependency analysis has found a dependency that is either: diff --git a/.github/scripts/license_checks_script.sh b/.github/scripts/license_checks_script.sh new file mode 100755 index 000000000000..410ac60375fa --- /dev/null +++ b/.github/scripts/license_checks_script.sh @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/bin/bash + +set -e + +./.github/scripts/setup_generate_license.sh +${MVN} apache-rat:check -Prat --fail-at-end \ +-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn \ +-Drat.consoleOutput=true ${HADOOP_PROFILE} +# Generate dependency reports and checks they are valid. +mkdir -p target +distribution/bin/generate-license-dependency-reports.py . target --clean-maven-artifact-transfer --parallel 2 +distribution/bin/check-licenses.py licenses.yaml target/license-reports diff --git a/.github/scripts/setup_generate_license.sh b/.github/scripts/setup_generate_license.sh index 9e6c2eae68c6..71583bfb2b26 100755 --- a/.github/scripts/setup_generate_license.sh +++ b/.github/scripts/setup_generate_license.sh @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+#!/bin/bash + +set -e + sudo apt-get update && sudo apt-get install python3 -y curl https://bootstrap.pypa.io/pip/3.5/get-pip.py | sudo -H python3 pip3 install wheel # install wheel first explicitly diff --git a/.github/workflows/static-checks.yml b/.github/workflows/static-checks.yml index 7f4437f0129e..37e7a2225857 100644 --- a/.github/workflows/static-checks.yml +++ b/.github/workflows/static-checks.yml @@ -35,160 +35,139 @@ env: MAVEN_OPTS: -Xmx3000m jobs: - build: + static-checks: + strategy: + matrix: + java: [ 'jdk8', 'jdk11', 'jdk17' ] runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - name: setup java 8 + - name: checkout branch + uses: actions/checkout@v3 + + - name: set java version + run: | + export jdk=${{ matrix.java }} + echo "java_version=${jdk:3}" >> $GITHUB_ENV + + - name: setup ${{ matrix.java }} uses: actions/setup-java@v3 with: distribution: 'zulu' - java-version: '8' + java-version: ${{ env.java_version }} cache: 'maven' - - run: | + + - name: packaging check + run: | + ./.github/scripts/setup_generate_license.sh + ${MVN} clean install -Prat -Pdist -Pbundle-contrib-exts --fail-at-end \ + -pl '!benchmarks' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} -Dweb.console.skip=false -T1C + + - name: script checks + # who watches the watchers? + if: ${{ matrix.java == 'jdk8' }} + run: ./check_test_suite_test.py + + - name: (openjdk11) strict compilation + if: ${{ matrix.java == 'jdk11' }} + # errorprone requires JDK 11 + # Strict compilation requires more than 2 GB + run: ${MVN} clean -DstrictCompile compile test-compile --fail-at-end ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} + + - name: maven install + if: ${{ matrix.java == 'jdk8' }} + run: | echo 'Running Maven install...' && ${MVN} clean install -q -ff -pl '!distribution,!:druid-it-image,!:druid-it-cases' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} -T1C && ${MVN} install -q -ff -pl 'distribution' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} - animal_sniffer_checks: - runs-on: ubuntu-latest - needs: [build] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: ${MVN} animal-sniffer:check --fail-at-end + - name: license checks + if: ${{ matrix.java == 'jdk8' }} + run: ./.github/scripts/license_checks_script.sh - checkstyle: - runs-on: ubuntu-latest - needs: [build] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: ${MVN} checkstyle:checkstyle --fail-at-end + - name: license checks for hadoop3 + if: ${{ matrix.java == 'jdk8' }} + env: + HADOOP_PROFILE: -Phadoop3 + run: ./.github/scripts/license_checks_script.sh - enforcer_checks: - runs-on: ubuntu-latest - needs: [build] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: ${MVN} enforcer:enforce --fail-at-end + - name: analyze dependencies + if: ${{ matrix.java == 'jdk8' }} + run: | + ./.github/scripts/analyze_dependencies_script.sh - forbidden_api_checks: - runs-on: ubuntu-latest - needs: [build] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: ${MVN} forbiddenapis:check forbiddenapis:testCheck --fail-at-end + - name: analyze dependencies for hadoop3 + if: ${{ matrix.java == 'jdk8' }} + env: + HADOOP_PROFILE: -Phadoop3 + run: | + 
./.github/scripts/analyze_dependencies_script.sh - pmd_checks: - runs-on: ubuntu-latest - needs: [build] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: ${MVN} pmd:check --fail-at-end # TODO: consider adding pmd:cpd-check + - name: animal sniffer checks + if: ${{ matrix.java == 'jdk8' }} + run: ${MVN} animal-sniffer:check --fail-at-end - spotbugs_checks: - runs-on: ubuntu-latest - needs: [build] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: ${MVN} spotbugs:check --fail-at-end -pl '!benchmarks' + - name: checkstyle + if: ${{ matrix.java == 'jdk8' }} + run: ${MVN} checkstyle:checkstyle --fail-at-end - license_checks: - runs-on: ubuntu-latest - needs: [build] - strategy: - matrix: - HADOOP_PROFILE: ['', '-Phadoop3'] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: | - ./.github/scripts/setup_generate_license.sh - ${MVN} apache-rat:check -Prat --fail-at-end \ - -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn \ - -Drat.consoleOutput=true ${{ matrix.HADOOP_PROFILE }} - # Generate dependency reports and checks they are valid. - mkdir -p target - distribution/bin/generate-license-dependency-reports.py . target --clean-maven-artifact-transfer --parallel 2 - distribution/bin/check-licenses.py licenses.yaml target/license-reports - - script_checks: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - # who watches the watchers? - - run: ./check_test_suite_test.py + - name: enforcer checks + if: ${{ matrix.java == 'jdk8' }} + run: ${MVN} enforcer:enforce --fail-at-end - analyze_dependencies: - runs-on: ubuntu-latest - needs: [build] - strategy: - matrix: - HADOOP_PROFILE: [ '', '-Phadoop3' ] - steps: - - uses: actions/checkout@v3 - - name: setup java 8 - uses: actions/setup-java@v3 - with: - distribution: 'zulu' - java-version: '8' - cache: 'maven' - - run: |- - ${MVN} ${MAVEN_SKIP} dependency:analyze -DoutputXML=true -DignoreNonCompile=true -DfailOnWarning=true ${{ matrix.HADOOP_PROFILE }} || - ./.github/scripts/display_dependency_analysis_error_message.sh + - name: forbidden api checks + if: ${{ matrix.java == 'jdk8' }} + run: ${MVN} forbiddenapis:check forbiddenapis:testCheck --fail-at-end - openjdk11_strict_compilation: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: setup java 11 - uses: actions/setup-java@v3 + - name: pmd checks + if: ${{ matrix.java == 'jdk8' }} + run: ${MVN} pmd:check --fail-at-end # TODO: consider adding pmd:cpd-check + + - name: spotbugs checks + if: ${{ matrix.java == 'jdk8' }} + run: ${MVN} spotbugs:check --fail-at-end -pl '!benchmarks' + + - name: intellij inspections + if: ${{ matrix.java == 'jdk8' }} + run: | + docker run --rm \ + -v $(pwd):/project \ + -v ~/.m2:/home/inspect/.m2 \ + ccaominh/intellij-inspect:1.0.0 \ + /project/pom.xml \ + /project/.idea/inspectionProfiles/Druid.xml \ + --levels ERROR \ + --scope JavaInspectionsScope + + - name: setup node + if: ${{ matrix.java == 'jdk8' }} + uses: actions/setup-node@v3 with: - distribution: 'zulu' - java-version: '11' - cache: 'maven' + node-version: 16.17.0 - # errorprone requires JDK 11 - # Strict compilation requires more than 2 GB - - run: | - ${MVN} clean -DstrictCompile 
compile test-compile --fail-at-end ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} + - name: docs + if: ${{ matrix.java == 'jdk8' }} + run: | + (cd website && npm install) + cd website + npm run link-lint + npm run spellcheck + + - name: web console + if: ${{ matrix.java == 'jdk8' }} + run: | + ${MVN} test -pl 'web-console' + cd web-console + { for i in 1 2 3; do npm run codecov && break || sleep 15; done } + + - name: web console end-to-end test + if: ${{ matrix.java == 'jdk8' }} + run: | + ./.github/scripts/setup_generate_license.sh + sudo apt-get update && sudo apt-get install python3 -y + curl https://bootstrap.pypa.io/pip/3.5/get-pip.py | sudo -H python3 + pip3 install wheel # install wheel first explicitly + pip3 install --upgrade pyyaml + web-console/script/druid build + web-console/script/druid start + (cd web-console && npm run test-e2e) + web-console/script/druid stop diff --git a/.travis.yml b/.travis.yml index 750d9e7ce3b6..0c7a73290b1d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -70,120 +70,6 @@ stages: jobs: include: - - name: "animal sniffer checks" - stage: Tests - phase 1 - script: ${MVN} animal-sniffer:check --fail-at-end - - - name: "checkstyle" - script: ${MVN} checkstyle:checkstyle --fail-at-end - - - name: "enforcer checks" - script: ${MVN} enforcer:enforce --fail-at-end - - - name: "forbidden api checks" - script: ${MVN} forbiddenapis:check forbiddenapis:testCheck --fail-at-end - - - name: "pmd checks" - script: ${MVN} pmd:check --fail-at-end # TODO: consider adding pmd:cpd-check - - - name: "spotbugs checks" - script: ${MVN} spotbugs:check --fail-at-end -pl '!benchmarks' - - - &license_checks - name: "license checks" - before_script: &setup_generate_license - - sudo apt-get update && sudo apt-get install python3 -y - - curl https://bootstrap.pypa.io/pip/3.5/get-pip.py | sudo -H python3 - - ./check_test_suite.py && travis_terminate 0 || echo 'Continuing setup' - - pip3 install wheel # install wheel first explicitly - - pip3 install --upgrade pyyaml - script: - - > - ${MVN} apache-rat:check -Prat --fail-at-end - -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn - -Drat.consoleOutput=true ${HADOOP_PROFILE} - # Generate dependency reports and checks they are valid. When running on Travis CI, 2 cores are available - # (https://docs.travis-ci.com/user/reference/overview/#virtualisation-environment-vs-operating-system). - - mkdir -p target - - distribution/bin/generate-license-dependency-reports.py . target --clean-maven-artifact-transfer --parallel 2 - - distribution/bin/check-licenses.py licenses.yaml target/license-reports - - - <<: *license_checks - name: "license checks with Hadoop3" - env: - - HADOOP_PROFILE='-Phadoop3' - - - name: "script checks" - install: skip - # who watches the watchers? 
- script: ./check_test_suite_test.py - - - name: "(openjdk11) strict compilation" - install: skip - # errorprone requires JDK 11 - jdk: openjdk11 - # Strict compilation requires more than 2 GB - script: > - ./check_test_suite.py && travis_terminate 0 || MAVEN_OPTS='-Xmx3000m' ${MVN} clean -DstrictCompile compile test-compile --fail-at-end - ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} - - - &analyze_dependencies - name: "analyze dependencies" - script: |- - MAVEN_OPTS='-Xmx3000m' ${MVN} ${MAVEN_SKIP} dependency:analyze -DoutputXML=true -DignoreNonCompile=true -DfailOnWarning=true ${HADOOP_PROFILE} || { echo " - - The dependency analysis has found a dependency that is either: - - 1) Used and undeclared: These are available as a transitive dependency but should be explicitly - added to the POM to ensure the dependency version. The XML to add the dependencies to the POM is - shown above. - - 2) Unused and declared: These are not needed and removing them from the POM will speed up the build - and reduce the artifact size. The dependencies to remove are shown above. - - If there are false positive dependency analysis warnings, they can be suppressed: - https://maven.apache.org/plugins/maven-dependency-plugin/analyze-mojo.html#usedDependencies - https://maven.apache.org/plugins/maven-dependency-plugin/examples/exclude-dependencies-from-dependency-analysis.html - - For more information, refer to: - https://maven.apache.org/plugins/maven-dependency-plugin/analyze-mojo.html - - " && false; } - - - <<: *analyze_dependencies - name: "analyze hadoop 3 dependencies" - env: - - HADOOP_PROFILE='-Phadoop3' - - - name: "intellij inspections" - script: > - ./check_test_suite.py && travis_terminate 0 || docker run --rm - -v $(pwd):/project - -v ~/.m2:/home/inspect/.m2 - ccaominh/intellij-inspect:1.0.0 - /project/pom.xml - /project/.idea/inspectionProfiles/Druid.xml - --levels ERROR - --scope JavaInspectionsScope - - - &package - name: "(openjdk8) packaging check" - install: skip - before_script: *setup_generate_license - script: > - MAVEN_OPTS='-Xmx3000m' ${MVN} clean install -Prat -Pdist -Pbundle-contrib-exts --fail-at-end - -pl '!benchmarks' ${MAVEN_SKIP} ${MAVEN_SKIP_TESTS} -Dweb.console.skip=false -T1C - - - <<: *package - name: "(openjdk11) packaging check" - stage: Tests - phase 2 - jdk: openjdk11 - - - <<: *package - name: "(openjdk17) packaging check" - stage: Tests - phase 2 - jdk: openjdk17 - - &test_processing_module name: "(openjdk8) processing module test" stage: Tests - phase 1 @@ -366,36 +252,6 @@ jobs: stage: Tests - phase 2 jdk: openjdk17 - - name: "web console" - install: skip - stage: Tests - phase 1 - script: - - ./check_test_suite.py && travis_terminate 0 || ${MVN} test -pl 'web-console' - after_success: - - (cd web-console && travis_retry npm run codecov) # retry in case of network error - - - name: "web console end-to-end test" - stage: Tests - phase 1 - before_install: *setup_generate_license - install: web-console/script/druid build - before_script: - - ./check_test_suite.py && travis_terminate 0 || echo 'Starting nvm install...' - - nvm install 16.17.0 - - web-console/script/druid start - script: (cd web-console && npm run test-e2e) - after_script: web-console/script/druid stop - - - name: "docs" - stage: Tests - phase 1 - before_script: - - ./check_test_suite.py && travis_terminate 0 || echo 'Starting nvm install...' 
- - nvm install 16.17.0 - - (cd website && npm install) - script: - - cd website - - npm run link-lint - - npm run spellcheck - - name: "Build and test on ARM64 CPU architecture (1)" stage: Tests - phase 2 arch: arm64-graviton2 diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 842fe65f86ce..39590175727e 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -27,7 +27,7 @@ org.apache.druid druid - 25.0.0-SNAPSHOT + 26.0.0-SNAPSHOT diff --git a/cloud/aws-common/pom.xml b/cloud/aws-common/pom.xml index 73c0288dfad3..ebfe95bcfe84 100644 --- a/cloud/aws-common/pom.xml +++ b/cloud/aws-common/pom.xml @@ -28,7 +28,7 @@ org.apache.druid druid - 25.0.0-SNAPSHOT + 26.0.0-SNAPSHOT ../../pom.xml diff --git a/cloud/gcp-common/pom.xml b/cloud/gcp-common/pom.xml index d755b7cf844e..2bfa4ec33165 100644 --- a/cloud/gcp-common/pom.xml +++ b/cloud/gcp-common/pom.xml @@ -28,7 +28,7 @@ org.apache.druid druid - 25.0.0-SNAPSHOT + 26.0.0-SNAPSHOT ../../pom.xml diff --git a/core/pom.xml b/core/pom.xml index 2f8b9dc0b1d2..6727aa4ca0e3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -29,7 +29,7 @@ druid org.apache.druid - 25.0.0-SNAPSHOT + 26.0.0-SNAPSHOT diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java index 40a2521b1186..310c66904619 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSourceConfig.java @@ -76,7 +76,7 @@ public int hashCode() public String toString() { return "HttpInputSourceConfig{" + - ", allowedProtocols=" + allowedProtocols + + "allowedProtocols=" + allowedProtocols + '}'; } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 688f8b1140bf..e811de7cc308 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -228,4 +228,14 @@ public int hashCode() { return Objects.hash(baseDir, filter, files); } + + @Override + public String toString() + { + return "LocalInputSource{" + + "baseDir=\"" + baseDir + + "\", filter=" + filter + + ", files=" + files + + "}"; + } } diff --git a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java index d2a3fd0d52fb..a22ef051457b 100644 --- a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java +++ b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java @@ -36,6 +36,7 @@ import javax.validation.Validator; import java.util.Properties; +@LazySingleton public class DruidSecondaryModule implements Module { private final Properties properties; diff --git a/core/src/main/java/org/apache/druid/java/util/common/Intervals.java b/core/src/main/java/org/apache/druid/java/util/common/Intervals.java index b7a1f37cf1c3..96f858fd4be2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/Intervals.java +++ b/core/src/main/java/org/apache/druid/java/util/common/Intervals.java @@ -20,10 +20,13 @@ package org.apache.druid.java.util.common; import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; +import javax.annotation.Nullable; + public 
final class Intervals { public static final Interval ETERNITY = utc(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT); @@ -68,6 +71,32 @@ public static boolean isEternity(final Interval interval) return ETERNITY.equals(interval); } + /** + * Finds an interval from the given set of sortedIntervals which overlaps with + * the searchInterval. If multiple candidate intervals overlap with the + * searchInterval, the "smallest" interval based on the + * {@link Comparators#intervalsByStartThenEnd()} is returned. + * + * @param searchInterval Interval which should overlap with the result + * @param sortedIntervals Candidate overlapping intervals, sorted in ascending + * order, using {@link Comparators#intervalsByStartThenEnd()}. + * @return The first overlapping interval, if one exists, otherwise null. + */ + @Nullable + public static Interval findOverlappingInterval(Interval searchInterval, Interval[] sortedIntervals) + { + for (Interval interval : sortedIntervals) { + if (interval.overlaps(searchInterval)) { + return interval; + } else if (interval.getStart().isAfter(searchInterval.getEnd())) { + // Intervals after this cannot have an overlap + return null; + } + } + + return null; + } + private Intervals() { } diff --git a/core/src/main/java/org/apache/druid/java/util/common/Pair.java b/core/src/main/java/org/apache/druid/java/util/common/Pair.java index 4b2acad6f1b9..69720192b8f0 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/Pair.java +++ b/core/src/main/java/org/apache/druid/java/util/common/Pair.java @@ -26,7 +26,6 @@ */ public class Pair { - public static Pair of(@Nullable T1 lhs, @Nullable T2 rhs) { return new Pair<>(lhs, rhs); @@ -56,7 +55,7 @@ public boolean equals(Object o) if (!(o instanceof Pair)) { return false; } - Pair pair = (Pair) o; + Pair pair = (Pair) o; return Objects.equals(lhs, pair.lhs) && Objects.equals(rhs, pair.rhs); } diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index 41078961a0c8..26d6f0c80dc4 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -20,9 +20,13 @@ package org.apache.druid.java.util.common; import com.google.common.base.Strings; +import org.apache.commons.io.IOUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.net.URLEncoder; @@ -797,4 +801,18 @@ public static String fastLooseChop(@Nullable final String s, final int maxBytes) return s.substring(0, maxBytes); } } + + public static String getResource(Object ref, String resource) + { + try { + InputStream is = ref.getClass().getResourceAsStream(resource); + if (is == null) { + throw new ISE("Resource not found: [%s]", resource); + } + return IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "Cannot load resource: [%s]", resource); + } + } } diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 7bb80aae8988..da7904240680 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ 
b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -76,6 +76,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase private final BinaryOperator combineFn; private final int queueSize; private final boolean hasTimeout; + private final long startTimeNanos; private final long timeoutAtNanos; private final int yieldAfter; private final int batchSize; @@ -105,12 +106,13 @@ public ParallelMergeCombiningSequence( this.orderingFn = orderingFn; this.combineFn = combineFn; this.hasTimeout = hasTimeout; - this.timeoutAtNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS); + this.startTimeNanos = System.nanoTime(); + this.timeoutAtNanos = startTimeNanos + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS); this.parallelism = parallelism; this.yieldAfter = yieldAfter; this.batchSize = batchSize; this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS); - this.queueSize = 4 * (yieldAfter / batchSize); + this.queueSize = (1 << 15) / batchSize; // each queue can by default hold ~32k rows this.metricsReporter = reporter; this.cancellationGizmo = new CancellationGizmo(); } @@ -121,8 +123,9 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat if (inputSequences.isEmpty()) { return Sequences.empty().toYielder(initValue, accumulator); } - - final BlockingQueue> outputQueue = new ArrayBlockingQueue<>(queueSize); + // we make final output queue larger than the merging queues so if downstream readers are slower to read there is + // less chance of blocking the merge + final BlockingQueue> outputQueue = new ArrayBlockingQueue<>(4 * queueSize); final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size()); MergeCombinePartitioningAction mergeCombineAction = new MergeCombinePartitioningAction<>( inputSequences, @@ -147,6 +150,7 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat cancellationGizmo ).withBaggage(() -> { if (metricsReporter != null) { + metricsAccumulator.setTotalWallTime(System.nanoTime() - startTimeNanos); metricsReporter.accept(metricsAccumulator.build()); } }); @@ -698,6 +702,8 @@ private static class PrepareMergeCombineInputsAction extends RecursiveAction private final MergeCombineActionMetricsAccumulator metricsAccumulator; private final CancellationGizmo cancellationGizmo; + private final long startTime; + private PrepareMergeCombineInputsAction( List> partition, QueuePusher> outputQueue, @@ -719,6 +725,7 @@ private PrepareMergeCombineInputsAction( this.targetTimeNanos = targetTimeNanos; this.metricsAccumulator = metricsAccumulator; this.cancellationGizmo = cancellationGizmo; + this.startTime = System.nanoTime(); } @SuppressWarnings("unchecked") @@ -736,7 +743,6 @@ protected void compute() cursor.close(); } } - if (cursors.size() > 0) { getPool().execute(new MergeCombineAction( cursors, @@ -753,6 +759,7 @@ protected void compute() } else { outputQueue.offer(ResultBatch.TERMINAL); } + metricsAccumulator.setPartitionInitializedTime(System.nanoTime() - startTime); } catch (Throwable t) { closeAllCursors(partition); @@ -1195,6 +1202,9 @@ public static class MergeCombineMetrics private final long outputRows; private final long taskCount; private final long totalCpuTime; + private final long totalWallTime; + private final long fastestPartitionInitializedTime; + private final long slowestPartitionInitializedTime; MergeCombineMetrics( int parallelism, @@ -1202,7 
+1212,10 @@ public static class MergeCombineMetrics long inputRows, long outputRows, long taskCount, - long totalCpuTime + long totalCpuTime, + long totalWallTime, + long fastestPartitionInitializedTime, + long slowestPartitionInitializedTime ) { this.parallelism = parallelism; @@ -1211,6 +1224,9 @@ public static class MergeCombineMetrics this.outputRows = outputRows; this.taskCount = taskCount; this.totalCpuTime = totalCpuTime; + this.totalWallTime = totalWallTime; + this.fastestPartitionInitializedTime = fastestPartitionInitializedTime; + this.slowestPartitionInitializedTime = slowestPartitionInitializedTime; } /** @@ -1263,6 +1279,21 @@ public long getTotalCpuTime() { return totalCpuTime; } + + public long getTotalTime() + { + return totalWallTime; + } + + public long getFastestPartitionInitializedTime() + { + return fastestPartitionInitializedTime; + } + + public long getSlowestPartitionInitializedTime() + { + return slowestPartitionInitializedTime; + } } /** @@ -1274,6 +1305,9 @@ static class MergeCombineMetricsAccumulator { List partitionMetrics; MergeCombineActionMetricsAccumulator mergeMetrics; + + private long totalWallTime; + private final int inputSequences; MergeCombineMetricsAccumulator(int inputSequences) @@ -1291,6 +1325,11 @@ void setPartitions(List partitionMetrics) this.partitionMetrics = partitionMetrics; } + void setTotalWallTime(long time) + { + this.totalWallTime = time; + } + MergeCombineMetrics build() { long numInputRows = 0; @@ -1299,11 +1338,20 @@ MergeCombineMetrics build() // partition long totalPoolTasks = 1 + 1 + partitionMetrics.size(); + long fastestPartInitialized = partitionMetrics.size() > 0 ? Long.MAX_VALUE : mergeMetrics.getPartitionInitializedtime(); + long slowestPartInitialied = partitionMetrics.size() > 0 ? Long.MIN_VALUE : mergeMetrics.getPartitionInitializedtime(); + // accumulate input row count, cpu time, and total number of tasks from each partition for (MergeCombineActionMetricsAccumulator partition : partitionMetrics) { numInputRows += partition.getInputRows(); cpuTimeNanos += partition.getTotalCpuTimeNanos(); totalPoolTasks += partition.getTaskCount(); + if (partition.getPartitionInitializedtime() < fastestPartInitialized) { + fastestPartInitialized = partition.getPartitionInitializedtime(); + } + if (partition.getPartitionInitializedtime() > slowestPartInitialied) { + slowestPartInitialied = partition.getPartitionInitializedtime(); + } } // if serial merge done, only mergeMetrics is populated, get input rows from there instead. 
otherwise, ignore the // value as it is only the number of intermediary input rows to the layer 2 task @@ -1322,7 +1370,10 @@ MergeCombineMetrics build() numInputRows, numOutputRows, totalPoolTasks, - cpuTimeNanos + cpuTimeNanos, + totalWallTime, + fastestPartInitialized, + slowestPartInitialied ); } } @@ -1337,6 +1388,8 @@ static class MergeCombineActionMetricsAccumulator private long outputRows = 0; private long totalCpuTimeNanos = 0; + private long partitionInitializedtime = 0L; + void incrementTaskCount() { taskCount++; @@ -1357,6 +1410,11 @@ void incrementCpuTimeNanos(long nanos) totalCpuTimeNanos += nanos; } + void setPartitionInitializedTime(long nanos) + { + partitionInitializedtime = nanos; + } + long getTaskCount() { return taskCount; @@ -1376,6 +1434,11 @@ long getTotalCpuTimeNanos() { return totalCpuTimeNanos; } + + long getPartitionInitializedtime() + { + return partitionInitializedtime; + } } private static void closeAllCursors(final Collection> cursors) diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java index 4e955a4d50b8..98f087333a55 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java +++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java @@ -29,6 +29,7 @@ import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -250,15 +251,23 @@ default Function makeJsonTreeExtractor(List nodes) */ default Map toMap(T obj) { - return (Map) toPlainJavaType(obj); + final Object mapOrNull = toPlainJavaType(obj); + if (mapOrNull == null) { + return Collections.emptyMap(); + } + return (Map) mapOrNull; } /** * Recursively traverse "json" object using a {@link JsonProvider}, converting to Java {@link Map} and {@link List}, * potentially transforming via {@link #finalizeConversionForMap} as we go */ + @Nullable default Object toPlainJavaType(Object o) { + if (o == null) { + return null; + } final JsonProvider jsonProvider = getJsonProvider(); if (jsonProvider.isMap(o)) { Map actualMap = new HashMap<>(); @@ -287,7 +296,7 @@ default Object toPlainJavaType(Object o) return finalizeConversionForMap(actualList); } // unknown, just pass it through - return o; + return finalizeConversionForMap(o); } /** diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/core/SwitchingEmitter.java b/core/src/main/java/org/apache/druid/java/util/emitter/core/SwitchingEmitter.java new file mode 100644 index 000000000000..1dedd548294e --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/emitter/core/SwitchingEmitter.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.emitter.core; + +import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * An emitter than that offers the ability to direct an event to multiple emitters based on the event's feed. + */ +public class SwitchingEmitter implements Emitter +{ + + private static final Logger log = new Logger(SwitchingEmitter.class); + + private final Emitter[] defaultEmitters; + + private final Map> feedToEmitters; + private final Set knownEmitters; + + /** + * Constructor for the SwitchingEmitter + * + * @param feedToEmitters Map of feed to a list of emitters that correspond to each feed, + * @param defaultEmitter A list of emitters to use if there isn't a match of feed to an emitter + */ + public SwitchingEmitter(Map> feedToEmitters, Emitter[] defaultEmitter) + { + this.feedToEmitters = feedToEmitters; + this.defaultEmitters = defaultEmitter; + ImmutableSet.Builder emittersSetBuilder = new ImmutableSet.Builder<>(); + emittersSetBuilder.addAll(Arrays.stream(defaultEmitter).iterator()); + for (List emitterList : feedToEmitters.values()) { + for (Emitter emitter : emitterList) { + emittersSetBuilder.add(emitter); + } + } + this.knownEmitters = emittersSetBuilder.build(); + } + + /** + * Start the emitter. This will start all the emitters the SwitchingEmitter uses. + */ + @Override + @LifecycleStart + public void start() + { + log.info("Starting Switching Emitter."); + + for (Emitter e : knownEmitters) { + log.info("Starting emitter %s.", e.getClass().getName()); + e.start(); + } + } + + /** + * Emit an event. This method must not throw exceptions or block. The emitters that this uses must also not throw + * exceptions or block. + *
+ * This emitter directs each event to the list of emitters configured for the event's feed. If there is no match, + * the event is sent to the list of default emitters instead. + *
+ * If an emitter used by this emitter receives too many events and its internal queues fill up, it should drop + * events rather than block or consume excessive memory. + *
+ * If an emitter that this emitter uses receives input it considers to be invalid, or has an internal problem, it + * should deal with that by logging a warning rather than throwing an exception. Emitters that log warnings + * should consider throttling warnings to avoid excessive logs, since a busy Druid cluster can emit a high volume of + * events. + * + * @param event The event that will be emitted. + */ + @Override + public void emit(Event event) + { + // linear search is likely faster than hashed lookup + for (Map.Entry> feedToEmitters : feedToEmitters.entrySet()) { + if (feedToEmitters.getKey().equals(event.getFeed())) { + for (Emitter emitter : feedToEmitters.getValue()) { + emitter.emit(event); + } + return; + } + } + for (Emitter emitter : defaultEmitters) { + emitter.emit(event); + } + } + + /** + * Triggers this emitter to tell all emitters that this uses to flush. + * @throws IOException + */ + @Override + public void flush() throws IOException + { + boolean fail = false; + log.info("Flushing Switching Emitter."); + + for (Emitter e : knownEmitters) { + try { + log.info("Flushing emitter %s.", e.getClass().getName()); + e.flush(); + } + catch (IOException ex) { + log.error(ex, "Failed to flush emitter [%s]", e.getClass().getName()); + fail = true; + } + } + + if (fail) { + throw new IOException("failed to flush one or more emitters"); + } + } + + /** + * Closes all emitters that the SwitchingEmitter uses + * @throws IOException + */ + @Override + @LifecycleStop + public void close() throws IOException + { + boolean fail = false; + log.info("Closing Switching Emitter."); + + for (Emitter e : knownEmitters) { + try { + log.info("Closing emitter %s.", e.getClass().getName()); + e.close(); + } + catch (IOException ex) { + log.error(ex, "Failed to close emitter [%s]", e.getClass().getName()); + fail = true; + } + } + + if (fail) { + throw new IOException("failed to close one or more emitters"); + } + } +} diff --git a/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java b/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java index 54a15b1dbdf3..20507c597b0f 100644 --- a/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java +++ b/core/src/main/java/org/apache/druid/segment/column/TypeStrategies.java @@ -236,8 +236,6 @@ public static void checkMaxSize(int available, int maxSizeBytes, TypeSignature { - private static final Comparator COMPARATOR = Longs::compare; - @Override public int estimateSizeBytes(Long value) { @@ -276,9 +274,9 @@ public int write(ByteBuffer buffer, Long value, int maxSizeBytes) } @Override - public int compare(Long o1, Long o2) + public int compare(Object o1, Object o2) { - return COMPARATOR.compare(o1, o2); + return Longs.compare(((Number) o1).longValue(), ((Number) o2).longValue()); } } @@ -289,8 +287,6 @@ public int compare(Long o1, Long o2) */ public static final class FloatTypeStrategy implements TypeStrategy { - private static final Comparator COMPARATOR = Floats::compare; - @Override public int estimateSizeBytes(Float value) { @@ -329,9 +325,9 @@ public int write(ByteBuffer buffer, Float value, int maxSizeBytes) } @Override - public int compare(Float o1, Float o2) + public int compare(Object o1, Object o2) { - return COMPARATOR.compare(o1, o2); + return Floats.compare(((Number) o1).floatValue(), ((Number) o2).floatValue()); } } @@ -342,7 +338,6 @@ public int compare(Float o1, Float o2) */ public static final class DoubleTypeStrategy implements TypeStrategy { - private static final Comparator COMPARATOR = 
Double::compare; @Override public int estimateSizeBytes(Double value) @@ -382,9 +377,9 @@ public int write(ByteBuffer buffer, Double value, int maxSizeBytes) } @Override - public int compare(Double o1, Double o2) + public int compare(Object o1, Object o2) { - return COMPARATOR.compare(o1, o2); + return Double.compare(((Number) o1).doubleValue(), ((Number) o2).doubleValue()); } } @@ -437,7 +432,7 @@ public int write(ByteBuffer buffer, String value, int maxSizeBytes) } @Override - public int compare(String s, String s2) + public int compare(Object s, Object s2) { // copy of lexicographical string comparator in druid processing // Avoid comparisons for equal references @@ -447,7 +442,7 @@ public int compare(String s, String s2) return 0; } - return ORDERING.compare(s, s2); + return ORDERING.compare((String) s, (String) s2); } } @@ -521,8 +516,11 @@ public int write(ByteBuffer buffer, Object[] value, int maxSizeBytes) } @Override - public int compare(@Nullable Object[] o1, @Nullable Object[] o2) + public int compare(@Nullable Object o1Obj, @Nullable Object o2Obj) { + Object[] o1 = (Object[]) o1Obj; + Object[] o2 = (Object[]) o2Obj; + //noinspection ArrayEquality if (o1 == o2) { return 0; diff --git a/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java b/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java index 8a97882d54df..e4856f889714 100644 --- a/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java +++ b/core/src/main/java/org/apache/druid/segment/column/TypeStrategy.java @@ -55,8 +55,15 @@ * Implementations of this interface should be thread safe, but may not use {@link ByteBuffer} in a thread safe manner, * potentially modifying positions and limits, either temporarily or permanently depending on which set of methods is * called. + * + * This interface extends {@code Comparator} instead of {@code Comparator} because trying to specialize the + * type of the comparison method can run into issues for comparators of objects that can sometimes be of a different + * java class type. For example, {@code Comparator} cannot accept Integer objects in its comparison method + * and there is no easy way for this interface definition to allow {@code TypeStrategy} to actually be a + * {@code Comparator}. So, we fall back to effectively erasing the generic type and having them all be + * {@code Comparator}. */ -public interface TypeStrategy extends Comparator +public interface TypeStrategy extends Comparator { /** * Estimate the size in bytes that writing this value to memory would require. 
This method is not required to be diff --git a/core/src/main/java/org/apache/druid/utils/RuntimeInfo.java b/core/src/main/java/org/apache/druid/utils/RuntimeInfo.java index c8dadfd44d19..86571eff9456 100644 --- a/core/src/main/java/org/apache/druid/utils/RuntimeInfo.java +++ b/core/src/main/java/org/apache/druid/utils/RuntimeInfo.java @@ -19,10 +19,12 @@ package org.apache.druid.utils; +import org.apache.druid.guice.LazySingleton; import org.apache.druid.java.util.common.UOE; import java.lang.reflect.InvocationTargetException; +@LazySingleton public class RuntimeInfo { public int getAvailableProcessors() diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsTest.java new file mode 100644 index 000000000000..59eac8d5a991 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import org.apache.druid.java.util.common.guava.Comparators; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class IntervalsTest +{ + + @Test + public void testFindOverlappingInterval() + { + final Interval[] sortedIntervals = new Interval[]{ + Intervals.of("2019/2020"), + Intervals.of("2021/2022"), + Intervals.of("2021-04-01/2021-05-01"), + Intervals.of("2022/2023") + }; + Arrays.sort(sortedIntervals, Comparators.intervalsByStartThenEnd()); + + // Search interval outside the bounds of the sorted intervals + Assert.assertNull( + Intervals.findOverlappingInterval(Intervals.of("2018/2019"), sortedIntervals) + ); + Assert.assertNull( + Intervals.findOverlappingInterval(Intervals.of("2023/2024"), sortedIntervals) + ); + + // Search interval within bounds, overlap exists + // Fully overlapping interval + Assert.assertEquals( + Intervals.of("2021/2022"), + Intervals.findOverlappingInterval(Intervals.of("2021/2022"), sortedIntervals) + ); + + // Partially overlapping interval + Assert.assertEquals( + Intervals.of("2022/2023"), + Intervals.findOverlappingInterval(Intervals.of("2022-01-01/2022-01-02"), sortedIntervals) + ); + + Assert.assertEquals( + Intervals.of("2021/2022"), + Intervals.findOverlappingInterval(Intervals.of("2021-06-01/2021-07-01"), sortedIntervals) + ); + + // Overlap with multiple intervals, "smallest" one is returned + Assert.assertEquals( + Intervals.of("2021/2022"), + Intervals.findOverlappingInterval(Intervals.of("2021-03-01/2021-04-01"), sortedIntervals) + ); + + // Search interval within bounds, no overlap + Assert.assertNull( + Intervals.findOverlappingInterval(Intervals.of("2020-01-02/2020-03-03"), sortedIntervals) + ); + } + +} diff 
--git a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java index 2b610690db0a..e0b0fbcc510c 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.junit.Assert; import org.junit.Test; @@ -32,12 +33,14 @@ public class ObjectFlattenersTest { private static final String SOME_JSON = "{\"foo\": null, \"bar\": 1}"; + + private static final ObjectFlatteners.FlattenerMaker FLATTENER_MAKER = new JSONFlattenerMaker(true); private static final ObjectFlattener FLATTENER = ObjectFlatteners.create( new JSONPathSpec( true, ImmutableList.of(new JSONPathFieldSpec(JSONPathFieldType.PATH, "extract", "$.bar")) ), - new JSONFlattenerMaker(true) + FLATTENER_MAKER ); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -62,4 +65,13 @@ public void testToMap() throws JsonProcessingException Assert.assertNull(flat.get("foo")); Assert.assertEquals(1, flat.get("bar")); } + + @Test + public void testToMapNull() throws JsonProcessingException + { + JsonNode node = OBJECT_MAPPER.readTree("null"); + Map flat = FLATTENER.toMap(node); + Assert.assertNull(FLATTENER_MAKER.toPlainJavaType(node)); + Assert.assertEquals(ImmutableMap.of(), flat); + } } diff --git a/core/src/test/java/org/apache/druid/java/util/emitter/core/SwitchingEmitterTest.java b/core/src/test/java/org/apache/druid/java/util/emitter/core/SwitchingEmitterTest.java new file mode 100644 index 000000000000..5183b47947af --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/emitter/core/SwitchingEmitterTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.emitter.core; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class SwitchingEmitterTest +{ + + private static final String FEED_1 = "feed1"; + private static final String FEED_2 = "feed2"; + private static final String FEED_3 = "feed3"; + private SwitchingEmitter switchingEmitter; + + private Map> emitters; + private List defaultEmitters; + + private Emitter feed1Emitter1; + private Emitter feed1Emitter2; + private Emitter feed2Emitter1; + private Emitter feed1AndFeed3Emitter; + + private Set allEmitters; + + @Before + public void setup() + { + this.defaultEmitters = ImmutableList.of( + EasyMock.createMock(Emitter.class), + EasyMock.createMock(Emitter.class) + ); + this.feed1Emitter1 = EasyMock.createMock(Emitter.class); + this.feed1Emitter2 = EasyMock.createMock(Emitter.class); + this.feed2Emitter1 = EasyMock.createMock(Emitter.class); + this.feed1AndFeed3Emitter = EasyMock.createMock(Emitter.class); + this.emitters = ImmutableMap.of(FEED_1, ImmutableList.of(feed1Emitter1, feed1Emitter2, feed1AndFeed3Emitter), + FEED_2, ImmutableList.of(feed2Emitter1), + FEED_3, ImmutableList.of(feed1AndFeed3Emitter)); + + allEmitters = new HashSet<>(); + allEmitters.addAll(defaultEmitters); + for (List feedEmitters : emitters.values()) { + allEmitters.addAll(feedEmitters); + } + this.switchingEmitter = new SwitchingEmitter(emitters, defaultEmitters.toArray(new Emitter[0])); + } + + @Test + public void testStart() + { + for (Emitter emitter : allEmitters) { + emitter.start(); + EasyMock.replay(emitter); + } + + switchingEmitter.start(); + } + + @Test + public void testEmit() + { + // test emitting events to all 3 feeds and default emitter + Event feed1Event = EasyMock.createMock(Event.class); + Event feed2Event = EasyMock.createMock(Event.class); + Event feed3Event = EasyMock.createMock(Event.class); + Event eventWithNoMatchingFeed = EasyMock.createMock(Event.class); + + EasyMock.expect(feed1Event.getFeed()).andReturn(FEED_1).anyTimes(); + EasyMock.expect(feed2Event.getFeed()).andReturn(FEED_2).anyTimes(); + EasyMock.expect(feed3Event.getFeed()).andReturn(FEED_3).anyTimes(); + EasyMock.expect(eventWithNoMatchingFeed.getFeed()).andReturn("no-real-feed").anyTimes(); + EasyMock.replay(feed1Event, feed2Event, feed3Event, eventWithNoMatchingFeed); + + for (Emitter emitter : defaultEmitters) { + emitter.emit(eventWithNoMatchingFeed); + } + for (Emitter emitter : emitters.get("feed1")) { + emitter.emit(feed1Event); + } + for (Emitter emitter : emitters.get("feed2")) { + emitter.emit(feed2Event); + } + for (Emitter emitter : emitters.get("feed3")) { + emitter.emit(feed3Event); + } + for (Emitter emitter : allEmitters) { + EasyMock.replay(emitter); + } + + switchingEmitter.emit(feed1Event); + switchingEmitter.emit(feed2Event); + switchingEmitter.emit(feed3Event); + switchingEmitter.emit(eventWithNoMatchingFeed); + } + + @Test + public void testFlush() throws IOException + { + for (Emitter emitter : allEmitters) { + emitter.flush(); + EasyMock.replay(emitter); + } + + switchingEmitter.flush(); + } + + @Test + public void testClose() throws IOException + { + for (Emitter emitter : allEmitters) { + emitter.close(); + EasyMock.replay(emitter); + } + + switchingEmitter.close(); + } + + 
@After + public void tearDown() + { + for (Emitter emitter : allEmitters) { + EasyMock.verify(emitter); + } + } +} diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java b/core/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java new file mode 100644 index 000000000000..c0f2eaeaf15f --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/metrics/MetricsVerifier.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.metrics; + +import org.junit.Assert; + +import java.util.List; +import java.util.Map; + +/** + * Test utility to extract and verify metric values. + */ +public interface MetricsVerifier +{ + /** + * Verifies that no event has been emitted for the given metric. + */ + default void verifyNotEmitted(String metricName) + { + verifyEmitted(metricName, 0); + } + + /** + * Verifies that the metric was emitted the expected number of times. + */ + default void verifyEmitted(String metricName, int times) + { + verifyEmitted(metricName, null, times); + } + + /** + * Verifies that the metric was emitted for the given dimension filters the + * expected number of times. + */ + default void verifyEmitted(String metricName, Map dimensionFilters, int times) + { + Assert.assertEquals( + "Metric was emitted unexpected number of times.", + times, + getMetricValues(metricName, dimensionFilters).size() + ); + } + + /** + * Verifies the value of the specified metric emitted in the previous run. + */ + default void verifyValue(String metricName, Number expectedValue) + { + verifyValue(metricName, null, expectedValue); + } + + /** + * Verifies the value of the event corresponding to the specified metric and + * dimensionFilters emitted in the previous run. + */ + default void verifyValue(String metricName, Map dimensionFilters, Number expectedValue) + { + Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters)); + } + + /** + * Gets the value of the event corresponding to the specified metric and + * dimensionFilters. + */ + default Number getValue(String metricName, Map dimensionFilters) + { + List values = getMetricValues(metricName, dimensionFilters); + Assert.assertEquals( + "Metric must have been emitted exactly once for the given dimensions.", + 1, + values.size() + ); + return values.get(0); + } + + /** + * Gets the metric values for the specified dimension filters. 
+ */ + List getMetricValues(String metricName, Map dimensionFilters); + +} diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java index 653dc8a08aae..395245815792 100644 --- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java +++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java @@ -24,12 +24,15 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; -public class StubServiceEmitter extends ServiceEmitter +public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifier { private final List events = new ArrayList<>(); - private final List metricEvents = new ArrayList<>(); + private final Map> metricEvents = new HashMap<>(); public StubServiceEmitter(String service, String host) { @@ -40,7 +43,9 @@ public StubServiceEmitter(String service, String host) public void emit(Event event) { if (event instanceof ServiceMetricEvent) { - metricEvents.add((ServiceMetricEvent) event); + ServiceMetricEvent metricEvent = (ServiceMetricEvent) event; + metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>()) + .add(metricEvent); } events.add(event); } @@ -53,12 +58,29 @@ public List getEvents() return events; } - /** - * Gets all the metric events emitted since the previous {@link #flush()}. - */ - public List getMetricEvents() + @Override + public List getMetricValues( + String metricName, + Map dimensionFilters + ) { - return metricEvents; + final List values = new ArrayList<>(); + final List events = + metricEvents.getOrDefault(metricName, Collections.emptyList()); + final Map filters = + dimensionFilters == null ? 
Collections.emptyMap() : dimensionFilters; + for (ServiceMetricEvent event : events) { + final Map userDims = event.getUserDims(); + boolean match = filters.keySet().stream() + .map(d -> filters.get(d).equals(userDims.get(d))) + .reduce((a, b) -> a && b) + .orElse(true); + if (match) { + values.add(event.getValue()); + } + } + + return values; } @Override diff --git a/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java b/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java index fa6d86d21f50..66f4adcdd02d 100644 --- a/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java +++ b/core/src/test/java/org/apache/druid/segment/column/TypeStrategiesTest.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.column; +import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Pair; @@ -103,7 +104,7 @@ public int write(ByteBuffer buffer, String value, int maxSizeBytes) } @Override - public int compare(String o1, String o2) + public int compare(Object o1, Object o2) { return 0; } @@ -639,10 +640,13 @@ public int compareTo(NullableLongPair o) public static class NullableLongPairTypeStrategy implements TypeStrategy { + + private Ordering ordering = Comparators.naturalNullsFirst(); + @Override - public int compare(NullableLongPair o1, NullableLongPair o2) + public int compare(Object o1, Object o2) { - return Comparators.naturalNullsFirst().compare(o1, o2); + return ordering.compare((NullableLongPair) o1, (NullableLongPair) o2); } @Override diff --git a/distribution/docker/docker-compose.yml b/distribution/docker/docker-compose.yml index 58b7a47a4b4a..ea9adcea0d70 100644 --- a/distribution/docker/docker-compose.yml +++ b/distribution/docker/docker-compose.yml @@ -49,7 +49,7 @@ services: - ZOO_MY_ID=1 coordinator: - image: apache/druid:0.24.0 + image: apache/druid:24.0.1 container_name: coordinator volumes: - druid_shared:/opt/shared diff --git a/distribution/pom.xml b/distribution/pom.xml index e2b7773b09d4..7b92e3436b45 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -30,7 +30,7 @@ druid org.apache.druid - 25.0.0-SNAPSHOT + 26.0.0-SNAPSHOT diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 6fb3201d98f4..a14b3beb56c7 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -418,6 +418,7 @@ There are several emitters available: - [`parametrized`](#parametrized-http-emitter-module) operates like the `http` emitter but fine-tunes the recipient URL based on the event feed. - [`composing`](#composing-emitter-module) initializes multiple emitter modules. - [`graphite`](#graphite-emitter) emits metrics to a [Graphite](https://graphiteapp.org/) Carbon service. +- [`switching`](#switching-emitter) initializes and emits to multiple emitter modules based on the event feed. ##### Logging Emitter Module @@ -483,6 +484,14 @@ Instead use `recipientBaseUrlPattern` described in the table below. To use graphite as emitter set `druid.emitter=graphite`. For configuration details, see [Graphite emitter](../development/extensions-contrib/graphite.md) for the Graphite emitter Druid extension. +##### Switching Emitter + +To use switching as emitter set `druid.emitter=switching`. 
+ +|Property|Description|Default| +|--------|-----------|-------| +|`druid.emitter.switching.emitters`|JSON map of feed to list of emitter modules that will be used for the mapped feed, e.g., {"metrics":["http"], "alerts":["logging"]}|{}| +|`druid.emitter.switching.defaultEmitters`|JSON list of emitter modules to load that will be used if there is no emitter specifically designated for that event's feed, e.g., ["logging","http"].|[]| ### Metadata storage @@ -939,7 +948,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| -|`useBatchedSegmentSampler`|Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of size k instead of fixed size 1 to pick segments to move. This option can be enabled to speed up segment balancing process, especially if there are huge number of segments in the cluster or if there are too many segments to move.|false| +|`useBatchedSegmentSampler`|Deprecated. Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of size k instead of fixed size 1 to pick segments to move. This option can be enabled to speed up the sampling of segments to be balanced, especially if there is a large number of segments in the cluster or if there are too many segments to move.|true| |`percentOfSegmentsToConsiderPerMove`|Deprecated. This will eventually be phased out by the batched segment sampler. You can enable the batched segment sampler now by setting the dynamic Coordinator config, `useBatchedSegmentSampler`, to `true`. Note that if you choose to enable the batched segment sampler, `percentOfSegmentsToConsiderPerMove` will no longer have any effect on balancing. If `useBatchedSegmentSampler == false`, this config defines the percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| @@ -948,6 +957,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none| |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. 
This can be a list of comma-separated data sources or a JSON array.|none| |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 100. |100| +|`useRoundRobinSegmentAssignment`|Boolean flag for whether segments should be assigned to historicals in a round robin fashion. When disabled, segment assignment is done using the chosen balancer strategy. When enabled, this can speed up segment assignments leaving balancing to move the segments to their optimal locations (based on the balancer strategy) lazily. |false| |`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none| |`decommissioningMaxPercentOfMaxSegmentsToMove`| Upper limit of segments the Coordinator can move from decommissioning servers to active non-decommissioning servers during a single run. This value is relative to the total maximum number of segments that can be moved at any given time based upon the value of `maxSegmentsToMove`.

If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not move segments to decommissioning servers, effectively putting them in a type of "maintenance" mode. In this case, decommissioning servers do not participate in balancing or assignment by load rules. The Coordinator still considers segments on decommissioning servers as candidates to replicate on active servers.

Decommissioning can stall if there are no available active servers to move the segments to. You can use the maximum percent of decommissioning segment movements to prioritize balancing or to decrease commissioning time to prevent active servers from being overloaded. The value must be between 0 and 100.|70| |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false| @@ -1102,6 +1112,8 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. "local" is mainly for internal testing while "metadata" is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local| |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|PT24H| |`druid.indexer.tasklock.forceTimeChunkLock`|_**Setting this to false is still experimental**_
If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context). See [Task Locking & Priority](../ingestion/tasks.md#context) for more details about locking in tasks.|true| +|`druid.indexer.tasklock.batchSegmentAllocation`| If set to true, Druid performs segment allocate actions in batches to improve throughput and reduce the average `task/action/run/time`. See [batching `segmentAllocate` actions](../ingestion/tasks.md#batching-segmentallocate-actions) for details.|false| +|`druid.indexer.tasklock.batchAllocationWaitTime`|Number of milliseconds after Druid adds the first segment allocate action to a batch, until it executes the batch. Allows the batch to add more requests and improve the average segment allocation run time. This configuration takes effect only if `batchSegmentAllocation` is enabled.|500| |`druid.indexer.task.default.context`|Default task context that is applied to all tasks submitted to the Overlord. Any default in this config does not override neither the context values the user provides nor `druid.indexer.tasklock.forceTimeChunkLock`.|empty context| |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE| |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M| @@ -1375,7 +1387,7 @@ For GCE's properties, please refer to the [gce-extensions](../development/extens This section contains the configuration options for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) in the suggested [three-server configuration](../design/processes.md#server-types). -Configuration options for the experimental [Indexer process](../design/indexer.md) are also provided here. +Configuration options for the [Indexer process](../design/indexer.md) are also provided here. ### MiddleManager and Peons @@ -2188,7 +2200,7 @@ Supported query contexts: |Key|Description|Default| |---|-----------|-------| |`druid.expressions.useStrictBooleans`|Controls the behavior of Druid boolean operators and functions, if set to `true` all boolean values will be either a `1` or `0`. See [expression documentation](../misc/math-expr.md#logical-operator-modes)|false| -|`druid.expressions.allowNestedArrays`|If enabled, Druid array expressions can create nested arrays. This is experimental and should be used with caution.|false| +|`druid.expressions.allowNestedArrays`|If enabled, Druid array expressions can create nested arrays.|false| ### Router #### Router Process Configs diff --git a/docs/development/experimental-features.md b/docs/development/experimental-features.md new file mode 100644 index 000000000000..30d8c2f77c2b --- /dev/null +++ b/docs/development/experimental-features.md @@ -0,0 +1,60 @@ +--- +id: experimental-features +title: "Experimental features" +--- + + + +The following features are marked [experimental](./experimental.md) in the Druid docs. + +This document includes each page that mentions an experimental feature. To graduate a feature, remove all mentions of its experimental status on all relevant pages. + +Note that this document does not track the status of contrib extensions, all of which are considered experimental. 
+ +## SQL-based ingestion + +- [SQL-based ingestion](../multi-stage-query/index.md) +- [SQL-based ingestion concepts](../multi-stage-query/concepts.md) +- [SQL-based ingestion and multi-stage query task API](../multi-stage-query/api.md) + +## Indexer process + +- [Indexer process](../design/indexer.md) +- [Processes and servers](../design/processes.md#indexer-process-optional) + +## Kubernetes + +- [Kubernetes](../development/extensions-core/kubernetes.md) + +## Segment locking + +- [Configuration reference](../configuration/index.md#overlord-operations) +- [Task reference](../ingestion/tasks.md#locking) +- [Design](../design/architecture.md#availability-and-consistency) + +## Front coding + +- [Ingestion spec reference](../ingestion/ingestion-spec.md#front-coding) + +## Other configuration properties + +- [Configuration reference](../configuration/index.md) + - `CLOSED_SEGMENTS_SINKS` mode + - Expression processing configuration `druid.expressions.allowNestedArrays` diff --git a/docs/development/extensions-core/datasketches-tuple.md b/docs/development/extensions-core/datasketches-tuple.md index fc4f74d5c81d..c9a05b5ab197 100644 --- a/docs/development/extensions-core/datasketches-tuple.md +++ b/docs/development/extensions-core/datasketches-tuple.md @@ -39,19 +39,52 @@ druid.extensions.loadList=["druid-datasketches"] "name" : , "fieldName" : , "nominalEntries": , - "numberOfValues" : , - "metricColumns" : + "metricColumns" : , + "numberOfValues" : } ``` |property|description|required?| |--------|-----------|---------| |type|This String should always be "arrayOfDoublesSketch"|yes| -|name|A String for the output (result) name of the calculation.|yes| +|name|String representing the output column to store sketch values.|yes| |fieldName|A String for the name of the input field.|yes| |nominalEntries|Parameter that determines the accuracy and size of the sketch. Higher k means higher accuracy but more space to store sketches. Must be a power of 2. See the [Theta sketch accuracy](https://datasketches.apache.org/docs/Theta/ThetaErrorTable) for details. |no, defaults to 16384| -|numberOfValues|Number of values associated with each distinct key. |no, defaults to 1| -|metricColumns|If building sketches from raw data, an array of names of the input columns containing numeric values to be associated with each distinct key.|no, defaults to empty array| +|metricColumns|When building sketches from raw data, an array input column that contain numeric values to associate with each distinct key. If not provided, assumes `fieldName` is an `arrayOfDoublesSketch`|no, if not provided `fieldName` is assumed to be an arrayOfDoublesSketch| +|numberOfValues|Number of values associated with each distinct key. |no, defaults to the length of `metricColumns` if provided and 1 otherwise| + +You can use the `arrayOfDoublesSketch` aggregator to: + +- Build a sketch from raw data. In this case, set `metricColumns` to an array. +- Build a sketch from an existing `ArrayOfDoubles` sketch . In this case, leave `metricColumns` unset and set the `fieldName` to an `ArrayOfDoubles` sketch with `numberOfValues` doubles. At ingestion time, you must base64 encode `ArrayOfDoubles` sketches at ingestion time. + +#### Example on top of raw data + +Compute a theta of unique users. For each user store the `added` and `deleted` scores. The new sketch column will be called `users_theta`. 
+
+```json
+{
+  "type": "arrayOfDoublesSketch",
+  "name": "users_theta",
+  "fieldName": "user",
+  "nominalEntries": 16384,
+  "metricColumns": ["added", "deleted"]
+}
+```
+
+#### Example ingesting a precomputed sketch column
+
+Ingest a sketch column called `user_sketches` that has a base64 encoded value of two doubles in its array and store it in a column called `users_theta`.
+
+```json
+{
+  "type": "arrayOfDoublesSketch",
+  "name": "users_theta",
+  "fieldName": "user_sketches",
+  "nominalEntries": 16384,
+  "numberOfValues": 2
+}
+```
 
 ### Post Aggregators
diff --git a/docs/development/extensions-core/druid-basic-security.md b/docs/development/extensions-core/druid-basic-security.md
index b6698b4f153d..4e042b4405f3 100644
--- a/docs/development/extensions-core/druid-basic-security.md
+++ b/docs/development/extensions-core/druid-basic-security.md
@@ -53,12 +53,29 @@ To set the value for the configuration properties, add them to the common runtim
 ### General properties
 
-|Property|Description|Default|required|
-|--------|-----------|-------|--------|
-|`druid.auth.basic.common.pollingPeriod`|Defines in milliseconds how often processes should poll the Coordinator for the current Druid metadata store authenticator/authorizer state.|60000|No|
-|`druid.auth.basic.common.maxRandomDelay`|Defines in milliseconds the amount of random delay to add to the pollingPeriod, to spread polling requests across time.|6000|No|
-|`druid.auth.basic.common.maxSyncRetries`|Determines how many times a service will retry if the authentication/authorization Druid metadata store state sync with the Coordinator fails.|10|No|
-|`druid.auth.basic.common.cacheDirectory`|If defined, snapshots of the basic Authenticator and Authorizer Druid metadata store caches will be stored on disk in this directory. If this property is defined, when a service is starting, it will attempt to initialize its caches from these on-disk snapshots, if the service is unable to initialize its state by communicating with the Coordinator.|null|No|
+**`druid.auth.basic.common.pollingPeriod`**
+
+Defines in milliseconds how often processes should poll the Coordinator for the current Druid metadata store authenticator/authorizer state.
+         **Required**: No
+         **Default**: 60000 + +**`druid.auth.basic.common.maxRandomDelay`** + +Defines in milliseconds the amount of random delay to add to the pollingPeriod, to spread polling requests across time.
+         **Required**: No
+         **Default**: 6000 + +**`druid.auth.basic.common.maxSyncRetries`** + +Determines how many times a service will retry if the authentication/authorization Druid metadata store state sync with the Coordinator fails.
+         **Required**: No
+         **Default**: 10 + +**`druid.auth.basic.common.cacheDirectory`** + +If defined, snapshots of the basic Authenticator and Authorizer Druid metadata store caches will be stored on disk in this directory. If this property is defined, when a service is starting, it will attempt to initialize its caches from these on-disk snapshots, if the service is unable to initialize its state by communicating with the Coordinator.
+         **Required**: No
+         **Default**: null ### Authenticator @@ -96,16 +113,55 @@ The remaining examples of authenticator configuration use either `MyBasicMetadat #### Properties for Druid metadata store user authentication -|Property|Description|Default|required| -|--------|-----------|-------|--------| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.initialAdminPassword`|Initial [Password Provider](../../operations/password-provider.md) for the automatically created default admin user. If no password is specified, the default admin user will not be created. If the default admin user already exists, setting this property will not affect its password.|null|No| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.initialInternalClientPassword`|Initial [Password Provider](../../operations/password-provider.md) for the default internal system user, used for internal process communication. If no password is specified, the default internal system user will not be created. If the default internal system user already exists, setting this property will not affect its password.|null|No| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.enableCacheNotifications`|If true, the Coordinator will notify Druid processes whenever a configuration change to this Authenticator occurs, allowing them to immediately update their state without waiting for polling.|true|No| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.cacheNotificationTimeout`|The timeout in milliseconds for the cache notifications.|5000|No| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.credentialIterations`|Number of iterations to use for password hashing. See [Credential iterations and API performance](#credential-iterations-and-api-performance)|10000|No| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.credentialsValidator.type`|The type of credentials store (metadata) to validate requests credentials.|metadata|No| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.skipOnFailure`|If true and the request credential doesn't exists or isn't fully configured in the credentials store, the request will proceed to next Authenticator in the chain.|false|No| -|`druid.auth.authenticator.MyBasicMetadataAuthenticator.authorizerName`|Authorizer that requests should be directed to|N/A|Yes| + +**`druid.auth.authenticator.MyBasicMetadataAuthenticator.initialAdminPassword`** + +Initial [Password Provider](../../operations/password-provider.md) for the automatically created default admin user. If no password is specified, the default admin user will not be created. If the default admin user already exists, setting this property will not affect its password.
+         **Required**: No
+         **Default**: null + +**`druid.auth.authenticator.MyBasicMetadataAuthenticator.initialInternalClientPassword`** + +Initial [Password Provider](../../operations/password-provider.md) for the default internal system user, used for internal process communication. If no password is specified, the default internal system user will not be created. If the default internal system user already exists, setting this property will not affect its password.
+         **Required**: No
+         **Default**: null + +**`druid.auth.authenticator.MyBasicMetadataAuthenticator.enableCacheNotifications`** + +If true, the Coordinator will notify Druid processes whenever a configuration change to this Authenticator occurs, allowing them to immediately update their state without waiting for polling.
+         **Required**: No
+         **Default**: true
+
+**`druid.auth.authenticator.MyBasicMetadataAuthenticator.cacheNotificationTimeout`**
+
+The timeout in milliseconds for the cache notifications.
+         **Required**: No
+         **Default**: 5000 + +**`druid.auth.authenticator.MyBasicMetadataAuthenticator.credentialIterations`** + +Number of iterations to use for password hashing. See [Credential iterations and API performance](#credential-iterations-and-api-performance)
+         **Required**: No
+         **Default**: 10000 + +**`druid.auth.authenticator.MyBasicMetadataAuthenticator.credentialsValidator.type`** + +The type of credentials store (metadata) to validate requests credentials.
+         **Required**: No
+         **Default**: metadata
+
+**`druid.auth.authenticator.MyBasicMetadataAuthenticator.skipOnFailure`**
+
+If true and the request credential doesn't exist or isn't fully configured in the credentials store, the request will proceed to the next Authenticator in the chain.
+         **Required**: No
+         **Default**: false + +**`druid.auth.authenticator.MyBasicMetadataAuthenticator.authorizerName`** + +Authorizer that requests should be directed to.
+         **Required**: Yes
+         **Default**: N/A + ##### Credential iterations and API performance @@ -121,25 +177,107 @@ If Druid uses the default credentials validator (i.e., `credentialsValidator.typ #### Properties for LDAP user authentication -|Property|Description|Default|required| -|--------|-----------|-------|--------| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.initialAdminPassword`|Initial [Password Provider](../../operations/password-provider.md) for the automatically created default admin user. If no password is specified, the default admin user will not be created. If the default admin user already exists, setting this property will not affect its password.|null|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.initialInternalClientPassword`|Initial [Password Provider](../../operations/password-provider.md) for the default internal system user, used for internal process communication. If no password is specified, the default internal system user will not be created. If the default internal system user already exists, setting this property will not affect its password.|null|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.enableCacheNotifications`|If true, the Coordinator will notify Druid processes whenever a configuration change to this Authenticator occurs, allowing them to immediately update their state without waiting for polling.|true|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.cacheNotificationTimeout`|The timeout in milliseconds for the cache notifications.|5000|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialIterations`|Number of iterations to use for password hashing.|10000|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.type`|The type of credentials store (ldap) to validate requests credentials.|metadata|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.url`|URL of the LDAP server.|null|Yes| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.bindUser`|LDAP bind user username.|null|Yes| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.bindPassword`|[Password Provider](../../operations/password-provider.md) LDAP bind user password.|null|Yes| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.baseDn`|The point from where the LDAP server will search for users.|null|Yes| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.userSearch`|The filter/expression to use for the search. For example, (&(sAMAccountName=%s)(objectClass=user))|null|Yes| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.userAttribute`|The attribute id identifying the attribute that will be returned as part of the search. For example, sAMAccountName. |null|Yes| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.credentialVerifyDuration`|The duration in seconds for how long valid credentials are verifiable within the cache when not requested.|600|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.credentialMaxDuration`|The max duration in seconds for valid credentials that can reside in cache regardless of how often they are requested.|3600|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.credentialCacheSize`|The valid credentials cache size. 
The cache uses a LRU policy.|100|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.skipOnFailure`|If true and the request credential doesn't exists or isn't fully configured in the credentials store, the request will proceed to next Authenticator in the chain.|false|No| -|`druid.auth.authenticator.MyBasicLDAPAuthenticator.authorizerName`|Authorizer that requests should be directed to.|N/A|Yes| +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.initialAdminPassword`** + +Initial [Password Provider](../../operations/password-provider.md) for the automatically created default admin user. If no password is specified, the default admin user will not be created. If the default admin user already exists, setting this property will not affect its password.
+         **Required**: No
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.initialInternalClientPassword`** + +Initial [Password Provider](../../operations/password-provider.md) for the default internal system user, used for internal process communication. If no password is specified, the default internal system user will not be created. If the default internal system user already exists, setting this property will not affect its password.
+         **Required**: No
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.enableCacheNotifications`** + +If true, the Coordinator will notify Druid processes whenever a configuration change to this Authenticator occurs, allowing them to immediately update their state without waiting for polling.
+         **Required**: No
+         **Default**: true + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.cacheNotificationTimeout`** + +The timeout in milliseconds for the cache notifications.
+         **Required**: No
+         **Default**: 5000 + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialIterations`** + +Number of iterations to use for password hashing.
+         **Required**: No
+         **Default**: 10000 + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.type`** + +The type of credentials store (ldap) to validate requests credentials.
+         **Required**: No
+         **Default**: metadata + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.url`** + +URL of the LDAP server.
+         **Required**: Yes
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.bindUser`** + +LDAP bind user username.
+         **Required**: Yes
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.bindPassword`** + +[Password Provider](../../operations/password-provider.md) LDAP bind user password.
+         **Required**: Yes
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.baseDn`** + +The point from where the LDAP server will search for users.
+         **Required**: Yes
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.userSearch`** + +The filter/expression to use for the search. For example, (&(sAMAccountName=%s)(objectClass=user))
+         **Required**: Yes
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.userAttribute`** + +The attribute id identifying the attribute that will be returned as part of the search. For example, sAMAccountName.
+         **Required**: Yes
+         **Default**: null + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.credentialVerifyDuration`** + +The duration in seconds for how long valid credentials are verifiable within the cache when not requested.
+         **Required**: No
+         **Default**: 600 + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.credentialMaxDuration`** + +The max duration in seconds for valid credentials that can reside in cache regardless of how often they are requested.
+         **Required**: No
+         **Default**: 3600
+
+**`druid.auth.authenticator.MyBasicLDAPAuthenticator.credentialsValidator.credentialCacheSize`**
+
+The valid credentials cache size. The cache uses an LRU policy.
+         **Required**: No
+         **Default**: 100
+
+**`druid.auth.authenticator.MyBasicLDAPAuthenticator.skipOnFailure`**
+
+If true and the request credential doesn't exist or isn't fully configured in the credentials store, the request will proceed to the next Authenticator in the chain.
+         **Required**: No
+         **Default**: false + +**`druid.auth.authenticator.MyBasicLDAPAuthenticator.authorizerName`** + +Authorizer that requests should be directed to.
+         **Required**: Yes
+         **Default**: N/A ### Escalator @@ -155,11 +293,24 @@ druid.escalator.authorizerName=MyBasicMetadataAuthorizer ``` #### Properties -|Property|Description|Default|required| -|--------|-----------|-------|--------| -|`druid.escalator.internalClientUsername`|The escalator will use this username for requests made as the internal system user.|n/a|Yes| -|`druid.escalator.internalClientPassword`|The escalator will use this [Password Provider](../../operations/password-provider.md) for requests made as the internal system user.|n/a|Yes| -|`druid.escalator.authorizerName`|Authorizer that requests should be directed to.|n/a|Yes| + +**`druid.escalator.internalClientUsername`** + +The escalator will use this username for requests made as the internal system user.
+         **Required**: Yes
+         **Default**: N/A + +**`druid.escalator.internalClientPassword`** + +The escalator will use this [Password Provider](../../operations/password-provider.md) for requests made as the internal system user.
+         **Required**: Yes
+         **Default**: N/A + +**`druid.escalator.authorizerName`** + +Authorizer that requests should be directed to.
+         **Required**: Yes
+         **Default**: N/A ### Authorizer @@ -182,24 +333,131 @@ druid.auth.authorizer.MyBasicMetadataAuthorizer.type=basic The examples in the rest of this article use `MyBasicMetadataAuthorizer` or `MyBasicLDAPAuthorizer` as the authorizer name. #### Properties for Druid metadata store user authorization -|Property|Description|Default|required| -|--------|-----------|-------|--------| -|`druid.auth.authorizer.MyBasicMetadataAuthorizer.enableCacheNotifications`|If true, the Coordinator will notify Druid processes whenever a configuration change to this Authorizer occurs, allowing them to immediately update their state without waiting for polling.|true|No| -|`druid.auth.authorizer.MyBasicMetadataAuthorizer.cacheNotificationTimeout`|The timeout in milliseconds for the cache notifications.|5000|No| -|`druid.auth.authorizer.MyBasicMetadataAuthorizer.initialAdminUser`|The initial admin user with role defined in initialAdminRole property if specified, otherwise the default admin role will be assigned.|admin|No| -|`druid.auth.authorizer.MyBasicMetadataAuthorizer.initialAdminRole`|The initial admin role to create if it doesn't already exists.|admin|No| -|`druid.auth.authorizer.MyBasicMetadataAuthorizer.roleProvider.type`|The type of role provider to authorize requests credentials.|metadata|No + +**`druid.auth.authorizer.MyBasicMetadataAuthorizer.enableCacheNotifications`** + +If true, the Coordinator will notify Druid processes whenever a configuration change to this Authorizer occurs, allowing them to immediately update their state without waiting for polling.
+         **Required**: No
+         **Default**: true + +**`druid.auth.authorizer.MyBasicMetadataAuthorizer.cacheNotificationTimeout`** + +The timeout in milliseconds for the cache notifications.
+         **Required**: No
+         **Default**: 5000 + +**`druid.auth.authorizer.MyBasicMetadataAuthorizer.initialAdminUser`** + +The initial admin user with role defined in initialAdminRole property if specified, otherwise the default admin role will be assigned.
+         **Required**: No
+         **Default**: admin
+
+**`druid.auth.authorizer.MyBasicMetadataAuthorizer.initialAdminRole`**
+
+The initial admin role to create if it doesn't already exist.
+         **Required**: No
+         **Default**: admin + +**`druid.auth.authorizer.MyBasicMetadataAuthorizer.roleProvider.type`** + +The type of role provider to authorize requests credentials.
+         **Required**: No
+         **Default**: metadata #### Properties for LDAP user authorization -|Property|Description|Default|required| -|--------|-----------|-------|--------| -|`druid.auth.authorizer.MyBasicLDAPAuthorizer.enableCacheNotifications`|If true, the Coordinator will notify Druid processes whenever a configuration change to this Authorizer occurs, allowing them to immediately update their state without waiting for polling.|true|No| -|`druid.auth.authorizer.MyBasicLDAPAuthorizer.cacheNotificationTimeout`|The timeout in milliseconds for the cache notifications.|5000|No| -|`druid.auth.authorizer.MyBasicLDAPAuthorizer.initialAdminUser`|The initial admin user with role defined in initialAdminRole property if specified, otherwise the default admin role will be assigned.|admin|No| -|`druid.auth.authorizer.MyBasicLDAPAuthorizer.initialAdminRole`|The initial admin role to create if it doesn't already exists.|admin|No| -|`druid.auth.authorizer.MyBasicLDAPAuthorizer.initialAdminGroupMapping`|The initial admin group mapping with role defined in initialAdminRole property if specified, otherwise the default admin role will be assigned. The name of this initial admin group mapping will be set to adminGroupMapping|null|No| -|`druid.auth.authorizer.MyBasicLDAPAuthorizer.roleProvider.type`|The type of role provider (ldap) to authorize requests credentials.|metadata|No -|`druid.auth.authorizer.MyBasicLDAPAuthorizer.roleProvider.groupFilters`|Array of LDAP group filters used to filter out the allowed set of groups returned from LDAP search. Filters can be begin with *, or end with ,* to provide configurational flexibility to limit or filter allowed set of groups available to LDAP Authorizer.|null|No| + +**`druid.auth.authorizer.MyBasicLDAPAuthorizer.enableCacheNotifications`** + +If true, the Coordinator will notify Druid processes whenever a configuration change to this Authorizer occurs, allowing them to immediately update their state without waiting for polling.
+         **Required**: No
+         **Default**: true + +**`druid.auth.authorizer.MyBasicLDAPAuthorizer.cacheNotificationTimeout`** + +The timeout in milliseconds for the cache notifications.
+         **Required**: No
+         **Default**: 5000 + +**`druid.auth.authorizer.MyBasicLDAPAuthorizer.initialAdminUser`** + +The initial admin user with role defined in initialAdminRole property if specified, otherwise the default admin role will be assigned.
+         **Required**: No
+         **Default**: admin
+
+**`druid.auth.authorizer.MyBasicLDAPAuthorizer.initialAdminRole`**
+
+The initial admin role to create if it doesn't already exist.
+         **Required**: No
+         **Default**: admin
+
+**`druid.auth.authorizer.MyBasicLDAPAuthorizer.initialAdminGroupMapping`**
+
+The initial admin group mapping with the role defined in the initialAdminRole property if specified, otherwise the default admin role will be assigned. The name of this initial admin group mapping will be set to `adminGroupMapping`.
+         **Required**: No
+         **Default**: null + +**`druid.auth.authorizer.MyBasicLDAPAuthorizer.roleProvider.type`** + +The type of role provider (ldap) to authorize requests credentials.
+         **Required**: No
+         **Default**: metadata
+
+**`druid.auth.authorizer.MyBasicLDAPAuthorizer.roleProvider.groupFilters`**
+
+Array of LDAP group filters used to filter out the allowed set of groups returned from LDAP search. Filters can begin with \*, or end with ,\* to provide configuration flexibility to limit or filter the allowed set of groups available to the LDAP Authorizer.
+         **Required**: No
+         **Default**: null + +#### Properties for LDAPS + +Use the following properties to configure Druid authentication with LDAP over TLS (LDAPS). See [Configure LDAP authentication](../../operations/auth-ldap.md) for more information. + +**`druid.auth.basic.ssl.protocol`** + +SSL protocol to use. The TLS version is 1.2.
+         **Required**: Yes
+         **Default**: tls + +**`druid.auth.basic.ssl.trustStorePath`** + +Path to the trust store file.
+         **Required**: Yes
+         **Default**: N/A + +**`druid.auth.basic.ssl.trustStorePassword`** + +Password to access the trust store file.
+         **Required**: Yes
+         **Default**: N/A + +**`druid.auth.basic.ssl.trustStoreType`** + +Format of the trust store file. For Java the format is jks.
+         **Required**: No
+         **Default**: jks + +**`druid.auth.basic.ssl.trustStoreAlgorithm`** + +Algorithm used by the trust manager to validate certificate chains.
+         **Required**: No
+         **Default**: N/A
+
+Example LDAPS configuration:
+
+```
+druid.auth.basic.ssl.protocol=tls
+druid.auth.basic.ssl.trustStorePath=/usr/local/druid-path/certs/truststore.jks
+druid.auth.basic.ssl.trustStorePassword=xxxxx
+druid.auth.basic.ssl.trustStoreType=jks
+druid.auth.basic.ssl.trustStoreAlgorithm=PKIX
+```
+You can configure `druid.auth.basic.ssl.trustStorePassword` to be a plain text password or you can set the password as an environment variable. See [Password providers](../../operations/password-provider.md) for more information.
 
 ## Usage
 
@@ -213,19 +471,19 @@ Root path: `/druid-ext/basic-security/authentication`
 Each API endpoint includes {authenticatorName}, specifying which Authenticator instance is being configured.
 
 ##### User/Credential Management
-`GET(/druid-ext/basic-security/authentication/db/{authenticatorName}/users)`
+`GET(/druid-ext/basic-security/authentication/db/{authenticatorName}/users)`
Return a list of all user names. -`GET(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName})` +`GET(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName})`
Return the name and credentials information of the user with name {userName} -`POST(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName})` +`POST(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName})`
Create a new user with name {userName} -`DELETE(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName})` +`DELETE(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName})`
Delete the user with name {userName} -`POST(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName}/credentials)` +`POST(/druid-ext/basic-security/authentication/db/{authenticatorName}/users/{userName}/credentials)`
Assign a password used for HTTP basic authentication for {userName} Content: JSON password request object @@ -238,20 +496,20 @@ Example request body: ``` ##### Cache Load Status -`GET(/druid-ext/basic-security/authentication/loadStatus)` +`GET(/druid-ext/basic-security/authentication/loadStatus)`
Return the current load status of the local caches of the authentication Druid metadata store. #### Authorization API -Root path: `/druid-ext/basic-security/authorization` +Root path: `/druid-ext/basic-security/authorization`
Each API endpoint includes {authorizerName}, specifying which Authorizer instance is being configured. ##### User Creation/Deletion -`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/users)` +`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/users)`
Return a list of all user names. -`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName})` +`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName})`
Return the name and role information of the user with name {userName} Example output: @@ -338,20 +596,20 @@ The `resourceNamePattern` is a compiled version of the resource name regex. It i } ``` -`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName})` +`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName})`
Create a new user with name {userName} -`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName})` +`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName})`
Delete the user with name {userName} ##### Group mapping Creation/Deletion -`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings)` +`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings)`
Return a list of all group mappings. -`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName})` +`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName})`
Return the group mapping and role information of the group mapping with name {groupMappingName} -`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName})` +`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName})`
Create a new group mapping with name {groupMappingName} Content: JSON group mapping object Example request body: @@ -366,14 +624,14 @@ Example request body: } ``` -`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName})` +`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName})`
Delete the group mapping with name {groupMappingName} #### Role Creation/Deletion -`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/roles)` +`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/roles)`
Return a list of all role names. -`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName})` +`GET(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName})`
Return name and permissions for the role named {roleName}. Example output: @@ -427,30 +685,30 @@ Example output: ``` -`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName})` +`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName})`
Create a new role with name {roleName}. Content: username string -`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName})` +`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName})`
Delete the role with name {roleName}. #### Role Assignment -`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName}/roles/{roleName})` +`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName}/roles/{roleName})`
Assign role {roleName} to user {userName}. -`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName}/roles/{roleName})` +`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/users/{userName}/roles/{roleName})`
Unassign role {roleName} from user {userName} -`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName}/roles/{roleName})` +`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName}/roles/{roleName})`
Assign role {roleName} to group mapping {groupMappingName}. -`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName}/roles/{roleName})` +`DELETE(/druid-ext/basic-security/authorization/db/{authorizerName}/groupMappings/{groupMappingName}/roles/{roleName})`
Unassign role {roleName} from group mapping {groupMappingName} #### Permissions -`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName}/permissions)` +`POST(/druid-ext/basic-security/authorization/db/{authorizerName}/roles/{roleName}/permissions)`
Set the permissions of {roleName}. This replaces the previous set of permissions on the role. Content: List of JSON Resource-Action objects, e.g.: @@ -479,5 +737,5 @@ The "name" field for resources in the permission definitions are regexes used to Please see [Defining permissions](../../operations/security-user-auth.md#defining-permissions) for more details. ##### Cache Load Status -`GET(/druid-ext/basic-security/authorization/loadStatus)` +`GET(/druid-ext/basic-security/authorization/loadStatus)`
Return the current load status of the local caches of the authorization Druid metadata store. diff --git a/docs/development/extensions-core/druid-lookups.md b/docs/development/extensions-core/druid-lookups.md index b44f9620bd0a..5b19508c2375 100644 --- a/docs/development/extensions-core/druid-lookups.md +++ b/docs/development/extensions-core/druid-lookups.md @@ -22,9 +22,6 @@ title: "Cached Lookup Module" ~ under the License. --> - -> Please note that this is an experimental module and the development/testing still at early stage. Feel free to try it and give us your feedback. - ## Description This Apache Druid module provides a per-lookup caching mechanism for JDBC data sources. The main goal of this cache is to speed up the access to a high latency lookup sources and to provide a caching isolation for every lookup source. diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index 210207302f2d..360e75a83de4 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -56,8 +56,6 @@ This topic contains configuration reference information for the Apache Kafka sup ## Task Autoscaler Properties -> Note that Task AutoScaler is currently designated as experimental. - | Property | Description | Required | | ------------- | ------------- | ------------- | | `enableTaskAutoScaler` | Enable or disable autoscaling. `false` or blank disables the `autoScaler` even when `autoScalerConfig` is not null| no (default == false) | @@ -206,8 +204,8 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning | `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. | no (default == 0) | | `resetOffsetAutomatically` | Boolean | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).

If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.

If true, Druid will automatically reset to the earlier or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.

This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) | | `workerThreads` | Integer | The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation. | no (default == min(10, taskCount)) | -| `chatAsync` | Boolean | If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == false) | -| `chatThreads` | Integer | The number of threads that will be used for communicating with indexing tasks. Ignored if `chatAsync` is `true`. | no (default == min(10, taskCount * replicas)) | +| `chatAsync` | Boolean | If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == true) | +| `chatThreads` | Integer | The number of threads that will be used for communicating with indexing tasks. Ignored if `chatAsync` is `true` (the default). | no (default == min(10, taskCount * replicas)) | | `chatRetries` | Integer | The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive. | no (default == 8) | | `httpTimeout` | ISO8601 Period | How long to wait for a HTTP response from an indexing task. | no (default == PT10S) | | `shutdownTimeout` | ISO8601 Period | How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting. | no (default == PT80S) | diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 84036665ec9e..9b14ec767c26 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -149,8 +149,6 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec: #### Task Autoscaler Properties -> Note that Task AutoScaler is currently designated as experimental. - | Property | Description | Required | | ------------- | ------------- | ------------- | | `enableTaskAutoScaler` | Enable or disable the auto scaler. When false or absent, Druid disables the `autoScaler` even when `autoScalerConfig` is not null.| no (default == false) | @@ -293,8 +291,8 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param |`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read Kinesis messages that are no longer available.

If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.

If true, Druid will automatically reset to the earlier or latest sequence number available in Kinesis, based on the value of the `useEarliestSequenceNumber` property (earliest if true, latest if false). Please note that this can lead to data being _DROPPED_ (if `useEarliestSequenceNumber` is false) or _DUPLICATED_ (if `useEarliestSequenceNumber` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.|no (default == false)| |`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)| |`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))| -|`chatAsync`|Boolean| If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == false) | -|`chatThreads`|Integer| The number of threads that will be used for communicating with indexing tasks. Ignored if `chatAsync` is `true`.| no (default == min(10, taskCount * replicas))| +|`chatAsync`|Boolean| If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == true) | +|`chatThreads`|Integer| The number of threads that will be used for communicating with indexing tasks. Ignored if `chatAsync` is `true` (the default).| no (default == min(10, taskCount * replicas))| |`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.| no (default == 8)| |`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)| |`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)| diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index eb08df0cf7a8..557060a5e663 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -606,9 +606,9 @@ For example: ### FlattenSpec -The `flattenSpec` object bridges the gap between potentially nested input data, such as Avro or ORC, and Druid's flat data model. It is an object within the `inputFormat` object. +You can use the `flattenSpec` object to flatten nested data, as an alternative to the Druid [nested columns](../querying/nested-columns.md) feature, and for nested input formats unsupported by the feature. It is an object within the `inputFormat` object. -> If you have nested JSON data, you can ingest and store JSON in an Apache Druid column as a `COMPLEX` data type. See [Nested columns](../querying/nested-columns.md) for more information. +See [Nested columns](../querying/nested-columns.md) for information on ingesting and storing nested data in an Apache Druid column as a `COMPLEX` data type. 
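+
+For example, a JSON `inputFormat` with a `flattenSpec` might look like the following sketch (the flattened column name and JSONPath expression here are hypothetical):
+
+```json
+{
+  "type": "json",
+  "flattenSpec": {
+    "useFieldDiscovery": true,
+    "fields": [
+      { "type": "path", "name": "userCountry", "expr": "$.user.country" }
+    ]
+  }
+}
+```
+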
Configure your `flattenSpec` as follows: diff --git a/docs/ingestion/migrate-from-firehose-ingestion.md b/docs/ingestion/migrate-from-firehose-ingestion.md new file mode 100644 index 000000000000..c5d3f6946f21 --- /dev/null +++ b/docs/ingestion/migrate-from-firehose-ingestion.md @@ -0,0 +1,209 @@ +--- +id: migrate-from-firehose +title: "Migrate from firehose to input source ingestion" +sidebar_label: "Migrate from firehose" +--- + + + +Apache deprecated support for Druid firehoses in version 0.17. Support for firehose ingestion will be removed in version 26.0. + +If you're using a firehose for batch ingestion, we strongly recommend that you follow the instructions on this page to transition to using native batch ingestion input sources as soon as possible. + +Firehose ingestion doesn't work with newer Druid versions, so you must be using an ingestion spec with a defined input source before you upgrade. + +## Migrate from firehose ingestion to an input source + +To migrate from firehose ingestion, you can use the Druid console to update your ingestion spec, or you can update it manually. + +### Use the Druid console + +To update your ingestion spec using the Druid console, open the console and copy your spec into the **Edit spec** stage of the data loader. + +Druid converts the spec into one with a defined input source. For example, it converts the [example firehose ingestion spec](#example-firehose-ingestion-spec) below into the [example ingestion spec after migration](#example-ingestion-spec-after-migration). + +If you're unable to use the console or you have problems with the console method, the alternative is to update your ingestion spec manually. + +### Update your ingestion spec manually + +To update your ingestion spec manually, copy your existing spec into a new file. Refer to [Native batch ingestion with firehose (Deprecated)](./native-batch-firehose.md) for a description of firehose properties. + +Edit the new file as follows: + +1. In the `ioConfig` component, replace the `firehose` definition with an `inputSource` definition for your chosen input source. See [Native batch input sources](./native-batch-input-source.md) for details. +2. Move the `timeStampSpec` definition from `parser.parseSpec` to the `dataSchema` component. +3. Move the `dimensionsSpec` definition from `parser.parseSpec` to the `dataSchema` component. +4. Move the `format` definition from `parser.parseSpec` to an `inputFormat` definition in `ioConfig`. +5. Delete the `parser` definition. +6. Save the file. +
You can check the format of your new ingestion file against the [migrated example](#example-ingestion-spec-after-migration) below. +7. Test the new ingestion spec with a temporary data source. +8. Once you've successfully ingested sample data with the new spec, stop firehose ingestion and switch to the new spec. + +When the transition is complete, you can upgrade Druid to the latest version. See the [Druid release notes](https://druid.apache.org/downloads.html) for upgrade instructions. + +### Example firehose ingestion spec + +An example firehose ingestion spec is as follows: + +```json +{ + "type" : "index", + "spec" : { + "dataSchema" : { + "dataSource" : "wikipedia", + "metricsSpec" : [ + { + "type" : "count", + "name" : "count" + }, + { + "type" : "doubleSum", + "name" : "added", + "fieldName" : "added" + }, + { + "type" : "doubleSum", + "name" : "deleted", + "fieldName" : "deleted" + }, + { + "type" : "doubleSum", + "name" : "delta", + "fieldName" : "delta" + } + ], + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : "DAY", + "queryGranularity" : "NONE", + "intervals" : [ "2013-08-31/2013-09-01" ] + }, + "parser": { + "type": "string", + "parseSpec": { + "format": "json", + "timestampSpec" : { + "column" : "timestamp", + "format" : "auto" + }, + "dimensionsSpec" : { + "dimensions": ["country", "page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","region","city"], + "dimensionExclusions" : [] + } + } + } + }, + "ioConfig" : { + "type" : "index", + "firehose" : { + "type" : "local", + "baseDir" : "examples/indexing/", + "filter" : "wikipedia_data.json" + } + }, + "tuningConfig" : { + "type" : "index", + "partitionsSpec": { + "type": "single_dim", + "partitionDimension": "country", + "targetRowsPerSegment": 5000000 + } + } + } +} +``` + +### Example ingestion spec after migration + +The following example illustrates the result of migrating the [example firehose ingestion spec](#example-firehose-ingestion-spec) to a spec with an input source: + +```json +{ + "type" : "index", + "spec" : { + "dataSchema" : { + "dataSource" : "wikipedia", + "timestampSpec" : { + "column" : "timestamp", + "format" : "auto" + }, + "dimensionsSpec" : { + "dimensions": ["country", "page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","region","city"], + "dimensionExclusions" : [] + }, + "metricsSpec" : [ + { + "type" : "count", + "name" : "count" + }, + { + "type" : "doubleSum", + "name" : "added", + "fieldName" : "added" + }, + { + "type" : "doubleSum", + "name" : "deleted", + "fieldName" : "deleted" + }, + { + "type" : "doubleSum", + "name" : "delta", + "fieldName" : "delta" + } + ], + "granularitySpec" : { + "type" : "uniform", + "segmentGranularity" : "DAY", + "queryGranularity" : "NONE", + "intervals" : [ "2013-08-31/2013-09-01" ] + } + }, + "ioConfig" : { + "type" : "index", + "inputSource" : { + "type" : "local", + "baseDir" : "examples/indexing/", + "filter" : "wikipedia_data.json" + }, + "inputFormat": { + "type": "json" + } + }, + "tuningConfig" : { + "type" : "index", + "partitionsSpec": { + "type": "single_dim", + "partitionDimension": "country", + "targetRowsPerSegment": 5000000 + } + } + } +} +``` + +## Learn more + +For more information, see the following pages: + +- [Ingestion](./index.md): Overview of the Druid ingestion process. +- [Native batch ingestion](./native-batch.md): Description of the supported native batch indexing tasks. 
+- [Ingestion spec reference](./ingestion-spec.md): Description of the components and properties in the ingestion spec. diff --git a/docs/ingestion/native-batch-firehose.md b/docs/ingestion/native-batch-firehose.md index 4e2cad97fc1a..ca848e725c87 100644 --- a/docs/ingestion/native-batch-firehose.md +++ b/docs/ingestion/native-batch-firehose.md @@ -1,6 +1,6 @@ --- id: native-batch-firehose -title: "Native batch ingestion with firehose" +title: "Native batch ingestion with firehose (Deprecated)" sidebar_label: "Firehose (deprecated)" --- @@ -23,14 +23,13 @@ sidebar_label: "Firehose (deprecated)" ~ under the License. --> - -Firehoses are deprecated in 0.17.0. It's highly recommended to use the [Native batch ingestion input sources](./native-batch-input-source.md) instead. +> Firehose ingestion is deprecated. See [Migrate from firehose to input source ingestion](./migrate-from-firehose-ingestion.md) for instructions on migrating from firehose ingestion to using native batch ingestion input sources. There are several firehoses readily available in Druid; some are meant for examples, while others can be used directly in a production environment. ## StaticS3Firehose -> You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the StaticS3Firehose. +You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the StaticS3Firehose. This firehose ingests events from a predefined list of S3 objects. This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md). @@ -62,7 +61,7 @@ Note that prefetching or caching isn't that useful in the Parallel task. ## StaticGoogleBlobStoreFirehose -> You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the StaticGoogleBlobStoreFirehose. +You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the StaticGoogleBlobStoreFirehose. This firehose ingests events, similar to the StaticS3Firehose, but from a Google Cloud Store. @@ -112,7 +111,7 @@ Google Blobs: ## HDFSFirehose -> You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFSFirehose. +You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFSFirehose. This firehose ingests events from a predefined list of files from the HDFS storage. This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md). diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index 7106a9a000a3..e8d83f34046a 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -679,8 +679,7 @@ Returns the task attempt history of the worker task spec of the given id, or HTT While ingesting data using the parallel task indexing, Druid creates segments from the input data and pushes them. For segment pushing, the parallel task index supports the following segment pushing modes based upon your type of [rollup](./rollup.md): -- Bulk pushing mode: Used for perfect rollup. Druid pushes every segment at the very end of the index task. Until then, Druid stores created segments in memory and local storage of the service running the index task. This mode can cause problems if you have limited storage capacity, and is not recommended to use in production.
-To enable bulk pushing mode, set `forceGuaranteedRollup` in your TuningConfig. You cannot use bulk pushing with `appendToExisting` in your IOConfig. +- Bulk pushing mode: Used for perfect rollup. Druid pushes every segment at the very end of the index task. Until then, Druid stores created segments in memory and local storage of the service running the index task. To enable bulk pushing mode, set `forceGuaranteedRollup` to `true` in your tuning config. You cannot use bulk pushing with `appendToExisting` in your IOConfig. - Incremental pushing mode: Used for best-effort rollup. Druid pushes segments incrementally during the course of the indexing task. The index task collects data and stores created segments in the memory and disks of the services running the task until the total number of collected rows exceeds `maxTotalRows`. At that point the index task immediately pushes all segments created up until that moment, cleans up pushed segments, and continues to ingest the remaining data. ## Capacity planning diff --git a/docs/ingestion/schema-design.md b/docs/ingestion/schema-design.md index 10e6ea82cd2b..f006e792bc44 100644 --- a/docs/ingestion/schema-design.md +++ b/docs/ingestion/schema-design.md @@ -116,14 +116,13 @@ naturally emitted. It is also useful if you want to combine timeseries and non-t Similar to log aggregation systems, Druid offers inverted indexes for fast searching and filtering. Druid's search capabilities are generally less developed than these systems, and its analytical capabilities are generally more developed. The main data modeling differences between Druid and these systems are that when ingesting data into Druid, -you must be more explicit. Druid columns have types specific upfront and Druid does not, at this time, natively support -nested data. +you must be more explicit. Druid columns have types specified upfront. Tips for modeling log data in Druid: * If you don't know ahead of time what columns you'll want to ingest, use an empty dimensions list to trigger [automatic detection of dimension columns](#schema-less-dimensions). -* If you have nested data, flatten it using a [`flattenSpec`](./ingestion-spec.md#flattenspec). +* If you have nested data, you can ingest it using the [nested columns](../querying/nested-columns.md) feature or flatten it using a [`flattenSpec`](./ingestion-spec.md#flattenspec). * Consider enabling [rollup](./rollup.md) if you have mainly analytical use cases for your log data. This will mean you lose the ability to retrieve individual events from Druid, but you potentially gain substantial compression and query performance boosts. @@ -198,9 +197,9 @@ like `MILLIS_TO_TIMESTAMP`, `TIME_FLOOR`, and others. If you're using native Dru ### Nested dimensions -You can ingest and store nested JSON in a Druid column as a `COMPLEX` data type. See [Nested columns](../querying/nested-columns.md) for more information. +You can ingest and store nested data in a Druid column as a `COMPLEX` data type. See [Nested columns](../querying/nested-columns.md) for more information. -If you want to ingest nested data in a format other than JSON—for example Avro, ORC, and Parquet—you must use the `flattenSpec` object to flatten it. +If you want to ingest nested data in a format unsupported by the nested columns feature, you must use the `flattenSpec` object to flatten it.
For example, if you have data of the following form: ```json { "foo": { "bar": 3 } } diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md index c8a2e915d472..5afbadb3d43a 100644 --- a/docs/ingestion/tasks.md +++ b/docs/ingestion/tasks.md @@ -343,6 +343,26 @@ You can override the task priority by setting your priority in the task context "priority" : 100 } ``` + + +## Task actions + +Task actions are overlord actions performed by tasks during their lifecycle. Some typical task actions are: +- `lockAcquire`: acquires a time-chunk lock on an interval for the task +- `lockRelease`: releases a lock acquired by the task on an interval +- `segmentTransactionalInsert`: publishes new segments created by a task and optionally overwrites and/or drops existing segments in a single transaction +- `segmentAllocate`: allocates pending segments to a task to write rows + +### Batching `segmentAllocate` actions + +In a cluster with several concurrent tasks, `segmentAllocate` actions on the overlord can take a long time to finish, causing spikes in the `task/action/run/time`. This can result in ingestion lag building up while a task waits for a segment to be allocated. +The root cause of such spikes is likely to be one or more of the following: +- several concurrent tasks trying to allocate segments for the same datasource and interval +- large number of metadata calls made to the segments and pending segments tables +- concurrency limitations while acquiring a task lock required for allocating a segment + +Since the contention typically arises from tasks allocating segments for the same datasource and interval, you can improve the run times by batching the actions together. +To enable batched segment allocation on the overlord, set `druid.indexer.tasklock.batchSegmentAllocation` to `true`. See [overlord configuration](../configuration/index.md#overlord-operations) for more details. diff --git a/docs/multi-stage-query/concepts.md b/docs/multi-stage-query/concepts.md index 44e5ea43d427..da0e774152d6 100644 --- a/docs/multi-stage-query/concepts.md +++ b/docs/multi-stage-query/concepts.md @@ -233,7 +233,8 @@ happens: The [`maxNumTasks`](./reference.md#context-parameters) query parameter determines the maximum number of tasks your query will use, including the one `query_controller` task. Generally, queries perform better with more workers. The lowest possible value of `maxNumTasks` is two (one worker and one controller). Do not set this higher than the number of -free slots available in your cluster; doing so will result in a [TaskStartTimeout](reference.md#error-codes) error. +free slots available in your cluster; doing so will result in a [TaskStartTimeout](reference.md#error_TaskStartTimeout) +error. When [reading external data](#extern), EXTERN can read multiple files in parallel across different worker tasks. However, EXTERN does not split individual files across multiple worker tasks. If you have a diff --git a/docs/multi-stage-query/known-issues.md b/docs/multi-stage-query/known-issues.md index c76ab57aa7ac..648d3c297b47 100644 --- a/docs/multi-stage-query/known-issues.md +++ b/docs/multi-stage-query/known-issues.md @@ -33,16 +33,18 @@ sidebar_label: Known issues - Worker task stage outputs are stored in the working directory given by `druid.indexer.task.baseDir`. Stages that generate a large amount of output data may exhaust all available disk space. In this case, the query fails with -an [UnknownError](./reference.md#error-codes) with a message including "No space left on device". 
+an [UnknownError](./reference.md#error_UnknownError) with a message including "No space left on device". ## SELECT - SELECT from a Druid datasource does not include unpublished real-time data. - GROUPING SETS and UNION ALL are not implemented. Queries using these features return a - [QueryNotSupported](reference.md#error-codes) error. + [QueryNotSupported](reference.md#error_QueryNotSupported) error. -- For some COUNT DISTINCT queries, you'll encounter a [QueryNotSupported](reference.md#error-codes) error that includes `Must not have 'subtotalsSpec'` as one of its causes. This is caused by the planner attempting to use GROUPING SETs, which are not implemented. +- For some COUNT DISTINCT queries, you'll encounter a [QueryNotSupported](reference.md#error_QueryNotSupported) error + that includes `Must not have 'subtotalsSpec'` as one of its causes. This is caused by the planner attempting to use + GROUPING SETs, which are not implemented. - The numeric varieties of the EARLIEST and LATEST aggregators do not work properly. Attempting to use the numeric varieties of these aggregators leads to an error like diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 3550566874b8..8ea9adf61ae6 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -32,9 +32,9 @@ sidebar_label: Reference This topic is a reference guide for the multi-stage query architecture in Apache Druid. For examples of real-world usage, refer to the [Examples](examples.md) page. -### EXTERN +### `EXTERN` -Use the EXTERN function to read external data. +Use the `EXTERN` function to read external data. Function format: @@ -50,7 +50,7 @@ FROM TABLE( ) ``` -EXTERN consists of the following parts: +`EXTERN` consists of the following parts: 1. Any [Druid input source](../ingestion/native-batch-input-source.md) as a JSON-encoded string. 2. Any [Druid input format](../ingestion/data-formats.md) as a JSON-encoded string. @@ -58,12 +58,134 @@ EXTERN consists of the following parts: For more information, see [Read external data with EXTERN](concepts.md#extern). -### INSERT +### `HTTP`, `INLINE` and `LOCALFILES` -Use the INSERT statement to insert data. +While `EXTERN` allows you to specify an external table using JSON, other table functions allow you +to describe the external table using SQL syntax. Each function works for one specific kind of input +source. You provide properties using SQL named arguments. The row signature is given using the +Druid SQL `EXTEND` keyword, with SQL syntax and types. Function format: -Unlike standard SQL, INSERT loads data into the target table according to column name, not positionally. If necessary, -use `AS` in your SELECT column list to assign the correct names. Do not rely on their positions within the SELECT +```sql +SELECT + +FROM TABLE( + http( + userName => 'bob', + password => 'secret', + uris => 'http://foo.com/bar.csv', + format => 'csv' + ) + ) EXTEND (x VARCHAR, y VARCHAR, z BIGINT) +``` + +Note that the `EXTEND` keyword is optional. The following is equally valid (and perhaps +more convenient): + +```sql +SELECT + +FROM TABLE( + http( + userName => 'bob', + password => 'secret', + uris => 'http://foo.com/bar.csv', + format => 'csv' + ) + ) (x VARCHAR, y VARCHAR, z BIGINT) +``` + + +The set of table functions and formats is preliminary in this release. + +#### `HTTP` + +The `HTTP` table function represents the `HttpInputSource` class in Druid which allows you to +read from an HTTP server.
The function accepts the following arguments: + +| Name | Description | JSON equivalent | Required | +| ---- | ----------- | --------------- | -------- | +| `userName` | Basic authentication user name | `httpAuthenticationUsername` | No | +| `password` | Basic authentication password | `httpAuthenticationPassword` | No | +| `passwordEnvVar` | Environment variable that contains the basic authentication password | `httpAuthenticationPassword` | No | +| `uris` | Comma-separated list of URIs to read. | `uris` | Yes | + +#### `INLINE` + +The `INLINE` table function represents the `InlineInputSource` class in Druid which provides +data directly in the table function. The function accepts the following arguments: + +| Name | Description | JSON equivalent | Required | +| ---- | ----------- | --------------- | -------- | +| `data` | Text lines of inline data. Separate lines with a newline. | `data` | Yes | + +#### `LOCALFILES` + +The `LOCALFILES` table function represents the `LocalInputSource` class in Druid which reads +files from the file system of the node running Druid. This is most useful for single-node +installations. The function accepts the following arguments: + +| Name | Description | JSON equivalent | Required | +| ---- | ----------- | --------------- | -------- | +| `baseDir` | Directory to read from. | `baseDir` | No | +| `filter` | Filter pattern to read. Example: `*.csv`. | `filter` | No | +| `files` | Comma-separated list of files to read. | `files` | No | + +You must either provide the `baseDir` or the list of `files`. You can provide both, in which case +the files are assumed relative to the `baseDir`. If you provide a `filter`, you must provide the +`baseDir`. + +Note that, due to [Issue #13359](https://github.com/apache/druid/issues/13359), the functionality +described above is broken. Until that issue is resolved, you must provide one or more absolute +file paths in the `files` property and the other two properties are unavailable. + +#### Table Function Format + +Each of the table functions above requires that you specify a format. + +| Name | Description | JSON equivalent | Required | +| ---- | ----------- | --------------- | -------- | +| `format` | The input format, using the same names as for `EXTERN`. | `inputFormat.type` | Yes | + +#### CSV Format + +Use the `csv` format to read from CSV. This choice selects the Druid `CsvInputFormat` class. + +| Name | Description | JSON equivalent | Required | +| ---- | ----------- | --------------- | -------- | +| `listDelimiter` | The delimiter to use for fields that represent a list of strings. | `listDelimiter` | No | +| `skipRows` | The number of rows to skip at the start of the file. Default is 0. | `skipHeaderRows` | No | + +MSQ does not have the ability to infer schema from a CSV file, so the `findColumnsFromHeader` property +is unavailable. Instead, columns are given using the `EXTEND` syntax described above. + +#### Delimited Text Format + +Use the `tsv` format to read from an arbitrary delimited (CSV-like) file such as tab-delimited, +pipe-delimited, etc. This choice selects the Druid `DelimitedInputFormat` class. + +| Name | Description | JSON equivalent | Required | +| ---- | ----------- | --------------- | -------- | +| `delimiter` | The delimiter which separates fields. | `delimiter` | Yes | +| `listDelimiter` | The delimiter to use for fields that represent a list of strings. | `listDelimiter` | No | +| `skipRows` | The number of rows to skip at the start of the file. Default is 0.
| `skipHeaderRows` | No | + +As noted above, MSQ cannot infer schema using headers. Use `EXTEND` instead. + +#### JSON Format + +Use the `json` format to read from a JSON input source. This choice selects the Druid `JsonInputFormat` class. + +| Name | Description | JSON equivalent | Required | +| ---- | ----------- | --------------- | -------- | +| `keepNulls` | Whether to keep null values. Defaults to `false`. | `keepNullColumns` | No | + + +### `INSERT` + +Use the `INSERT` statement to insert data. + +Unlike standard SQL, `INSERT` loads data into the target table according to column name, not positionally. If necessary, +use `AS` in your `SELECT` column list to assign the correct names. Do not rely on their positions within the SELECT clause. Statement format: @@ -85,15 +207,15 @@ INSERT consists of the following parts: For more information, see [Load data with INSERT](concepts.md#insert). -### REPLACE +### `REPLACE` -You can use the REPLACE function to replace all or some of the data. +You can use the `REPLACE` function to replace all or some of the data. -Unlike standard SQL, REPLACE loads data into the target table according to column name, not positionally. If necessary, -use `AS` in your SELECT column list to assign the correct names. Do not rely on their positions within the SELECT +Unlike standard SQL, `REPLACE` loads data into the target table according to column name, not positionally. If necessary, +use `AS` in your `SELECT` column list to assign the correct names. Do not rely on their positions within the SELECT clause. -#### REPLACE all data +#### `REPLACE` all data Function format to replace all data: @@ -105,7 +227,7 @@ PARTITIONED BY
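As a quick illustration of the name-based column mapping that `INSERT` and `REPLACE` use, a minimal sketch of an ingestion query might look like the following; the target table `wikipedia_rollup`, the source table `wikipedia_source`, and the column names are hypothetical:

```sql
-- Hypothetical example: each value lands in the target column whose name matches
-- the alias produced in the SELECT list, so the AS aliases (not column positions)
-- control the mapping.
REPLACE INTO "wikipedia_rollup" OVERWRITE ALL
SELECT
  TIME_PARSE("timestamp") AS __time,
  "channel" AS channel,
  "page" AS page
FROM "wikipedia_source"
PARTITIONED BY DAY
```

The same aliasing rule applies to `INSERT INTO ... SELECT ... PARTITIONED BY ...` statements.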