113 changes: 77 additions & 36 deletions README.md
@@ -98,7 +98,27 @@ CeresDB is a high-performance, distributed, schema-less, cloud native time-serie
- Java 8 or later is required for compilation


## Init CeresDB client
```java
// CeresDBx options
final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) //
.tenant("test", "sub_test", "test_token") // tenant info
// maximum retry times when write fails
// (only some error codes will be retried, such as the routing table failure)
.writeMaxRetries(1)
// maximum retry times when read fails
// (only some error codes will be retried, such as the routing table failure)
.readMaxRetries(1)
.build();

final CeresDBxClient client = new CeresDBxClient();
if (!client.init(opts)) {
throw new IllegalStateException("Fail to start CeresDBxClient");
}
```
For more configuration options, see [configuration](docs/configuration.md)

## Create table example
CeresDB is a schema-less time-series database, so creating a table schema ahead of data ingestion is not required (CeresDB creates a default schema from the very first data you write into it). Of course, you can also create a schema manually for fine-grained management purposes (e.g. managing indexes).

The following table creation statement (using the SQL API included in the SDK) shows all field types supported by CeresDB:
@@ -126,51 +146,39 @@ SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABL("
);
```
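The hunk above only shows the tail of the statement. For reference, the complete statement, matching the one added to `ReadmeTest.java` later in this change, is:

```java
// Complete CREATE TABLE statement covering every supported field type
// (same statement as in ceresdb-example/src/test/java/com/ceresdb/ReadmeTest.java).
SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABLE(" +
        "ts TIMESTAMP NOT NULL," +
        "c1 STRING TAG NOT NULL," +
        "c2 STRING TAG NOT NULL," +
        "c3 DOUBLE NULL," +
        "c4 STRING NULL," +
        "c5 INT64 NULL," +
        "c6 FLOAT NULL," +
        "c7 INT32 NULL," +
        "c8 INT16 NULL," +
        "c9 INT8 NULL," +
        "c10 BOOLEAN NULL," +
        "c11 UINT64 NULL," +
        "c12 UINT32 NULL," +
        "c13 UINT16 NULL," +
        "c14 UINT8 NULL," +
        "c15 TIMESTAMP NULL," +
        "c16 VARBINARY NULL," +
        "TIMESTAMP KEY(ts)) ENGINE=Analytic");
```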

## Write data example
```java
final long t0 = System.currentTimeMillis();
final long t1 = t0 + 1000;
final long t2 = t1 + 1000;
final Rows data = Series.newBuilder("machine_metric")
        .tag("city", "Singapore")
        .tag("ip", "127.0.0.1")
        .toRowsBuilder()
        // The code below organizes three rows of data (three timestamps) for the `cpu` and `mem` columns,
        // which are sent over the network in a single request. CeresDB encourages this practice, because the
        // SDK can apply an efficient compression algorithm to reduce network traffic, and it is also friendlier
        // to the server side.
        .field(t0, "cpu", FieldValue.withDouble(0.23)) // first row, first column
        .field(t0, "mem", FieldValue.withDouble(0.55)) // first row, second column
        .field(t1, "cpu", FieldValue.withDouble(0.25)) // second row, first column
        .field(t1, "mem", FieldValue.withDouble(0.56)) // second row, second column
        .field(t2, "cpu", FieldValue.withDouble(0.21)) // third row, first column
        .field(t2, "mem", FieldValue.withDouble(0.52)) // third row, second column
        .build();

final CompletableFuture<Result<WriteOk, Err>> wf = client.write(data);
// `future.get` here is only for demonstration; for production code, prefer composing with the asynchronous CompletableFuture API
final Result<WriteOk, Err> wr = wf.get();

Assert.assertTrue(wr.isOk());
Assert.assertEquals(3, wr.getOk().getSuccess());
// The `Result` class follows the Rust convention and provides rich combinators (such as mapXXX and andThen)
// for transforming the result value. Refer to the API docs for detailed usage.
Assert.assertEquals(3, wr.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, wr.getOk().getFailed());
Assert.assertEquals(0, wr.mapOr(-1, WriteOk::getFailed).intValue());
```
See [write](docs/write.md)
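As the comment above notes, blocking on `future.get` is only for demonstration. A minimal non-blocking sketch, using only the `write`, `mapOr`, and `WriteOk::getSuccess` calls shown above plus the standard `CompletableFuture.whenComplete`; the logging is illustrative:

```java
client.write(data).whenComplete((result, err) -> {
    if (err != null) {
        // transport-level failure (the future itself failed)
        err.printStackTrace();
        return;
    }
    // `mapOr` extracts the success count without an explicit isOk() branch
    final int success = result.mapOr(0, WriteOk::getSuccess);
    System.out.println("Points written: " + success);
});
```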

## Query data example
```java
final QueryRequest queryRequest = QueryRequest.newBuilder()
.forMetrics("machine_metric") // table name is optional. If not provided, SQL parser will parse the `ql` to get the table name and do the routing automaticly
.ql("select timestamp, cpu, mem from machine_metric") //
@@ -179,12 +187,45 @@ final CompletableFuture<Result<QueryOk, Err>> qf = client.query(queryRequest);
// `future.get` here is only for demonstration; for production code, prefer composing with the asynchronous CompletableFuture API
final Result<QueryOk, Err> qr = qf.get();

Assert.assertTrue(qr.isOk());

final QueryOk queryOk = qr.getOk();

final List<Record> records = queryOk.mapToRecord().collect(Collectors.toList());
```
See [read](docs/read.md)
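The same non-blocking style works for queries. A sketch that consumes the result via `mapToRecord()` (shown above); everything else is standard JDK stream and future API, and the printed output is illustrative:

```java
client.query(queryRequest).thenAccept(result -> {
    if (!result.isOk()) {
        System.err.println("Query failed");
        return;
    }
    // mapToRecord() exposes the result set as a java.util.stream.Stream<Record>
    result.getOk().mapToRecord()
            .limit(10) // just peek at the first few rows
            .forEach(record -> System.out.println(record));
});
```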

## Stream write/read example
CeresDB supports streaming writes and reads, which is suitable for reading and writing data at large scale.
```java
final StreamWriteBuf<Rows, WriteOk> writeBuf = client.streamWrite("machine_metric");
for (int i = 0; i < 1000; i++) {
    final long t0 = System.currentTimeMillis();
    final long t1 = t0 + 1000;
    final long t2 = t1 + 1000;
    final Rows data = Series.newBuilder("machine_metric").tag("city", "Singapore").tag("ip", "127.0.0.1")
            .toRowsBuilder()
            .field(t0, "cpu", FieldValue.withDouble(0.23))
            .field(t0, "mem", FieldValue.withDouble(0.55))
            .field(t1, "cpu", FieldValue.withDouble(0.25))
            .field(t1, "mem", FieldValue.withDouble(0.56))
            .field(t2, "cpu", FieldValue.withDouble(0.21))
            .field(t2, "mem", FieldValue.withDouble(0.52))
            .build();
    writeBuf.writeAndFlush(data);
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final QueryRequest req = QueryRequest.newBuilder().ql("select * from %s", "machine_metric").build();
final Iterator<Record> it = client.blockingStreamQuery(req, 3, TimeUnit.SECONDS);
```
See [streaming](docs/streaming.md)
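A sketch of draining the blocking iterator returned by `blockingStreamQuery` above; it relies only on the standard `Iterator` contract, and the count/printout is illustrative:

```java
int count = 0;
while (it.hasNext()) {
    final Record record = it.next(); // fetches the next streamed row, blocking as needed
    count++;
    // process `record` here, e.g. hand it off to downstream consumers
}
System.out.println("Streamed " + count + " records");
```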

## Licensing
Under [Apache License 2.0](./LICENSE).

83 changes: 71 additions & 12 deletions README_CN.md
@@ -94,13 +94,41 @@ CeresDBxClient is a high-performance Java client for CeresDB. CeresDB is positioned as
## Requirements
Java 8 or later is required for compilation

## Init CeresDB client
```java
// CeresDBx options
final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) //
.tenant("test", "sub_test", "test_token") // tenant info
// maximum retry times when write fails
// (only some error codes will be retried, such as the routing table failure)
.writeMaxRetries(1)
// maximum retry times when read fails
// (only some error codes will be retried, such as the routing table failure)
.readMaxRetries(1)
.build();

final CeresDBxClient client = new CeresDBxClient();
if (!client.init(opts)) {
throw new IllegalStateException("Fail to start CeresDBxClient");
}
```
For more configuration options, see [configuration](docs/configuration.md)


## Create table example
CeresDB is a schema-less time-series engine, so you can write data immediately without creating a schema first (CeresDB creates a default schema for you based on your first write).
Of course, you can also create a schema yourself to manage the table in a more fine-grained way (e.g. indexes).

The following table creation statement (using the SQL API in the SDK) covers all field types supported by CeresDB:

```java
final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // default gRPC port
        .managementAddress("127.0.0.1", 5440) // note: executing SQL directly requires connecting to CeresDB's HTTP port
        .tenant("test", "sub_test", "test_token") // tenant info
        .writeMaxRetries(1) // maximum retries on write failure (only some error codes are retried, e.g. a stale routing table)
        .readMaxRetries(1) // maximum retries on read failure (only some error codes are retried, e.g. a stale routing table)
        .build();

SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABL(" +
"ts TIMESTAMP NOT NULL," +
"c1 STRING TAG NOT NULL," +
@@ -122,20 +150,11 @@ SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABL("
"TIMESTAMP KEY(ts)) ENGINE=Analytic"
);
```
See [table](docs/table.md) for details.


## Write data example
```java
final long t0 = System.currentTimeMillis();
final long t1 = t0 + 1000;
@@ -164,6 +183,11 @@ Assert.assertEquals(3, wr.mapOr(0, WriteOk::getSuccess).intValue());
Assert.assertEquals(0, wr.getOk().getFailed());
Assert.assertEquals(0, wr.mapOr(-1, WriteOk::getFailed).intValue());

```
See [write](docs/write.md) for details.

## Query data example
```java
final QueryRequest queryRequest = QueryRequest.newBuilder()
.forMetrics("machine_metric") // 表名可选填,不填的话 SQL Parser 会自动解析 ql 涉及到的表名并完成自动路由
.ql("select timestamp, cpu, mem from machine_metric") //
@@ -177,7 +201,42 @@ Assert.assertTrue(qr.isOk());
final QueryOk queryOk = qr.getOk();

final List<Record> records = queryOk.mapToRecord().collect(Collectors.toList());
final Stream<User> users = queryOk.map(bytes -> parseUser(bytes));

```
See [read](docs/read.md) for details.
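The `parseUser` call above is only illustrative. A sketch of the decoder it assumes — the `User` type, the `byte[]` row payload, and the UTF-8 encoding are all assumptions, not part of the SDK:

```java
// Hypothetical POJO and decoder, used only to illustrate QueryOk.map(...)
static final class User {
    final String raw;

    User(final String raw) {
        this.raw = raw;
    }
}

// Assumption: each row arrives as a byte[] payload; real decoding depends on your schema.
static User parseUser(final byte[] bytes) {
    return new User(new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
}
```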

## Stream write/read example
CeresDB supports streaming writes and reads, which is suitable for reading and writing data at large scale.
```java
final StreamWriteBuf<Rows, WriteOk> writeBuf = client.streamWrite("machine_metric");
for (int i = 0; i < 1000; i++) {
    final long t0 = System.currentTimeMillis();
    final long t1 = t0 + 1000;
    final long t2 = t1 + 1000;
    final Rows data = Series.newBuilder("machine_metric").tag("city", "Singapore").tag("ip", "127.0.0.1")
            .toRowsBuilder()
            .field(t0, "cpu", FieldValue.withDouble(0.23))
            .field(t0, "mem", FieldValue.withDouble(0.55))
            .field(t1, "cpu", FieldValue.withDouble(0.25))
            .field(t1, "mem", FieldValue.withDouble(0.56))
            .field(t2, "cpu", FieldValue.withDouble(0.21))
            .field(t2, "mem", FieldValue.withDouble(0.52))
            .build();
    writeBuf.writeAndFlush(data);
}
final CompletableFuture<WriteOk> writeOk = writeBuf.completed();
Assert.assertEquals(1000, writeOk.join().getSuccess());

final QueryRequest req = QueryRequest.newBuilder().ql("select * from %s", "machine_metric").build();
final Iterator<Record> it = client.blockingStreamQuery(req, 3, TimeUnit.SECONDS);
```
See [streaming](docs/streaming.md) for details.


## Licensing
Licensed under [Apache License 2.0](./LICENSE).
12 changes: 11 additions & 1 deletion ceresdb-example/src/test/java/com/ceresdb/ReadmeTest.java
@@ -35,7 +35,8 @@ public class ReadmeTest {
@Ignore
@Test
public void readmeTest() throws ExecutionException, InterruptedException {
final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // CeresDB default grpc port 8831
.managementAddress("127.0.0.1", 5440) // CeresDB default http port 5440
.tenant("test", "sub_test", "test_token") // tenant info
// maximum retry times when write fails
// (only some error codes will be retried, such as the routing table failure)
@@ -49,6 +50,15 @@ public void readmeTest() throws ExecutionException, InterruptedException {
throw new IllegalStateException("Fail to start CeresDBxClient");
}

// Create the table manually; creating a table schema ahead of data ingestion is not required
String createTableSql = "CREATE TABLE MY_FIRST_TABLE(" + "ts TIMESTAMP NOT NULL," + "c1 STRING TAG NOT NULL,"
+ "c2 STRING TAG NOT NULL," + "c3 DOUBLE NULL," + "c4 STRING NULL," + "c5 INT64 NULL,"
+ "c6 FLOAT NULL," + "c7 INT32 NULL," + "c8 INT16 NULL," + "c9 INT8 NULL,"
+ "c10 BOOLEAN NULL," + "c11 UINT64 NULL," + "c12 UINT32 NULL," + "c13 UINT16 NULL,"
+ "c14 UINT8 NULL," + "c15 TIMESTAMP NULL," + "c16 VARBINARY NULL,"
+ "TIMESTAMP KEY(ts)" + ") ENGINE=Analytic";
SqlResult result = client.management().executeSql(createTableSql);

final long t0 = System.currentTimeMillis();
final long t1 = t0 + 1000;
final long t2 = t1 + 1000;
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -48,7 +48,7 @@

| name | description |
| --- | --- |
| managementAddress | Address of the management service; usually the same IP/host as the cluster address, but a different port |
| tenant | Tenant info |
| checkSql | Whether the client checks SQL validity before sending; defaults to true |
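A minimal sketch of setting these options on the builder — `managementAddress` and `tenant` appear in the README examples; the `checkSql` setter name is an assumption based on the option name:

```java
final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831)
        .managementAddress("127.0.0.1", 5440)     // management service: same host as the cluster, different port
        .tenant("test", "sub_test", "test_token") // tenant info
        // .checkSql(true)                        // assumed setter for the checkSql option; enabled by default
        .build();
```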

2 changes: 1 addition & 1 deletion docs/streaming.md
@@ -77,7 +77,7 @@ final CompletableFuture<WriteOk> ret = writer
.completed(); // calling completed ends this `stream`; the server returns the overall write result
```

### Streaming query API

```java
/**