Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,22 @@ public void write(final String database, final String retentionPolicy,
*/
public void query(final Query query, final Consumer<QueryResult> onSuccess, final Consumer<Throwable> onFailure);

/**
* Execute a query against a database with ability to specify the time unit.
*
* One of the consumers will be executed.
*
* @param query
* the query to execute.
* @param timeUnit the time unit of the results.
* @param onSuccess
* the consumer to invoke when result is received
* @param onFailure
* the consumer to invoke when error is thrown
*/
public void query(final Query query, TimeUnit timeUnit,
final Consumer<QueryResult> onSuccess, final Consumer<Throwable> onFailure);

/**
* Execute a streaming query against a database.
*
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,27 @@ public void onFailure(final Call<QueryResult> call, final Throwable throwable) {
});
}

/**
* {@inheritDoc}
*/
@Override
public void query(final Query query, final TimeUnit timeUnit,
final Consumer<QueryResult> onSuccess, final Consumer<Throwable> onFailure) {

final Call<QueryResult> call = callQuery(query, timeUnit);
call.enqueue(new Callback<QueryResult>() {
@Override
public void onResponse(final Call<QueryResult> call, final Response<QueryResult> response) {
onSuccess.accept(response.body());
}

@Override
public void onFailure(final Call<QueryResult> call, final Throwable throwable) {
onFailure.accept(throwable);
}
});
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -560,6 +581,20 @@ private Call<QueryResult> callQuery(final Query query) {
return call;
}

/**
* Calls the influxDBService for the query.
*/
private Call<QueryResult> callQuery(final Query query, final TimeUnit timeUnit) {
Call<QueryResult> call;
if (query.requiresPost()) {
call = this.influxDBService.postQuery(this.username,
this.password, query.getDatabase(), query.getCommandWithUrlEncoded());
} else {
call = this.influxDBService.query(this.username, this.password, query.getDatabase(),
TimeUtil.toTimePrecision(timeUnit), query.getCommandWithUrlEncoded());
}
return call;
}

private <T> T execute(final Call<T> call) {
try {
Expand Down
100 changes: 100 additions & 0 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.junit.Assert.*;

/**
* Test the InfluxDB API.
*
Expand Down Expand Up @@ -108,6 +110,104 @@ public void accept(QueryResult queryResult) {
result.result();
}

/**
* Tests for callback query variant with TimeUnit.
*/
@Test
public void testCallbackQueryWithTimeUnit() throws Throwable {

// first write some data
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy(rp).build();
Point point1 = Point
.measurement("cpu")
.addField("idle", 90L)
.time(1_000_000, TimeUnit.MICROSECONDS)
.build();
batchPoints.point(point1);
this.influxDB.write(batchPoints);

final CountDownLatch countDownLatch = new CountDownLatch(1);
Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName);

final Consumer<QueryResult> onSuccess = queryResult -> {
final QueryResult.Series series = queryResult.getResults().get(0).getSeries().get(0);
final Object time = series.getValues().get(0).get(0);
// time should be a number 1000000
assertFalse(time instanceof String);
assertEquals(time, 1000000.0);
influxDB.deleteDatabase(dbName);
countDownLatch.countDown();
};

this.influxDB.query(query, TimeUnit.MICROSECONDS, onSuccess, throwable -> {
influxDB.deleteDatabase(dbName);
fail("The query has failed");
});
Assertions.assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
}

/**
* Tests for query variant with TimeUnit.
*/
@Test
public void testQueryWithTimeUnit() throws Throwable {

// first write some data
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy(rp).build();
Point point1 = Point
.measurement("cpu")
.addField("idle", 90L)
.time(1_000_000, TimeUnit.MICROSECONDS)
.build();
batchPoints.point(point1);
this.influxDB.write(batchPoints);

Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName);
final QueryResult result = this.influxDB.query(query, TimeUnit.MICROSECONDS);
final QueryResult.Series series = result.getResults().get(0).getSeries().get(0);
final Object time = series.getValues().get(0).get(0);
// time should be a number 1000000
assertFalse(time instanceof String);
assertEquals(time, 1000000.0);
influxDB.deleteDatabase(dbName);
}

/**
* Tests for query variant without TimeUnit. This shoudl return timestamp as a string in RFC3339
*/
@Test
public void testQueryWithoutTimeUnit() throws Throwable {

// first write some data
String dbName = "write_unittest_" + System.currentTimeMillis();
this.influxDB.createDatabase(dbName);
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy(rp).build();
Point point1 = Point
.measurement("cpu")
.addField("idle", 90L)
.time(1_000_000, TimeUnit.MICROSECONDS)
.build();
batchPoints.point(point1);
this.influxDB.write(batchPoints);

Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName);
final QueryResult result = this.influxDB.query(query);
final QueryResult.Series series = result.getResults().get(0).getSeries().get(0);
final Object time = series.getValues().get(0).get(0);
// time should be a string
assertTrue(time instanceof String);
// 1_000_000 microseconds is 1 second, so the timestamp = epoch+1sec
assertEquals(time, "1970-01-01T00:00:01Z");
influxDB.deleteDatabase(dbName);
}

/**
* Test that describe Databases works.
*/
Expand Down