diff --git a/README.md b/README.md index be243e6..488c2bf 100644 --- a/README.md +++ b/README.md @@ -98,7 +98,27 @@ CeresDB is a high-performance, distributed, schema-less, cloud native time-serie - Java 8 or later is required for compilation -## Create table +## Init CeresDB client +```java +// CeresDBx options +final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // + .tenant("test", "sub_test", "test_token") // tenant info + // maximum retry times when write fails + // (only some error codes will be retried, such as the routing table failure) + .writeMaxRetries(1) + // maximum retry times when read fails + // (only some error codes will be retried, such as the routing table failure) + .readMaxRetries(1) + .build(); + +final CeresDBxClient client = new CeresDBxClient(); + if (!client.init(this.opts)) { + throw new IllegalStateException("Fail to start CeresDBxClient"); + } +``` +For more configuration options, see [configuration](docs/configuration.md) + +## Create table example CeresDB is a Schema-less time-series database, so creating table schema ahead of data ingestion is not required (CeresDB will create a default schema according to the very first data you write into it). Of course, you can also manually create a schema for fine grained management purposes (eg. managing index). The following table creation statement(using the SQL API included in SDK )shows all field types supported by CeresDB: @@ -126,51 +146,39 @@ SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABL(" ); ``` -## Data ingestion example +## Write data example ```java -// CeresDBx options -final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // - .tenant("test", "sub_test", "test_token") // tenant info - // maximum retry times when write fails - // (only some error codes will be retried, such as the routing table failure) - .writeMaxRetries(1) - // maximum retry times when read fails - // (only some error codes will be retried, such as the routing table failure) - .readMaxRetries(1) - .build(); - -final CeresDBxClient client = new CeresDBxClient(); -if (!client.init(this.opts)) { - throw new IllegalStateException("Fail to start CeresDBxClient"); -} - final long t0 = System.currentTimeMillis(); final long t1 = t0 + 1000; final long t2 = t1 + 1000; final Rows data = Series.newBuilder("machine_metric") - .tag("city", "Singapore") - .tag("ip", "127.0.0.1") - .toRowsBuilder() - // codes below organizes 3 lines data (3 timestamps) for the `cpu` and `mem` column, this will just transport once through network. CeresDB encourage practices like this, because the SDK could use efficient compression algorithm to reduce network traffic and also be friendly to the sever side. - .field(t0, "cpu", FieldValue.withDouble(0.23)) // first row, first column - .field(t0, "mem", FieldValue.withDouble(0.55)) // first row, second column - .field(t1, "cpu", FieldValue.withDouble(0.25)) // second row, first column - .field(t1, "mem", FieldValue.withDouble(0.56)) // second row, second column - .field(t2, "cpu", FieldValue.withDouble(0.21)) // third row, first column - .field(t2, "mem", FieldValue.withDouble(0.52)) // third row, second column - .build(); + .tag("city", "Singapore") + .tag("ip", "127.0.0.1") + .toRowsBuilder() + // codes below organizes 3 lines data (3 timestamps) for the `cpu` and `mem` column, this will just transport once through network. CeresDB encourage practices like this, because the SDK could use efficient compression algorithm to reduce network traffic and also be friendly to the sever side. + .field(t0, "cpu", FieldValue.withDouble(0.23)) // first row, first column + .field(t0, "mem", FieldValue.withDouble(0.55)) // first row, second column + .field(t1, "cpu", FieldValue.withDouble(0.25)) // second row, first column + .field(t1, "mem", FieldValue.withDouble(0.56)) // second row, second column + .field(t2, "cpu", FieldValue.withDouble(0.21)) // third row, first column + .field(t2, "mem", FieldValue.withDouble(0.52)) // third row, second column + .build(); final CompletableFuture> wf = client.write(data); // here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API final Result wr = wf.get(); -Assert.assertTrue(wr.isOk()); -Assert.assertEquals(3, wr.getOk().getSuccess()); -// `Result` class referenced the Rust language practice, provides rich functions (such as mapXXX, andThen) transforming the result value to improve programming efficiency. You can refer to the API docs for detail usage. -Assert.assertEquals(3, wr.mapOr(0, WriteOk::getSuccess).intValue()); -Assert.assertEquals(0, wr.getOk().getFailed()); -Assert.assertEquals(0, wr.mapOr(-1, WriteOk::getFailed).intValue()); + Assert.assertTrue(wr.isOk()); + Assert.assertEquals(3, wr.getOk().getSuccess()); + // `Result` class referenced the Rust language practice, provides rich functions (such as mapXXX, andThen) transforming the result value to improve programming efficiency. You can refer to the API docs for detail usage. + Assert.assertEquals(3, wr.mapOr(0, WriteOk::getSuccess).intValue()); + Assert.assertEquals(0, wr.getOk().getFailed()); + Assert.assertEquals(0, wr.mapOr(-1, WriteOk::getFailed).intValue()); +``` +See [write](docs/write.md) +## Query data example +```java final QueryRequest queryRequest = QueryRequest.newBuilder() .forMetrics("machine_metric") // table name is optional. If not provided, SQL parser will parse the `ql` to get the table name and do the routing automaticly .ql("select timestamp, cpu, mem from machine_metric") // @@ -179,12 +187,45 @@ final CompletableFuture> qf = client.query(queryRequest); // here the `future.get` is just for demonstration, a better async programming practice would be using the CompletableFuture API final Result qr = qf.get(); -Assert.assertTrue(qr.isOk()); + Assert.assertTrue(qr.isOk()); final QueryOk queryOk = qr.getOk(); final List records = queryOk.mapToRecord().collect(Collectors.toList()) ``` +See [read](docs/read.md) + +## stream write/read Example +CeresDB support streaming writing and reading,suitable for large-scale data reading and writing。 +```java +final Calendar time = Calendar.getInstance(); +final StreamWriteBuf writeBuf = client.streamWrite("machine_metric"); + for (int i = 0; i < 1000; i++) { + time.add(Calendar.MILLISECOND, 1); + Collection rows = new ArrayList<>(); + final long t0 = System.currentTimeMillis(); + final long t1 = t0 + 1000; + final long t2 = t1 + 1000; + final Rows data = Series.newBuilder("machine_metric").tag("city", "Singapore").tag("ip", "127.0.0.1") + .toRowsBuilder() + .field(t0, "cpu", FieldValue.withDouble(0.23)) + .field(t0, "mem", FieldValue.withDouble(0.55)) + .field(t1, "cpu", FieldValue.withDouble(0.25)) + .field(t1, "mem", FieldValue.withDouble(0.56)) + .field(t2, "cpu", FieldValue.withDouble(0.21)) + .field(t2, "mem", FieldValue.withDouble(0.52)) + .build(); + rows.add(data); + writeBuf.writeAndFlush(data); + } +final CompletableFuture writeOk = writeBuf.completed(); + Assert.assertEquals(1000, writeOk.join().getSuccess()); + +final QueryRequest req = QueryRequest.newBuilder().ql("select * from %s", "machine_metric").build(); +final Iterator it = client.blockingStreamQuery(req, 3, TimeUnit.SECONDS); +``` +See [streaming](docs/streaming.md) + ## Licensing Under [Apache License 2.0](./LICENSE). diff --git a/README_CN.md b/README_CN.md index 9579474..e1ebf49 100644 --- a/README_CN.md +++ b/README_CN.md @@ -94,13 +94,41 @@ CeresDBxClient 是 CeresDB 的高性能 Java 版客户端。CeresDB 是定位为 ## 需要 编译需要 Java 8 及以上 -## 建表 +## 初始化 CeresDB Client +```java +// CeresDBx options +final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // + .tenant("test", "sub_test", "test_token") // tenant info + // maximum retry times when write fails + // (only some error codes will be retried, such as the routing table failure) + .writeMaxRetries(1) + // maximum retry times when read fails + // (only some error codes will be retried, such as the routing table failure) + .readMaxRetries(1) + .build(); + +final CeresDBxClient client = new CeresDBxClient(); + if (!client.init(this.opts)) { + throw new IllegalStateException("Fail to start CeresDBxClient"); + } +``` +配置详情见 [configuration](docs/configuration.md) + + +## 建表 Example CeresDB 是一个 Schema-less 的时序数据引擎,你可以不必创建 schema 就立刻写入数据(CeresDB 会根据你的第一次写入帮你创建一个默认的 schema)。 当然你也可以自行创建一个 schema 来更精细化的管理的表(比如索引等) 下面的建表语句(使用 SDK 的 SQL API)包含了 CeresDB 支持的所有字段类型: ```java +final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // 默认 gprc 端口号 + .managementAddress("127.0.0.1", 5440) // 注意,直接使用 sql 需要连接 CeresDB 的 http 端口 + .tenant("test", "sub_test", "test_token") // 租户信息 + .writeMaxRetries(1) // 写入失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效) + .readMaxRetries(1) // 查询失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效) + .build(); + SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABL(" + "ts TIMESTAMP NOT NULL," + "c1 STRING TAG NOT NULL," + @@ -122,20 +150,11 @@ SqlResult result = client.management().executeSql("CREATE TABLE MY_FIRST_TABL(" "TIMESTAMP KEY(ts)) ENGINE=Analytic" ); ``` +详情见 [table](docs/table.md) + ## 写入 Example ```java -// CeresDBx options -final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // - .tenant("test", "sub_test", "test_token") // 租户信息 - .writeMaxRetries(1) // 写入失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效) - .readMaxRetries(1) // 查询失败重试次数上限(只有部分错误 code 才会重试,比如路由表失效) - .build(); - -final CeresDBxClient client = new CeresDBxClient(); -if (!client.init(this.opts)) { - throw new IllegalStateException("Fail to start CeresDBxClient"); -} final long t0 = System.currentTimeMillis(); final long t1 = t0 + 1000; @@ -164,6 +183,11 @@ Assert.assertEquals(3, wr.mapOr(0, WriteOk::getSuccess).intValue()); Assert.assertEquals(0, wr.getOk().getFailed()); Assert.assertEquals(0, wr.mapOr(-1, WriteOk::getFailed).intValue()); +``` +详情见 [write](docs/write.md) + +## 查询 Example +```java final QueryRequest queryRequest = QueryRequest.newBuilder() .forMetrics("machine_metric") // 表名可选填,不填的话 SQL Parser 会自动解析 ql 涉及到的表名并完成自动路由 .ql("select timestamp, cpu, mem from machine_metric") // @@ -177,7 +201,42 @@ Assert.assertTrue(qr.isOk()); final QueryOk queryOk = qr.getOk(); final List records = queryOk.mapToRecord().collect(Collectors.toList()) +final Stream users = queryOk.map(bytes -> parseUser(bytes)); + +``` +详情见 [read](docs/read.md) + +## 流式读写 Example +CeresDB 支持流式读写,适用于大规模数据读写。 +```java +final Calendar time = Calendar.getInstance(); +final StreamWriteBuf writeBuf = client.streamWrite("machine_metric"); + for (int i = 0; i < 1000; i++) { + time.add(Calendar.MILLISECOND, 1); + Collection rows = new ArrayList<>(); + final long t0 = System.currentTimeMillis(); + final long t1 = t0 + 1000; + final long t2 = t1 + 1000; + final Rows data = Series.newBuilder("machine_metric").tag("city", "Singapore").tag("ip", "127.0.0.1") + .toRowsBuilder() + .field(t0, "cpu", FieldValue.withDouble(0.23)) + .field(t0, "mem", FieldValue.withDouble(0.55)) + .field(t1, "cpu", FieldValue.withDouble(0.25)) + .field(t1, "mem", FieldValue.withDouble(0.56)) + .field(t2, "cpu", FieldValue.withDouble(0.21)) + .field(t2, "mem", FieldValue.withDouble(0.52)) + .build(); + rows.add(data); + writeBuf.writeAndFlush(data); + } +final CompletableFuture writeOk = writeBuf.completed(); + Assert.assertEquals(1000, writeOk.join().getSuccess()); + +final QueryRequest req = QueryRequest.newBuilder().ql("select * from %s", "machine_metric").build(); +final Iterator it = client.blockingStreamQuery(req, 3, TimeUnit.SECONDS); ``` +详情见 [streaming](docs/streaming.md) + ## Licensing 遵守 [Apache License 2.0](./LICENSE). diff --git a/ceresdb-example/src/test/java/com/ceresdb/ReadmeTest.java b/ceresdb-example/src/test/java/com/ceresdb/ReadmeTest.java index 9ee8e73..29946cb 100644 --- a/ceresdb-example/src/test/java/com/ceresdb/ReadmeTest.java +++ b/ceresdb-example/src/test/java/com/ceresdb/ReadmeTest.java @@ -35,7 +35,8 @@ public class ReadmeTest { @Ignore @Test public void readmeTest() throws ExecutionException, InterruptedException { - final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // ceresdb default grpc port 8831 + final CeresDBxOptions opts = CeresDBxOptions.newBuilder("127.0.0.1", 8831) // CeresDB default grpc port 8831 + .managementAddress("127.0.0.1", 5440) // CeresDB default http port 3307 .tenant("test", "sub_test", "test_token") // tenant info // maximum retry times when write fails // (only some error codes will be retried, such as the routing table failure) @@ -49,6 +50,15 @@ public void readmeTest() throws ExecutionException, InterruptedException { throw new IllegalStateException("Fail to start CeresDBxClient"); } + // Create table manually, creating table schema ahead of data ingestion is not required + String createTableSql = "CREATE TABLE MY_FIRST_TABLE(" + "ts TIMESTAMP NOT NULL," + "c1 STRING TAG NOT NULL," + + "c2 STRING TAG NOT NULL," + "c3 DOUBLE NULL," + "c4 STRING NULL," + "c5 INT64 NULL," + + "c6 FLOAT NULL," + "c7 INT32 NULL," + "c8 INT16 NULL," + "c9 INT8 NULL," + + "c10 BOOLEAN NULL," + "c11 UINT64 NULL," + "c12 UINT32 NULL," + "c13 UINT16 NULL," + + "c14 UINT8 NULL," + "c15 TIMESTAMP NULL," + "c16 VARBINARY NULL," + + "TIMESTAMP KEY(ts)" + ") ENGINE=Analytic"; + SqlResult result = client.management().executeSql(createTableSql); + final long t0 = System.currentTimeMillis(); final long t1 = t0 + 1000; final long t2 = t1 + 1000; diff --git a/docs/configuration.md b/docs/configuration.md index 8c827f0..80e9516 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -48,7 +48,7 @@ | name | description | | --- | --- | -| managementAddress | 管理服务地址,通常和几区地址 IP/Host 相同,但端口不同 | +| managementAddress | 管理服务地址,通常和集群地址 IP/Host 相同,但端口不同 | | tenant | 租户信息 | | checkSql | 是否在客户端提前检查 sql 有效性,默认 true | diff --git a/docs/streaming.md b/docs/streaming.md index b2994cf..aa8fad0 100644 --- a/docs/streaming.md +++ b/docs/streaming.md @@ -77,7 +77,7 @@ final CompletableFuture ret = writer .completed(); // 调用 completed 会结束这个`流`,server 会返回总体的写入结果 ``` -### 流失查询 API 说明 +### 流式查询 API 说明 ```java /**