diff --git a/CHANGELOG.md b/CHANGELOG.md index e648ed0452d..b379b0cd744 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ ### Bug Fixes -1, [#684](https://github.com/influxdata/influxdb-client-java/issues/684): Fix checking for CSV end of table marker when parsing CSV stream to InfluxQLQueryResult, needed for example when parsing the results of a query like "SHOW SERIES". +1. [#684](https://github.com/influxdata/influxdb-client-java/issues/684): Fix checking for CSV end of table marker when parsing CSV stream to InfluxQLQueryResult, needed for example when parsing the results of a query like "SHOW SERIES". +2. [#662](https://github.com/influxdata/influxdb-client-java/issues/662): Adds to FluxDsl support for the `|> elapsed(unit)` function. ### Dependencies diff --git a/flux-dsl/src/main/java/com/influxdb/query/dsl/Flux.java b/flux-dsl/src/main/java/com/influxdb/query/dsl/Flux.java index 18339c9365e..5b1678014dc 100644 --- a/flux-dsl/src/main/java/com/influxdb/query/dsl/Flux.java +++ b/flux-dsl/src/main/java/com/influxdb/query/dsl/Flux.java @@ -46,6 +46,7 @@ import com.influxdb.query.dsl.functions.DistinctFlux; import com.influxdb.query.dsl.functions.DropFlux; import com.influxdb.query.dsl.functions.DuplicateFlux; +import com.influxdb.query.dsl.functions.ElapsedFlux; import com.influxdb.query.dsl.functions.ExpressionFlux; import com.influxdb.query.dsl.functions.FillFlux; import com.influxdb.query.dsl.functions.FilterFlux; @@ -89,6 +90,7 @@ import com.influxdb.query.dsl.functions.WindowFlux; import com.influxdb.query.dsl.functions.YieldFlux; import com.influxdb.query.dsl.functions.properties.FunctionsParameters; +import com.influxdb.query.dsl.functions.properties.TimeInterval; import com.influxdb.query.dsl.functions.restriction.Restrictions; import com.influxdb.query.dsl.utils.ImportUtils; import com.influxdb.utils.Arguments; @@ -111,6 +113,8 @@ *
  • {@link DistinctFlux}
  • *
  • {@link DropFlux}
  • *
  • {@link DuplicateFlux}
  • + *
  • {@link ElapsedFlux}
  • + *
  • {@link FillFlux}
  • *
  • {@link FilterFlux}
  • *
  • {@link FirstFlux}
  • *
  • {@link GroupFlux}
  • @@ -829,6 +833,56 @@ public final DuplicateFlux duplicate(@Nonnull final String column, @Nonnull fina return new DuplicateFlux(this).withColumn(column).withAs(as); } + /** + * Elapsed will add a column "elapsed" which measures the time elapsed since the last reading in the series. + *

    The unit parameter is defined by {@link ElapsedFlux#withDuration}. + * + * @param unit the {@link TimeInterval} used for measuring elapsed time. + * @return an {@link ElapsedFlux} object. + */ + @Nonnull + public final ElapsedFlux elapsed(@Nonnull final TimeInterval unit) { + return new ElapsedFlux(this).withDuration(unit); + } + + /** + * Elapsed will add a column "elapsed" which measures the time elapsed since the last reading in the series. + *

    The unit parameter is defined by {@link ElapsedFlux#withDuration}. + * + * @param count the number of ChronoUnits used for measuring elapsed time. + * @param unit {@link java.time.temporal.ChronoUnit} + * @return an {@link ElapsedFlux} object. + */ + @Nonnull + public final ElapsedFlux elapsed(@Nonnull final int count, @Nonnull final ChronoUnit unit) { + return new ElapsedFlux(this).withDuration(new TimeInterval((long) count, unit)); + } + + /** + * Elapsed will add a column "elapsed" which measures the time elapsed since the last reading in the series. + *

    In this version the default count is 1. So the interval will be measured only in the provided ChronoUnit. + *

    Internally, the unit parameter is defined by {@link ElapsedFlux#withDuration}. + * + * @param unit the {@link java.time.temporal.ChronoUnit} used for measuring elapsed time. + * @return an {@link ElapsedFlux} object. + */ + @Nonnull + public final ElapsedFlux elapsed(@Nonnull final ChronoUnit unit) { + return new ElapsedFlux(this).withDuration(new TimeInterval(1L, unit)); + } + + /** + * Elapsed will add a column "elapsed" which measures the time elapsed since the last reading in the series + * (this method defaults to units of 1 ms). + *

    This version defaults to single millisecond time units. + * + * @return an {@link ElapsedFlux} object. + */ + @Nonnull + public final ElapsedFlux elapsed() { + return new ElapsedFlux(this).withDuration(new TimeInterval(1L, ChronoUnit.MILLIS)); + } + /** * Replaces all null values in input tables with a non-null value. * diff --git a/flux-dsl/src/main/java/com/influxdb/query/dsl/functions/ElapsedFlux.java b/flux-dsl/src/main/java/com/influxdb/query/dsl/functions/ElapsedFlux.java new file mode 100644 index 00000000000..2395c960b4c --- /dev/null +++ b/flux-dsl/src/main/java/com/influxdb/query/dsl/functions/ElapsedFlux.java @@ -0,0 +1,66 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.query.dsl.functions; + +import javax.annotation.Nonnull; + +import com.influxdb.query.dsl.Flux; +import com.influxdb.query.dsl.functions.properties.TimeInterval; +import com.influxdb.utils.Arguments; + +/** + * Add an extra "elapsed" column to the result showing the time elapsed since the previous record in the series. + * + *

    + * Example + *

    + *    Flux flux = Flux.from("my-bucket")
    + *        .range(Instant.now().minus(15, ChronoUnit.MINUTES), Instant.now())
    + *        .filter(Restrictions.measurement().equal("wumpus"))
    + *        .elapsed(new TimeInterval(100L, ChronoUnit.NANOS));
    + *   
    + * + */ +public class ElapsedFlux extends AbstractParametrizedFlux { + + public ElapsedFlux(@Nonnull final Flux source) { + super(source); + } + + @Nonnull + @Override + protected String operatorName() { + return "elapsed"; + } + + /** + * + * @param duration - TimeInterval to be used for units when reporting elapsed period. + * @return this + */ + public ElapsedFlux withDuration(final TimeInterval duration) { + Arguments.checkNotNull(duration, "Duration is required"); + + this.withPropertyValue("unit", duration); + return this; + } +} diff --git a/flux-dsl/src/test/java/com/influxdb/query/dsl/functions/ElapsedFluxTest.java b/flux-dsl/src/test/java/com/influxdb/query/dsl/functions/ElapsedFluxTest.java new file mode 100644 index 00000000000..2e78afb01ad --- /dev/null +++ b/flux-dsl/src/test/java/com/influxdb/query/dsl/functions/ElapsedFluxTest.java @@ -0,0 +1,134 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.query.dsl.functions; + +import com.influxdb.query.dsl.Flux; +import com.influxdb.query.dsl.functions.properties.TimeInterval; +import com.influxdb.query.dsl.functions.restriction.Restrictions; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.*; + +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.stream.*; + +import static java.util.Map.entry; + +public class ElapsedFluxTest { + + @Test + void elapsedBasic(){ + Flux flux = Flux.from("telegraf") + .filter(Restrictions.measurement().equal("cpu")) + .range(-15L, ChronoUnit.MINUTES) + .elapsed(new TimeInterval(1000L, ChronoUnit.NANOS)); + + String expected = "from(bucket:\"telegraf\")\n" + + "\t|> filter(fn: (r) => r[\"_measurement\"] == \"cpu\")\n" + + "\t|> range(start:-15m)\n" + + "\t|> elapsed(unit:1000ns)"; + + Assertions.assertThat(flux.toString()).isEqualTo(expected); + } + + @Test + void elapsedIntChrono(){ + Flux flux = Flux.from("telegraf") + .filter(Restrictions.measurement().equal("mem")) + .range(-5L, ChronoUnit.MINUTES) + .elapsed(10, ChronoUnit.MICROS); + + String expected = "from(bucket:\"telegraf\")\n" + + "\t|> filter(fn: (r) => r[\"_measurement\"] == \"mem\")\n" + + "\t|> range(start:-5m)\n" + + "\t|> elapsed(unit:10us)"; + + Assertions.assertThat(flux.toString()).isEqualTo(expected); + } + + @Test + void elapsedChrono(){ + Flux flux = Flux.from("telegraf") + .filter(Restrictions.measurement().equal("netio")) + .range(-3L, ChronoUnit.HOURS) + .elapsed(ChronoUnit.MINUTES); + + String expected = "from(bucket:\"telegraf\")\n" + + "\t|> filter(fn: (r) => r[\"_measurement\"] == \"netio\")\n" + + "\t|> range(start:-3h)\n" + + "\t|> elapsed(unit:1m)"; + + Assertions.assertThat(flux.toString()).isEqualTo(expected); + } + + @Test + void elapsedDefault(){ + Flux flux = Flux.from("telegraf") + .filter(Restrictions.measurement().equal("disk")) + .range(-30L, ChronoUnit.MINUTES) + .elapsed(); + + String expected = "from(bucket:\"telegraf\")\n" + + "\t|> filter(fn: (r) => r[\"_measurement\"] == \"disk\")\n" + + "\t|> range(start:-30m)\n" + + "\t|> elapsed(unit:1ms)"; + + Assertions.assertThat(flux.toString()).isEqualTo(expected); + } + + private static Map chronoVals = Map.ofEntries( + entry(ChronoUnit.NANOS, "1ns"), + entry(ChronoUnit.MICROS, "1us"), + entry(ChronoUnit.MILLIS, "1ms"), + entry(ChronoUnit.SECONDS, "1s"), + entry(ChronoUnit.MINUTES, "1m"), + entry(ChronoUnit.HOURS, "1h"), + entry(ChronoUnit.HALF_DAYS, "12h"), + entry(ChronoUnit.DAYS, "1d"), + entry(ChronoUnit.WEEKS, "1w"), + entry(ChronoUnit.MONTHS, "1mo"), + entry(ChronoUnit.YEARS, "1y"), + entry(ChronoUnit.DECADES, "10y"), + entry(ChronoUnit.CENTURIES, "100y"), + entry(ChronoUnit.MILLENNIA, "1000y"), + entry(ChronoUnit.ERAS, "1000000000y") + ); + + @Test + void chronoUnitsSupported(){ + for(ChronoUnit cu : ChronoUnit.values()){ + if(cu.equals(ChronoUnit.FOREVER)){ + Flux flux = Flux.from("telegraf") + .elapsed(cu); + Assertions.assertThatThrownBy(flux::toString) + .isInstanceOf(IllegalArgumentException.class); + }else { + Flux flux = Flux.from("telegraf") + .elapsed(cu); + + Assertions.assertThat(String.format("from(bucket:\"telegraf\")\n" + + "\t|> elapsed(unit:%s)", chronoVals.get(cu))).isEqualTo(flux.toString()); + } + } + } +}