From a91c9aba52195eaed0f5c4ea49d4d51305a464eb Mon Sep 17 00:00:00 2001 From: Vadim Kurland Date: Sat, 10 Feb 2018 17:50:57 -0800 Subject: [PATCH 1/2] implement query() call to make asynchronous InfluxDb query with TimeUnit --- src/main/java/org/influxdb/InfluxDB.java | 16 +++++++++ .../java/org/influxdb/impl/InfluxDBImpl.java | 35 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 7bfd17d76..ccd72d826 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -356,6 +356,22 @@ public void write(final String database, final String retentionPolicy, */ public void query(final Query query, final Consumer onSuccess, final Consumer onFailure); + /** + * Execute a query against a database with ability to specify the time unit. + * + * One of the consumers will be executed. + * + * @param query + * the query to execute. + * @param timeUnit the time unit of the results. + * @param onSuccess + * the consumer to invoke when result is received + * @param onFailure + * the consumer to invoke when error is thrown + */ + public void query(final Query query, TimeUnit timeUnit, + final Consumer onSuccess, final Consumer onFailure); + /** * Execute a streaming query against a database. * diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 0c12bcb9b..6596b3273 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -434,6 +434,27 @@ public void onFailure(final Call call, final Throwable throwable) { }); } + /** + * {@inheritDoc} + */ + @Override + public void query(final Query query, final TimeUnit timeUnit, + final Consumer onSuccess, final Consumer onFailure) { + + final Call call = callQuery(query, timeUnit); + call.enqueue(new Callback() { + @Override + public void onResponse(final Call call, final Response response) { + onSuccess.accept(response.body()); + } + + @Override + public void onFailure(final Call call, final Throwable throwable) { + onFailure.accept(throwable); + } + }); + } + /** * {@inheritDoc} */ @@ -560,6 +581,20 @@ private Call callQuery(final Query query) { return call; } + /** + * Calls the influxDBService for the query. + */ + private Call callQuery(final Query query, final TimeUnit timeUnit) { + Call call; + if (query.requiresPost()) { + call = this.influxDBService.postQuery(this.username, + this.password, query.getDatabase(), query.getCommandWithUrlEncoded()); + } else { + call = this.influxDBService.query(this.username, this.password, query.getDatabase(), + TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded()); + } + return call; + } private T execute(final Call call) { try { From e3b01a9c1724c1f4d95644eab23cfeb37511015b Mon Sep 17 00:00:00 2001 From: Vadim Kurland Date: Sun, 11 Feb 2018 10:13:26 -0800 Subject: [PATCH 2/2] add unit tests for blocking and non-blocking variants of the query() call with and without TimeInit parameter --- src/test/java/org/influxdb/InfluxDBTest.java | 100 +++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index c9b1eee21..2964646b9 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import static org.junit.Assert.*; + /** * Test the InfluxDB API. * @@ -108,6 +110,104 @@ public void accept(QueryResult queryResult) { result.result(); } + /** + * Tests for callback query variant with TimeUnit. + */ + @Test + public void testCallbackQueryWithTimeUnit() throws Throwable { + + // first write some data + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.createDatabase(dbName); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy(rp).build(); + Point point1 = Point + .measurement("cpu") + .addField("idle", 90L) + .time(1_000_000, TimeUnit.MICROSECONDS) + .build(); + batchPoints.point(point1); + this.influxDB.write(batchPoints); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + + final Consumer onSuccess = queryResult -> { + final QueryResult.Series series = queryResult.getResults().get(0).getSeries().get(0); + final Object time = series.getValues().get(0).get(0); + // time should be a number 1000000 + assertFalse(time instanceof String); + assertEquals(time, 1000000.0); + influxDB.deleteDatabase(dbName); + countDownLatch.countDown(); + }; + + this.influxDB.query(query, TimeUnit.MICROSECONDS, onSuccess, throwable -> { + influxDB.deleteDatabase(dbName); + fail("The query has failed"); + }); + Assertions.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + } + + /** + * Tests for query variant with TimeUnit. + */ + @Test + public void testQueryWithTimeUnit() throws Throwable { + + // first write some data + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.createDatabase(dbName); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy(rp).build(); + Point point1 = Point + .measurement("cpu") + .addField("idle", 90L) + .time(1_000_000, TimeUnit.MICROSECONDS) + .build(); + batchPoints.point(point1); + this.influxDB.write(batchPoints); + + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + final QueryResult result = this.influxDB.query(query, TimeUnit.MICROSECONDS); + final QueryResult.Series series = result.getResults().get(0).getSeries().get(0); + final Object time = series.getValues().get(0).get(0); + // time should be a number 1000000 + assertFalse(time instanceof String); + assertEquals(time, 1000000.0); + influxDB.deleteDatabase(dbName); + } + + /** + * Tests for query variant without TimeUnit. This shoudl return timestamp as a string in RFC3339 + */ + @Test + public void testQueryWithoutTimeUnit() throws Throwable { + + // first write some data + String dbName = "write_unittest_" + System.currentTimeMillis(); + this.influxDB.createDatabase(dbName); + String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version()); + BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy(rp).build(); + Point point1 = Point + .measurement("cpu") + .addField("idle", 90L) + .time(1_000_000, TimeUnit.MICROSECONDS) + .build(); + batchPoints.point(point1); + this.influxDB.write(batchPoints); + + Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); + final QueryResult result = this.influxDB.query(query); + final QueryResult.Series series = result.getResults().get(0).getSeries().get(0); + final Object time = series.getValues().get(0).get(0); + // time should be a string + assertTrue(time instanceof String); + // 1_000_000 microseconds is 1 second, so the timestamp = epoch+1sec + assertEquals(time, "1970-01-01T00:00:01Z"); + influxDB.deleteDatabase(dbName); + } + /** * Test that describe Databases works. */