diff --git a/src/UserGuide/Master/Tree/API/Programming-Go-Native-API.md b/src/UserGuide/Master/Tree/API/Programming-Go-Native-API.md index baad278b4..a4afa6c4f 100644 --- a/src/UserGuide/Master/Tree/API/Programming-Go-Native-API.md +++ b/src/UserGuide/Master/Tree/API/Programming-Go-Native-API.md @@ -21,44 +21,396 @@ # Go Native API -The Git repository for the Go Native API client is located [here](https://github.com/apache/iotdb-client-go/) +The Go Native API supports interaction with the database through both `Session` and `SessionPool` methods. Since `Session` is not thread-safe, using `SessionPool` is strongly recommended for programming. In multi-threaded concurrent scenarios, `SessionPool` can reasonably manage and allocate connection resources to enhance system performance and resource utilization efficiency. -## 1. Dependencies +This article focuses on the usage of `SessionPool`, covering the complete process from environment preparation and core operation steps to the full set of interfaces. - * golang >= 1.13 - * make >= 3.0 - * curl >= 7.1.1 - * thrift 0.15.0 - * Linux、Macos or other unix-like systems - * Windows+bash (WSL、cygwin、Git Bash) +## 1. Environment Preparation -## 2. Installation +### 1.1 Prerequisites - * go mod +- golang >= 1.13 +- make >= 3.0 +- curl >= 7.1.1 +- thrift: 0.15.0 +- Linux, MacOS, or other Unix-like systems +- Windows + bash (Git is needed to download the IoTDB Go client; any one of WSL, cygwin, or Git Bash is acceptable) -```sh -export GO111MODULE=on -export GOPROXY=https://goproxy.io +### 1.2 Installation Methods -mkdir session_example && cd session_example +- **Using go mod** -curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go + ```bash + # Switch to the HOME path of GOPATH and enable the Go Modules feature + export GO111MODULE=on + + # Configure the GOPROXY environment variable + export GOPROXY=https://goproxy.io + + # Create a named folder or directory and switch to it + mkdir session_example && cd session_example + + # Save the file, which will automatically redirect to the new address + curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go + + # Initialize the go module environment + go mod init session_example + + # Download dependency packages + go mod tidy + + # Compile and run the program + go run session_example.go + ``` +- **Using GOPATH** -go mod init session_example -go run session_example.go + ```bash + # Get thrift 0.13.0 + go get github.com/apache/thrift@0.13.0 + + # Recursively create the directory + mkdir -p $GOPATH/src/iotdb-client-go-example/session_example + + # Switch to the current directory + cd $GOPATH/src/iotdb-client-go-example/session_example + + # Save the file, which will automatically redirect to the new address + curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go + + # Initialize the go module environment + go mod init + + # Download dependency packages + go mod tidy + + # Compile and run the program + go run session_example.go + ``` + +## 2. Core Steps + +The three core steps for using the Go native interface to operate IoTDB are as follows: + +1. **Create a connection pool instance**: Initialize a `SessionPool` object, configuring connection parameters and pool size. +2. **Execute database operations**: `GetSession()` from the pool, perform operations like data writing or querying, and **must** `PutBack(session)` upon completion. +3. **Close connection pool resources**: Call `sessionPool.Close()` at the end of the program to release all connections. + +The following sections illustrate the core development workflow and do not demonstrate all parameters and interfaces. For the complete functionality and parameters, please refer to: **[Full Interface Description](../API/Programming-Go-Native-API.md#_3-full-interface-list)** or check: **[SessionPool Example Source Code](https://github.com/apache/iotdb-client-go/blob/main/example/session_pool/session_pool_example.go)** + +### 2.1 Create Connection Pool Instance + +- **Single Instance** + + ```go + config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + defer sessionPool.Close() + ``` +- **Distributed or Active-Active** + + ```go + config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + defer sessionPool.Close() + ``` + +### 2.2 Database Operations + +#### 2.2.1 Data Insertion + +```go +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +status, err := session.InsertTablet(tablet, false) +tablet.Reset() +checkError(status, err) ``` -* GOPATH +#### 2.2.2 Data Query + +```go +var timeout int64 = 1000 +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +if err != nil { + log.Print(err) + return +} +sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) +if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) +} else { + log.Println(err) +} +``` + +### 2.3 Usage Example + +```go +import ( + "flag" + "fmt" + "log" + "math/rand" + "strings" + "time" + + "github.com/apache/iotdb-client-go/v2/client" + "github.com/apache/iotdb-client-go/v2/common" +) + +var ( + host string + port string + user string + password string +) +var sessionPool client.SessionPool + +func main() { + flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100") + flag.StringVar(&port, "port", "6667", "--port=6667") + flag.StringVar(&user, "user", "root", "--user=root") + flag.StringVar(&password, "password", "root", "--password=root") + flag.Parse() + + // 1. Create connection pool + config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + + defer sessionPool.Close() + + // 2. Create storage group + setStorageGroup("root.sg1") + + // 3. Create time series + createTimeseries("root.sg1.dev1.temperature") + + // 4. Data insertion + insertTablet() + + // 5. Data query + executeQueryStatement("select temperature from root.sg1.dev1") + + // 6. Deletion + deleteTimeseries("root.sg1.dev1.temperature") + deleteStorageGroup("root.sg1") -```sh -# get thrift 0.15.0 -go get github.com/apache/thrift -cd $GOPATH/src/github.com/apache/thrift -git checkout 0.15.0 +} -mkdir -p $GOPATH/src/iotdb-client-go-example/session_example -cd $GOPATH/src/iotdb-client-go-example/session_example -curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go -go run session_example.go +// Set storage group +func setStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + session.SetStorageGroup(sg) + } +} + +// Delete storage group +func deleteStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteStorageGroup(sg)) + } +} + +// Create time series +func createTimeseries(path string) { + var ( + dataType = client.FLOAT + encoding = client.PLAIN + compressor = client.SNAPPY + ) + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil)) + } +} + +// Delete time series +func deleteTimeseries(paths ...string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteTimeseries(paths)) + } +} + +// Insert Tablet data +func insertTablet() { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + if tablet, err := createTablet(12); err == nil { + status, err := session.InsertTablet(tablet, false) + tablet.Reset() + checkError(status, err) + } else { + log.Fatal(err) + } + } +} + +// Create Tablet +func createTablet(rowCount int) (*client.Tablet, error) { + tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{ + { + Measurement: "temperature", + DataType: client.FLOAT, + }, + }, rowCount) + + if err != nil { + return nil, err + } + ts := time.Now().UTC().UnixNano() / 1000000 + for row := 0; row < int(rowCount); row++ { + ts++ + tablet.SetTimestamp(ts, row) + tablet.SetValueAt(rand.Float32(), 0, row) + tablet.RowSize++ + } + return tablet, nil +} + +// Execute query statement +func executeQueryStatement(sql string) { + var timeout int64 = 1000 + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err != nil { + log.Print(err) + return + } + sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) + if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) + } else { + log.Println(err) + } +} + +// Print query results +func printDataSet(sds *client.SessionDataSet) { + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) + } + fmt.Println() + + for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { + for _, columnName := range columnNames { + isNull, _ := sds.IsNull(columnName) + + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetString(columnName) + fmt.Printf("%v\t\t", v) + } + } + fmt.Println() + } +} + +// Check error +func checkError(status *common.TSStatus, err error) { + if err != nil { + log.Fatal(err) + } + + if status != nil { + if err = client.VerifySuccess(status); err != nil { + log.Println(err) + } + } +} ``` +## 3. Full Interface List + +### 3.1 SessionPool Management Interfaces + +| Interface Name | Function Description | Parameter Description | +|:-------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool` | Creates and returns a Session connection pool instance. | `config`: Pool configuration
`maxSize`: Maximum connections (≤0 uses CPU count * 5)
`connTimeoutMs`: TCP connection timeout (ms)
`waitTimeoutMs`: Session acquisition wait timeout (ms)
`enableComp`: Whether to enable compression | +| `GetSession() (Session, error)` | Gets an available Session from the pool. Blocks if the pool is full, returns error on timeout. Must be paired with `PutBack`. | None | +| `PutBack(session Session)` | Returns a used Session back to the connection pool. | `session`: The instance obtained from `GetSession` | +| `Close()` | Closes the connection pool, releasing all active connections. Must be called before program exit. | None | + +### 3.2 Data Insertion Interfaces + +*The following interfaces are called via the obtained Session.* + +| Interface Name | Function Description | Parameter Description | +|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | Inserts a single record. | `deviceId`: Device ID
`measurements`: Measurement list
`dataTypes`: Data type list
`values`: Value list
`timestamp`: Timestamp | +| `InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | Inserts a single aligned record. | `deviceId`: Device ID
`measurements`: Measurement list
`dataTypes`: Data type list
`values`: Value list
`timestamp`: Timestamp | +| `InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error)` | Inserts a single record in string format. | `deviceId`: Device ID
`measurements`: Measurement list
`values`: String-type value list
`timestamp`: Timestamp | +| `InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | Inserts multiple records for multiple devices. | `deviceIds`: Device ID list
`measurements`: 2D measurement list
`dataTypes`: 2D data type list
`values`: 2D value list
`timestamps`: Timestamp list | +| `InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | Inserts multiple records for multiple aligned devices. | `deviceIds`: Device ID list
`measurements`: 2D measurement list
`dataTypes`: 2D data type list
`values`: 2D value list
`timestamps`: Timestamp list | +| `InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | Inserts multiple rows of data for a single device. | `tablet`: The Tablet data to insert
`sorted`: Whether the data is sorted | +| `InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | Inserts multiple rows of data for a single aligned device. | `tablet`: The Tablet data to insert
`sorted`: Whether the data is sorted | +| `InsertTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | Batch inserts multiple Tablet data. | `tablets`: Multiple Tablet data to insert
`sorted`: Whether the data is sorted | +| `InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | Batch inserts multiple aligned devices' data. | `tablets`: Multiple Tablet data to insert
`sorted`: Whether the data is sorted | + +### 3.3 SQL and Query Interfaces + +*The following interfaces are called via the obtained Session.* + +| Interface Name | Function Description | Parameter Description | +|:---------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `ExecuteStatement(sql string)(SessionDataSet, error)` | Executes SQL (primarily for queries), returns a SessionDataSet. | `sql`: The SQL query statement to execute | +| `ExecuteQueryStatement(sql string, timeoutMs *int64) (SessionDataSet, error)` | Executes a query SQL with optional timeout, returns a SessionDataSet. | `sql`: The SQL query statement to execute
`timeoutMs`: Query timeout time (milliseconds) | +| `ExecuteNonQueryStatement(sql string) (r common.TSStatus, err error)` | Executes SQL that does not return a result set (e.g., INSERT, CREATE, DELETE). | `sql`: The SQL statement to execute | +| `ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)` | Queries raw data for specified time series within a time range. | `paths`: Query path list
`startTime`: Start timestamp
`endTime`: End timestamp | +| `ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error)` | Executes an aggregation query (COUNT, AVG, etc.). | `paths`: Query path list
`aggregations`: Aggregation type list
`startTime, endTime, interval`: Start time, end time, and interval
`timeoutMs`: Query timeout time | +| `ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error)` | Executes multiple SQL statements in batch. | `sqls`: The SQL statements to execute | + +### 3.4 Metadata Operation Interfaces + +*The following interfaces are called via the obtained Session.* + +| Interface Name | Function Description | Parameter Description | +|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `SetStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | Creates a database (storage group). | `storageGroupId`: Database (storage group) name | +| `DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | Deletes a database (storage group). | `storageGroupId`: The database (storage group) name to delete | +| `DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error)` | Deletes multiple databases (storage groups). | `storageGroupIds`: The list of database (storage group) names to delete | +| `CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error)` | Creates a non-aligned time series. | `path`: Time series path
`dataType`: Data type
`encoding`: Encoding method
`compressor`: Compression algorithm
`attributes`: (Optional) Series attributes
`tags`: (Optional) Series tags | +| `CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error)` | Creates a group of aligned time series. | `prefixPath`: Time series path prefix
`measurements`: Measurement name list
`dataTypes, encodings, compressors`: Data type, encoding, and compressor list for each measurement
`measurementAlias`: (Optional) Alias list for each measurement | +| `DeleteTimeseries(paths []string) (r common.TSStatus, err error)` | Deletes multiple time series (including their data). | `paths`: The list of time series paths to delete | +| `DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error)` | Deletes data within a time period for specified time series (metadata is preserved). | `paths`: The list of time series paths
`startTime`: Start timestamp
`endTime`: End timestamp | +| `SetTimeZone(timeZone string) (r common.TSStatus, err error)` | Sets the time zone for the current session. | `timeZone`: Time zone string, e.g., "UTC", "Asia/Shanghai", "GMT+8" | +| `GetTimeZone() (string, error)` | Gets the time zone of the current session. | None | + +### 3.5 Key Configuration Structure (PoolConfig) + +| Field | Type | Required | Description | +|:------------|:-----------|:------------------------------|:-------------------------------------------------------------------------| +| `Host` | `string` | Choose one with `NodeUrls` | Single-node host address. | +| `Port` | `string` | Choose one with `NodeUrls` | Single-node port. | +| `NodeUrls` | `[]string` | Choose one with `Host`/`Port` | Cluster node address list, format: `"host:port"`. | +| `UserName` | `string` | Yes | Username. | +| `Password` | `string` | Yes | Password. | +| `FetchSize` | `int32` | No | Query result set fetch size, default 1024. | +| `TimeZone` | `string` | No | Session time zone, e.g., "Asia/Shanghai". Default uses server time zone. | +| `Database` | `string` | No | For table model; used to set the session's default database. | diff --git a/src/UserGuide/latest/API/Programming-Go-Native-API.md b/src/UserGuide/latest/API/Programming-Go-Native-API.md index baad278b4..a4afa6c4f 100644 --- a/src/UserGuide/latest/API/Programming-Go-Native-API.md +++ b/src/UserGuide/latest/API/Programming-Go-Native-API.md @@ -21,44 +21,396 @@ # Go Native API -The Git repository for the Go Native API client is located [here](https://github.com/apache/iotdb-client-go/) +The Go Native API supports interaction with the database through both `Session` and `SessionPool` methods. Since `Session` is not thread-safe, using `SessionPool` is strongly recommended for programming. In multi-threaded concurrent scenarios, `SessionPool` can reasonably manage and allocate connection resources to enhance system performance and resource utilization efficiency. -## 1. Dependencies +This article focuses on the usage of `SessionPool`, covering the complete process from environment preparation and core operation steps to the full set of interfaces. - * golang >= 1.13 - * make >= 3.0 - * curl >= 7.1.1 - * thrift 0.15.0 - * Linux、Macos or other unix-like systems - * Windows+bash (WSL、cygwin、Git Bash) +## 1. Environment Preparation -## 2. Installation +### 1.1 Prerequisites - * go mod +- golang >= 1.13 +- make >= 3.0 +- curl >= 7.1.1 +- thrift: 0.15.0 +- Linux, MacOS, or other Unix-like systems +- Windows + bash (Git is needed to download the IoTDB Go client; any one of WSL, cygwin, or Git Bash is acceptable) -```sh -export GO111MODULE=on -export GOPROXY=https://goproxy.io +### 1.2 Installation Methods -mkdir session_example && cd session_example +- **Using go mod** -curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go + ```bash + # Switch to the HOME path of GOPATH and enable the Go Modules feature + export GO111MODULE=on + + # Configure the GOPROXY environment variable + export GOPROXY=https://goproxy.io + + # Create a named folder or directory and switch to it + mkdir session_example && cd session_example + + # Save the file, which will automatically redirect to the new address + curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go + + # Initialize the go module environment + go mod init session_example + + # Download dependency packages + go mod tidy + + # Compile and run the program + go run session_example.go + ``` +- **Using GOPATH** -go mod init session_example -go run session_example.go + ```bash + # Get thrift 0.13.0 + go get github.com/apache/thrift@0.13.0 + + # Recursively create the directory + mkdir -p $GOPATH/src/iotdb-client-go-example/session_example + + # Switch to the current directory + cd $GOPATH/src/iotdb-client-go-example/session_example + + # Save the file, which will automatically redirect to the new address + curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go + + # Initialize the go module environment + go mod init + + # Download dependency packages + go mod tidy + + # Compile and run the program + go run session_example.go + ``` + +## 2. Core Steps + +The three core steps for using the Go native interface to operate IoTDB are as follows: + +1. **Create a connection pool instance**: Initialize a `SessionPool` object, configuring connection parameters and pool size. +2. **Execute database operations**: `GetSession()` from the pool, perform operations like data writing or querying, and **must** `PutBack(session)` upon completion. +3. **Close connection pool resources**: Call `sessionPool.Close()` at the end of the program to release all connections. + +The following sections illustrate the core development workflow and do not demonstrate all parameters and interfaces. For the complete functionality and parameters, please refer to: **[Full Interface Description](../API/Programming-Go-Native-API.md#_3-full-interface-list)** or check: **[SessionPool Example Source Code](https://github.com/apache/iotdb-client-go/blob/main/example/session_pool/session_pool_example.go)** + +### 2.1 Create Connection Pool Instance + +- **Single Instance** + + ```go + config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + defer sessionPool.Close() + ``` +- **Distributed or Active-Active** + + ```go + config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + defer sessionPool.Close() + ``` + +### 2.2 Database Operations + +#### 2.2.1 Data Insertion + +```go +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +status, err := session.InsertTablet(tablet, false) +tablet.Reset() +checkError(status, err) ``` -* GOPATH +#### 2.2.2 Data Query + +```go +var timeout int64 = 1000 +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +if err != nil { + log.Print(err) + return +} +sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) +if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) +} else { + log.Println(err) +} +``` + +### 2.3 Usage Example + +```go +import ( + "flag" + "fmt" + "log" + "math/rand" + "strings" + "time" + + "github.com/apache/iotdb-client-go/v2/client" + "github.com/apache/iotdb-client-go/v2/common" +) + +var ( + host string + port string + user string + password string +) +var sessionPool client.SessionPool + +func main() { + flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100") + flag.StringVar(&port, "port", "6667", "--port=6667") + flag.StringVar(&user, "user", "root", "--user=root") + flag.StringVar(&password, "password", "root", "--password=root") + flag.Parse() + + // 1. Create connection pool + config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + + defer sessionPool.Close() + + // 2. Create storage group + setStorageGroup("root.sg1") + + // 3. Create time series + createTimeseries("root.sg1.dev1.temperature") + + // 4. Data insertion + insertTablet() + + // 5. Data query + executeQueryStatement("select temperature from root.sg1.dev1") + + // 6. Deletion + deleteTimeseries("root.sg1.dev1.temperature") + deleteStorageGroup("root.sg1") -```sh -# get thrift 0.15.0 -go get github.com/apache/thrift -cd $GOPATH/src/github.com/apache/thrift -git checkout 0.15.0 +} -mkdir -p $GOPATH/src/iotdb-client-go-example/session_example -cd $GOPATH/src/iotdb-client-go-example/session_example -curl -o session_example.go -L https://github.com/apache/iotdb-client-go/raw/main/example/session_example.go -go run session_example.go +// Set storage group +func setStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + session.SetStorageGroup(sg) + } +} + +// Delete storage group +func deleteStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteStorageGroup(sg)) + } +} + +// Create time series +func createTimeseries(path string) { + var ( + dataType = client.FLOAT + encoding = client.PLAIN + compressor = client.SNAPPY + ) + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil)) + } +} + +// Delete time series +func deleteTimeseries(paths ...string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteTimeseries(paths)) + } +} + +// Insert Tablet data +func insertTablet() { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + if tablet, err := createTablet(12); err == nil { + status, err := session.InsertTablet(tablet, false) + tablet.Reset() + checkError(status, err) + } else { + log.Fatal(err) + } + } +} + +// Create Tablet +func createTablet(rowCount int) (*client.Tablet, error) { + tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{ + { + Measurement: "temperature", + DataType: client.FLOAT, + }, + }, rowCount) + + if err != nil { + return nil, err + } + ts := time.Now().UTC().UnixNano() / 1000000 + for row := 0; row < int(rowCount); row++ { + ts++ + tablet.SetTimestamp(ts, row) + tablet.SetValueAt(rand.Float32(), 0, row) + tablet.RowSize++ + } + return tablet, nil +} + +// Execute query statement +func executeQueryStatement(sql string) { + var timeout int64 = 1000 + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err != nil { + log.Print(err) + return + } + sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) + if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) + } else { + log.Println(err) + } +} + +// Print query results +func printDataSet(sds *client.SessionDataSet) { + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) + } + fmt.Println() + + for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { + for _, columnName := range columnNames { + isNull, _ := sds.IsNull(columnName) + + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetString(columnName) + fmt.Printf("%v\t\t", v) + } + } + fmt.Println() + } +} + +// Check error +func checkError(status *common.TSStatus, err error) { + if err != nil { + log.Fatal(err) + } + + if status != nil { + if err = client.VerifySuccess(status); err != nil { + log.Println(err) + } + } +} ``` +## 3. Full Interface List + +### 3.1 SessionPool Management Interfaces + +| Interface Name | Function Description | Parameter Description | +|:-------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool` | Creates and returns a Session connection pool instance. | `config`: Pool configuration
`maxSize`: Maximum connections (≤0 uses CPU count * 5)
`connTimeoutMs`: TCP connection timeout (ms)
`waitTimeoutMs`: Session acquisition wait timeout (ms)
`enableComp`: Whether to enable compression | +| `GetSession() (Session, error)` | Gets an available Session from the pool. Blocks if the pool is full, returns error on timeout. Must be paired with `PutBack`. | None | +| `PutBack(session Session)` | Returns a used Session back to the connection pool. | `session`: The instance obtained from `GetSession` | +| `Close()` | Closes the connection pool, releasing all active connections. Must be called before program exit. | None | + +### 3.2 Data Insertion Interfaces + +*The following interfaces are called via the obtained Session.* + +| Interface Name | Function Description | Parameter Description | +|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | Inserts a single record. | `deviceId`: Device ID
`measurements`: Measurement list
`dataTypes`: Data type list
`values`: Value list
`timestamp`: Timestamp | +| `InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | Inserts a single aligned record. | `deviceId`: Device ID
`measurements`: Measurement list
`dataTypes`: Data type list
`values`: Value list
`timestamp`: Timestamp | +| `InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error)` | Inserts a single record in string format. | `deviceId`: Device ID
`measurements`: Measurement list
`values`: String-type value list
`timestamp`: Timestamp | +| `InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | Inserts multiple records for multiple devices. | `deviceIds`: Device ID list
`measurements`: 2D measurement list
`dataTypes`: 2D data type list
`values`: 2D value list
`timestamps`: Timestamp list | +| `InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | Inserts multiple records for multiple aligned devices. | `deviceIds`: Device ID list
`measurements`: 2D measurement list
`dataTypes`: 2D data type list
`values`: 2D value list
`timestamps`: Timestamp list | +| `InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | Inserts multiple rows of data for a single device. | `tablet`: The Tablet data to insert
`sorted`: Whether the data is sorted | +| `InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | Inserts multiple rows of data for a single aligned device. | `tablet`: The Tablet data to insert
`sorted`: Whether the data is sorted | +| `InsertTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | Batch inserts multiple Tablet data. | `tablets`: Multiple Tablet data to insert
`sorted`: Whether the data is sorted | +| `InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | Batch inserts multiple aligned devices' data. | `tablets`: Multiple Tablet data to insert
`sorted`: Whether the data is sorted | + +### 3.3 SQL and Query Interfaces + +*The following interfaces are called via the obtained Session.* + +| Interface Name | Function Description | Parameter Description | +|:---------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `ExecuteStatement(sql string)(SessionDataSet, error)` | Executes SQL (primarily for queries), returns a SessionDataSet. | `sql`: The SQL query statement to execute | +| `ExecuteQueryStatement(sql string, timeoutMs *int64) (SessionDataSet, error)` | Executes a query SQL with optional timeout, returns a SessionDataSet. | `sql`: The SQL query statement to execute
`timeoutMs`: Query timeout time (milliseconds) | +| `ExecuteNonQueryStatement(sql string) (r common.TSStatus, err error)` | Executes SQL that does not return a result set (e.g., INSERT, CREATE, DELETE). | `sql`: The SQL statement to execute | +| `ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)` | Queries raw data for specified time series within a time range. | `paths`: Query path list
`startTime`: Start timestamp
`endTime`: End timestamp | +| `ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error)` | Executes an aggregation query (COUNT, AVG, etc.). | `paths`: Query path list
`aggregations`: Aggregation type list
`startTime, endTime, interval`: Start time, end time, and interval
`timeoutMs`: Query timeout time | +| `ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error)` | Executes multiple SQL statements in batch. | `sqls`: The SQL statements to execute | + +### 3.4 Metadata Operation Interfaces + +*The following interfaces are called via the obtained Session.* + +| Interface Name | Function Description | Parameter Description | +|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `SetStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | Creates a database (storage group). | `storageGroupId`: Database (storage group) name | +| `DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | Deletes a database (storage group). | `storageGroupId`: The database (storage group) name to delete | +| `DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error)` | Deletes multiple databases (storage groups). | `storageGroupIds`: The list of database (storage group) names to delete | +| `CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error)` | Creates a non-aligned time series. | `path`: Time series path
`dataType`: Data type
`encoding`: Encoding method
`compressor`: Compression algorithm
`attributes`: (Optional) Series attributes
`tags`: (Optional) Series tags | +| `CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error)` | Creates a group of aligned time series. | `prefixPath`: Time series path prefix
`measurements`: Measurement name list
`dataTypes, encodings, compressors`: Data type, encoding, and compressor list for each measurement
`measurementAlias`: (Optional) Alias list for each measurement | +| `DeleteTimeseries(paths []string) (r common.TSStatus, err error)` | Deletes multiple time series (including their data). | `paths`: The list of time series paths to delete | +| `DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error)` | Deletes data within a time period for specified time series (metadata is preserved). | `paths`: The list of time series paths
`startTime`: Start timestamp
`endTime`: End timestamp | +| `SetTimeZone(timeZone string) (r common.TSStatus, err error)` | Sets the time zone for the current session. | `timeZone`: Time zone string, e.g., "UTC", "Asia/Shanghai", "GMT+8" | +| `GetTimeZone() (string, error)` | Gets the time zone of the current session. | None | + +### 3.5 Key Configuration Structure (PoolConfig) + +| Field | Type | Required | Description | +|:------------|:-----------|:------------------------------|:-------------------------------------------------------------------------| +| `Host` | `string` | Choose one with `NodeUrls` | Single-node host address. | +| `Port` | `string` | Choose one with `NodeUrls` | Single-node port. | +| `NodeUrls` | `[]string` | Choose one with `Host`/`Port` | Cluster node address list, format: `"host:port"`. | +| `UserName` | `string` | Yes | Username. | +| `Password` | `string` | Yes | Password. | +| `FetchSize` | `int32` | No | Query result set fetch size, default 1024. | +| `TimeZone` | `string` | No | Session time zone, e.g., "Asia/Shanghai". Default uses server time zone. | +| `Database` | `string` | No | For table model; used to set the session's default database. | diff --git a/src/zh/UserGuide/Master/Tree/API/Programming-Go-Native-API.md b/src/zh/UserGuide/Master/Tree/API/Programming-Go-Native-API.md index 99f8d748c..fe9e49b9b 100644 --- a/src/zh/UserGuide/Master/Tree/API/Programming-Go-Native-API.md +++ b/src/zh/UserGuide/Master/Tree/API/Programming-Go-Native-API.md @@ -21,20 +21,25 @@ # Go 原生接口 -## 1. 依赖 +Go 原生 API 支持通过 `Session `和 `SessionPool `两种方式与数据库进行交互。由于 `Session `非线程安全,因此强烈推荐使用 `SessionPool `编程。在多线程并发的情形下,`SessionPool `能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。 - * golang >= 1.13 - * make >= 3.0 - * curl >= 7.1.1 - * thrift 0.15.0 - * Linux、Macos 或其他类 unix 系统 - * Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可) +本文将围绕 `SessionPool` 的使用进行说明,涵盖从环境准备、核心操作步骤到全量接口的完整内容。 -## 2. 安装方法 +## 1. 环境准备 +### 1.1 前置依赖 - * 通过 go mod +* golang >= 1.13 +* make >= 3.0 +* curl >= 7.1.1 +* thrift: 0.15.0 +* Linux、Macos 或其他类 unix 系统 +* Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可) -```sh +### 1.2 安装方法 + +* 使用 go mod + +```Bash # 切换到 GOPATH 的 HOME 路径,启用 Go Modules 功能 export GO111MODULE=on @@ -57,9 +62,9 @@ go mod tidy go run session_example.go ``` -* 通过 GOPATH +* 使用 GOPATH -```sh +```Bash # get thrift 0.13.0 go get github.com/apache/thrift@0.13.0 @@ -81,4 +86,330 @@ go mod tidy # 编译并运行程序 go run session_example.go ``` -**注意:GO原生客户端Session不是线程安全的,强烈不建议在多线程场景下应用。如有多线程应用场景,请使用Session Pool.** + +## 2. 核心步骤 + +使用 Go 原生接口操作 IoTDB 的三个核心步骤如下: + +1. 创建连接池实例:初始化一个`SessionPool`对象,配置连接参数和池大小。 +2. 执行数据库操作:从连接池中`GetSession()`,执行数据写入或查询等操作,完成后必须`PutBack(session)`。 +3. 关闭连接池资源:程序结束时调用`sessionPool.Close()`,释放所有连接。 + +下面的章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [全量接口说明](../API/Programming-Go-Native-API.md#_3-全量接口) 或 查阅: [SessionPool 示例源码](https://github.com/apache/iotdb-client-go/blob/main/example/session_pool/session_pool_example.go) + +### 2.1 创建连接池实例 + +* 单实例 + +```Go +config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, +} +sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) +defer​ ​sessionPool.Close() +``` + +* 分布式或双活 + +```Go +config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } +sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) +defer​ ​sessionPool.Close() +``` + +### 2.2 数据库操作 +#### 2.2.1 数据写入 + +```Go +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +status, err := session.InsertTablet(tablet, false) +tablet.Reset() +checkError(status, err) +``` + +#### 2.2.2 数据查询 + +```Go +var timeout int64 = 1000 +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +if err != nil { + log.Print(err) + return +} +sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) +if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) +} else { + log.Println(err) +} +``` + +### 2.3 使用示例 + +```Go +import ( + "flag" + "fmt" + "log" + "math/rand" + "strings" + "time" + + "github.com/apache/iotdb-client-go/v2/client" + "github.com/apache/iotdb-client-go/v2/common" +) + +var ( + host string + port string + user string + password string +) +var sessionPool client.SessionPool + +func main() { + flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100") + flag.StringVar(&port, "port", "6667", "--port=6667") + flag.StringVar(&user, "user", "root", "--user=root") + flag.StringVar(&password, "password", "root", "--password=root") + flag.Parse() + + //1.创建连接池 + config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + + defer sessionPool.Close() + + //2.创建存储组 + setStorageGroup("root.sg1") + + //3. 创建时间序列 + createTimeseries("root.sg1.dev1.temperature") + + //4.数据写入 + insertTablet() + + //5. 数据查询 + executeQueryStatement("select temperature from root.sg1.dev1") + + //6. 删除 + deleteTimeseries("root.sg1.dev1.temperature") + deleteStorageGroup("root.sg1") + +} + +// 设置存储组 +func setStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + session.SetStorageGroup(sg) + } +} + +// 删除存储组 +func deleteStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteStorageGroup(sg)) + } +} + +// 创建时间序列 +func createTimeseries(path string) { + var ( + dataType = client.FLOAT + encoding = client.PLAIN + compressor = client.SNAPPY + ) + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil)) + } +} + +// 删除时间序列 +func deleteTimeseries(paths ...string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteTimeseries(paths)) + } +} + +// 插入Tablet数据 +func insertTablet() { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + if tablet, err := createTablet(12); err == nil { + status, err := session.InsertTablet(tablet, false) + tablet.Reset() + checkError(status, err) + } else { + log.Fatal(err) + } + } +} + +//创建Tablet +func createTablet(rowCount int) (*client.Tablet, error) { + tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{ + { + Measurement: "temperature", + DataType: client.FLOAT, + }, + }, rowCount) + + if err != nil { + return nil, err + } + ts := time.Now().UTC().UnixNano() / 1000000 + for row := 0; row < int(rowCount); row++ { + ts++ + tablet.SetTimestamp(ts, row) + tablet.SetValueAt(rand.Float32(), 0, row) + tablet.RowSize++ + } + return tablet, nil +} + +// 执行查询语句 +func executeQueryStatement(sql string) { + var timeout int64 = 1000 + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err != nil { + log.Print(err) + return + } + sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) + if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) + } else { + log.Println(err) + } +} + +// 打印查询结果 +func printDataSet(sds *client.SessionDataSet) { + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) + } + fmt.Println() + + for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { + for _, columnName := range columnNames { + isNull, _ := sds.IsNull(columnName) + + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetString(columnName) + fmt.Printf("%v\t\t", v) + } + } + fmt.Println() + } +} + +// 检查错误 +func checkError(status *common.TSStatus, err error) { + if err != nil { + log.Fatal(err) + } + + if status != nil { + if err = client.VerifySuccess(status); err != nil { + log.Println(err) + } + } +} +``` + +## 3. 全量接口 +### 3.1 SessionPool 管理接口 + +| 接口名称 | 功能描述 | 参数说明 | +|--------------------------------------------------------------------------------------------------------------|--------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------| +| `NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool` | 创建并返回一个Session连接池实例。 | `config`: 连接池配置
`maxSize`: 最大连接数(≤0时取CPU数\*5)
`connTimeoutMs`: TCP连接超时(ms)
`waitTimeoutMs`: 获取Session等待超时(ms)
`enableComp`: 是否启用压缩 | +| `GetSession() (Session, error)` | 从池中获取一个可用Session。若池满则阻塞等待,超时返回错误。​**必须与PutBack配对使用**​。 | 无 | +| `PutBack(session Session)` | 将使用完毕的Session归还到连接池中。 | `session`: 从GetSession获取的实例 | +| `Close()` | 关闭连接池,释放所有活跃连接。程序退出前必须调用。 | 无 | + +### 3.2 数据写入接口 + +以下接口需通过获取的 Session 进行调用 + +| 接口名称 | 功能描述 | 参数说明 | +|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | 插入单条记录。| `deviceId`: 设备ID
`measurements`: 测点列表
`dataTypes`: 数据类型列表
`values`: 值列表
`timestamp`: 时间戳 | +| `InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | 插入单条对齐记录。 | `deviceId`: 设备ID
`measurements`: 测点列表
`dataTypes`: 数据类型列表
`values`: 值列表
`timestamp`: 时间戳 | +| `InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error)` | 插入字符串格式的单条记录。 | `deviceId`: 设备ID
`measurements`: 测点列表
`values`: 字符串类型的值列表
`timestamp`: 时间戳 | +| `InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | 插入多条记录。| `deviceIds`: 设备ID列表
`measurements`:二维测点列表
`dataTypes`: 二维数据类型列表
`values`: 二维值列表
`timestamps`: 时间戳列表 | +| `InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | 插入多个对齐设备的多条记录。 | `deviceIds`: 设备ID列表
`measurements`:二维测点列表
`dataTypes`: 二维数据类型列表
`values`: 二维值列表
`timestamps`: 时间戳列表 | +| `InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | 插入单个设备的多条数据。 | `tablet`: 要插入的`Tablet`数据
`sorted`: 数据是否已排序 | +| `InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | 插入单个对齐设备的多条数据。 | `tablet`: 要插入的`Tablet`数据
`sorted`: 数据是否已排序 | +| `InsertTablets``(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | 批量插入多个 Tablet 数据。 | `tablets`: 要插入的多个`Tablet `数据
`sorted`: 数据是否已排序 | +| `InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | 批量插入多个对齐设备的数据。 | `tablets`: 要插入的多个`Tablet `数据
`sorted`: 数据是否已排序 | + +### 3.3 SQL与查询接口 + +以下接口需通过获取的 Session 进行调用 + +| 接口名称 | 功能描述 | 参数说明 | +|----------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------| +| `ExecuteStatement(sql string)`*`(`*`SessionDataSet, error)` | 执行SQL(主要查询),返回`SessionDataSet`。 | `sql`:要执行的SQL查询语句 | +| `ExecuteQueryStatement(sql string, timeoutMs int64) (SessionDataSet, error)` | 执行查询SQL,可指定超时,返回`SessionDataSet`。 | `sql`:要执行的SQL查询语句
`timeoutMs`: 查询超时时间(毫秒) | +| `ExecuteNonQueryStatementExecuteNonQueryStatement(sql string) (r common.TSStatus, err error)` | 执行不返回结果集的SQL(如INSERT, CREATE, DELETE)。 | `sql`:要执行的SQL语句 | +| `ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)` | 查询指定时间序列在时间范围内的原始数据。 | `paths`: 查询路径列表
`startTime`: 起始时间戳
`endTime`: 结束时间戳 | +| `ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error)` | 执行聚合查询(COUNT, AVG等)。 | `paths`: 查询路径列表
`aggregations`: 聚合类型列表
`startTime, endTime, interval`: 起始时间、结束时间和间隔时间
`timeoutMs`: 查询超时时间 | +| `ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error)` | 批量执行多条SQL语句。 | `sqls`:要执行的SQL语句 | + +### 3.4 元数据操作接口 + +以下接口需通过获取的 Session 进行调用 + +| 接口名称 | 功能描述 | 参数说明 | +| -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| `SetStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | 创建数据库(存储组)。 | `storageGroupId`:数据库(存储组)名称 | +| `DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | 删除一个数据库(存储组)。 | `storageGroupId`:要删除的数据库(存储组)名称 | +| `DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error)` | 删除多个数据库(存储组)。 | `storageGroupIds`:要删除的数据库(存储组)名称列表 | +| `CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error)` | 创建非对齐时间序列。 | `path`: 时间序列路径
`dataType`: 数据类型
`encoding`: 编码方式
`compressor`: 压缩算法
`attributes`: (可选)序列属性
`tags`: (可选)序列标签 | +| `CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error)` | 创建一组对齐时间序列。 | `prefixPath`: 时间序列路径前缀
`measurements`: 测点名称列表
`dataTypes, encodings, compressors`: 每个测点对应的数据类型、编码和压缩算法列表
`measurementAlias`: (可选)每个测点的别名列表 | +| `DeleteTimeseries(paths []string) (r common.TSStatus, err error)` | 删除多条时间序列(含数据)。 | `paths`:要删除的时间序列路径列表 | +| `DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error)` | 删除指定时间序列在时间段内的数据(保留元数据)。 | `paths`: 要删除的时间序列路径列表
`startTime`: 起始时间戳
`endTime`: 结束时间戳。 | +| `SetTimeZone(timeZone string) (r common.TSStatus, err error)` | 设置当前会话时区。 | `timeZone`: 时区字符串,例如 ”UTC”, ”Asia/Shanghai”, ”GMT+8” | +| `GetTimeZone() (string, error)` | 获取当前会话时区。 | 无 | + +### 3.5 关键配置结构 (PoolConfig) + +| 字段 | 类型 | 必填 | 描述 | +| ----------------- | ---------------- | ------------------- | ----------------------------------------------------------- | +| `Host` | `string` | 与NodeUrls二选一 | 单节点主机地址。 | +| `Port` | `string` | 与NodeUrls二选一 | 单节点端口。 | +| `NodeUrls` | `[]string` | 与Host/Port二选一 | 集群节点地址列表,格式为`”host:port”`。 | +| `UserName` | `string` | 是 | 用户名。 | +| `Password` | `string` | 是 | 密码。 | +| `FetchSize` | `int32` | 否 | 查询结果集获取大小,默认1024。 | +| `TimeZone` | `string` | 否 | 会话时区,如`”Asia/Shanghai”`,默认使用服务端时区。 | +| `Database` | `string` | 否 | 表模型适用,用于设置会话默认数据库。 | diff --git a/src/zh/UserGuide/latest/API/Programming-Go-Native-API.md b/src/zh/UserGuide/latest/API/Programming-Go-Native-API.md index 99f8d748c..fe9e49b9b 100644 --- a/src/zh/UserGuide/latest/API/Programming-Go-Native-API.md +++ b/src/zh/UserGuide/latest/API/Programming-Go-Native-API.md @@ -21,20 +21,25 @@ # Go 原生接口 -## 1. 依赖 +Go 原生 API 支持通过 `Session `和 `SessionPool `两种方式与数据库进行交互。由于 `Session `非线程安全,因此强烈推荐使用 `SessionPool `编程。在多线程并发的情形下,`SessionPool `能够合理地管理和分配连接资源,以提升系统性能与资源利用效率。 - * golang >= 1.13 - * make >= 3.0 - * curl >= 7.1.1 - * thrift 0.15.0 - * Linux、Macos 或其他类 unix 系统 - * Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可) +本文将围绕 `SessionPool` 的使用进行说明,涵盖从环境准备、核心操作步骤到全量接口的完整内容。 -## 2. 安装方法 +## 1. 环境准备 +### 1.1 前置依赖 - * 通过 go mod +* golang >= 1.13 +* make >= 3.0 +* curl >= 7.1.1 +* thrift: 0.15.0 +* Linux、Macos 或其他类 unix 系统 +* Windows+bash (下载 IoTDB Go client 需要 git ,通过 WSL、cygwin、Git Bash 任意一种方式均可) -```sh +### 1.2 安装方法 + +* 使用 go mod + +```Bash # 切换到 GOPATH 的 HOME 路径,启用 Go Modules 功能 export GO111MODULE=on @@ -57,9 +62,9 @@ go mod tidy go run session_example.go ``` -* 通过 GOPATH +* 使用 GOPATH -```sh +```Bash # get thrift 0.13.0 go get github.com/apache/thrift@0.13.0 @@ -81,4 +86,330 @@ go mod tidy # 编译并运行程序 go run session_example.go ``` -**注意:GO原生客户端Session不是线程安全的,强烈不建议在多线程场景下应用。如有多线程应用场景,请使用Session Pool.** + +## 2. 核心步骤 + +使用 Go 原生接口操作 IoTDB 的三个核心步骤如下: + +1. 创建连接池实例:初始化一个`SessionPool`对象,配置连接参数和池大小。 +2. 执行数据库操作:从连接池中`GetSession()`,执行数据写入或查询等操作,完成后必须`PutBack(session)`。 +3. 关闭连接池资源:程序结束时调用`sessionPool.Close()`,释放所有连接。 + +下面的章节用于说明开发的核心流程,并未演示所有的参数和接口,如需了解全部功能及参数请参见: [全量接口说明](../API/Programming-Go-Native-API.md#_3-全量接口) 或 查阅: [SessionPool 示例源码](https://github.com/apache/iotdb-client-go/blob/main/example/session_pool/session_pool_example.go) + +### 2.1 创建连接池实例 + +* 单实例 + +```Go +config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, +} +sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) +defer​ ​sessionPool.Close() +``` + +* 分布式或双活 + +```Go +config := &client.PoolConfig{ + UserName: user, + Password: password, + NodeUrls: strings.Split("127.0.0.1:6667,127.0.0.1:6668", ","), + } +sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) +defer​ ​sessionPool.Close() +``` + +### 2.2 数据库操作 +#### 2.2.1 数据写入 + +```Go +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +status, err := session.InsertTablet(tablet, false) +tablet.Reset() +checkError(status, err) +``` + +#### 2.2.2 数据查询 + +```Go +var timeout int64 = 1000 +session, err := sessionPool.GetSession() +defer sessionPool.PutBack(session) +if err != nil { + log.Print(err) + return +} +sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) +if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) +} else { + log.Println(err) +} +``` + +### 2.3 使用示例 + +```Go +import ( + "flag" + "fmt" + "log" + "math/rand" + "strings" + "time" + + "github.com/apache/iotdb-client-go/v2/client" + "github.com/apache/iotdb-client-go/v2/common" +) + +var ( + host string + port string + user string + password string +) +var sessionPool client.SessionPool + +func main() { + flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100") + flag.StringVar(&port, "port", "6667", "--port=6667") + flag.StringVar(&user, "user", "root", "--user=root") + flag.StringVar(&password, "password", "root", "--password=root") + flag.Parse() + + //1.创建连接池 + config := &client.PoolConfig{ + Host: host, + Port: port, + UserName: user, + Password: password, + } + sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false) + + defer sessionPool.Close() + + //2.创建存储组 + setStorageGroup("root.sg1") + + //3. 创建时间序列 + createTimeseries("root.sg1.dev1.temperature") + + //4.数据写入 + insertTablet() + + //5. 数据查询 + executeQueryStatement("select temperature from root.sg1.dev1") + + //6. 删除 + deleteTimeseries("root.sg1.dev1.temperature") + deleteStorageGroup("root.sg1") + +} + +// 设置存储组 +func setStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + session.SetStorageGroup(sg) + } +} + +// 删除存储组 +func deleteStorageGroup(sg string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteStorageGroup(sg)) + } +} + +// 创建时间序列 +func createTimeseries(path string) { + var ( + dataType = client.FLOAT + encoding = client.PLAIN + compressor = client.SNAPPY + ) + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil)) + } +} + +// 删除时间序列 +func deleteTimeseries(paths ...string) { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + checkError(session.DeleteTimeseries(paths)) + } +} + +// 插入Tablet数据 +func insertTablet() { + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err == nil { + if tablet, err := createTablet(12); err == nil { + status, err := session.InsertTablet(tablet, false) + tablet.Reset() + checkError(status, err) + } else { + log.Fatal(err) + } + } +} + +//创建Tablet +func createTablet(rowCount int) (*client.Tablet, error) { + tablet, err := client.NewTablet("root.sg1.dev1", []*client.MeasurementSchema{ + { + Measurement: "temperature", + DataType: client.FLOAT, + }, + }, rowCount) + + if err != nil { + return nil, err + } + ts := time.Now().UTC().UnixNano() / 1000000 + for row := 0; row < int(rowCount); row++ { + ts++ + tablet.SetTimestamp(ts, row) + tablet.SetValueAt(rand.Float32(), 0, row) + tablet.RowSize++ + } + return tablet, nil +} + +// 执行查询语句 +func executeQueryStatement(sql string) { + var timeout int64 = 1000 + session, err := sessionPool.GetSession() + defer sessionPool.PutBack(session) + if err != nil { + log.Print(err) + return + } + sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout) + if err == nil { + defer sessionDataSet.Close() + printDataSet(sessionDataSet) + } else { + log.Println(err) + } +} + +// 打印查询结果 +func printDataSet(sds *client.SessionDataSet) { + columnNames := sds.GetColumnNames() + for _, value := range columnNames { + fmt.Printf("%s\t", value) + } + fmt.Println() + + for next, err := sds.Next(); err == nil && next; next, err = sds.Next() { + for _, columnName := range columnNames { + isNull, _ := sds.IsNull(columnName) + + if isNull { + fmt.Printf("%v\t\t", "null") + } else { + v, _ := sds.GetString(columnName) + fmt.Printf("%v\t\t", v) + } + } + fmt.Println() + } +} + +// 检查错误 +func checkError(status *common.TSStatus, err error) { + if err != nil { + log.Fatal(err) + } + + if status != nil { + if err = client.VerifySuccess(status); err != nil { + log.Println(err) + } + } +} +``` + +## 3. 全量接口 +### 3.1 SessionPool 管理接口 + +| 接口名称 | 功能描述 | 参数说明 | +|--------------------------------------------------------------------------------------------------------------|--------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------| +| `NewSessionPool(config *PoolConfig, maxSize, connTimeoutMs, waitTimeoutMs int, enableComp bool) SessionPool` | 创建并返回一个Session连接池实例。 | `config`: 连接池配置
`maxSize`: 最大连接数(≤0时取CPU数\*5)
`connTimeoutMs`: TCP连接超时(ms)
`waitTimeoutMs`: 获取Session等待超时(ms)
`enableComp`: 是否启用压缩 | +| `GetSession() (Session, error)` | 从池中获取一个可用Session。若池满则阻塞等待,超时返回错误。​**必须与PutBack配对使用**​。 | 无 | +| `PutBack(session Session)` | 将使用完毕的Session归还到连接池中。 | `session`: 从GetSession获取的实例 | +| `Close()` | 关闭连接池,释放所有活跃连接。程序退出前必须调用。 | 无 | + +### 3.2 数据写入接口 + +以下接口需通过获取的 Session 进行调用 + +| 接口名称 | 功能描述 | 参数说明 | +|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------------------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `InsertRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | 插入单条记录。| `deviceId`: 设备ID
`measurements`: 测点列表
`dataTypes`: 数据类型列表
`values`: 值列表
`timestamp`: 时间戳 | +| `InsertAlignedRecord(deviceId string, measurements []string, dataTypes []TSDataType, values []interface{}, timestamp int64) (r common.TSStatus, err error)` | 插入单条对齐记录。 | `deviceId`: 设备ID
`measurements`: 测点列表
`dataTypes`: 数据类型列表
`values`: 值列表
`timestamp`: 时间戳 | +| `InsertStringRecord(deviceId string, measurements []string, values []string, timestamp int64) (r common.TSStatus, err error)` | 插入字符串格式的单条记录。 | `deviceId`: 设备ID
`measurements`: 测点列表
`values`: 字符串类型的值列表
`timestamp`: 时间戳 | +| `InsertRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | 插入多条记录。| `deviceIds`: 设备ID列表
`measurements`:二维测点列表
`dataTypes`: 二维数据类型列表
`values`: 二维值列表
`timestamps`: 时间戳列表 | +| `InsertAlignedRecords(deviceIds []string, measurements [][]string, dataTypes [][]TSDataType, values [][]interface{}, timestamps []int64) (r common.TSStatus, err error)` | 插入多个对齐设备的多条记录。 | `deviceIds`: 设备ID列表
`measurements`:二维测点列表
`dataTypes`: 二维数据类型列表
`values`: 二维值列表
`timestamps`: 时间戳列表 | +| `InsertTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | 插入单个设备的多条数据。 | `tablet`: 要插入的`Tablet`数据
`sorted`: 数据是否已排序 | +| `InsertAlignedTablet(tablet Tablet, sorted bool) (r common.TSStatus, err error)` | 插入单个对齐设备的多条数据。 | `tablet`: 要插入的`Tablet`数据
`sorted`: 数据是否已排序 | +| `InsertTablets``(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | 批量插入多个 Tablet 数据。 | `tablets`: 要插入的多个`Tablet `数据
`sorted`: 数据是否已排序 | +| `InsertAlignedTablets(tablets []Tablet, sorted bool) (r common.TSStatus, err error)` | 批量插入多个对齐设备的数据。 | `tablets`: 要插入的多个`Tablet `数据
`sorted`: 数据是否已排序 | + +### 3.3 SQL与查询接口 + +以下接口需通过获取的 Session 进行调用 + +| 接口名称 | 功能描述 | 参数说明 | +|----------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------| +| `ExecuteStatement(sql string)`*`(`*`SessionDataSet, error)` | 执行SQL(主要查询),返回`SessionDataSet`。 | `sql`:要执行的SQL查询语句 | +| `ExecuteQueryStatement(sql string, timeoutMs int64) (SessionDataSet, error)` | 执行查询SQL,可指定超时,返回`SessionDataSet`。 | `sql`:要执行的SQL查询语句
`timeoutMs`: 查询超时时间(毫秒) | +| `ExecuteNonQueryStatementExecuteNonQueryStatement(sql string) (r common.TSStatus, err error)` | 执行不返回结果集的SQL(如INSERT, CREATE, DELETE)。 | `sql`:要执行的SQL语句 | +| `ExecuteRawDataQuery(paths []string, startTime int64, endTime int64) (*SessionDataSet, error)` | 查询指定时间序列在时间范围内的原始数据。 | `paths`: 查询路径列表
`startTime`: 起始时间戳
`endTime`: 结束时间戳 | +| `ExecuteAggregationQuery(paths []string, aggregations []common.TAggregationType, startTime, endTime, interval, timeoutMs int64) (SessionDataSet, error)` | 执行聚合查询(COUNT, AVG等)。 | `paths`: 查询路径列表
`aggregations`: 聚合类型列表
`startTime, endTime, interval`: 起始时间、结束时间和间隔时间
`timeoutMs`: 查询超时时间 | +| `ExecuteBatchStatement(sqls []string) (r common.TSStatus, err error)` | 批量执行多条SQL语句。 | `sqls`:要执行的SQL语句 | + +### 3.4 元数据操作接口 + +以下接口需通过获取的 Session 进行调用 + +| 接口名称 | 功能描述 | 参数说明 | +| -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| `SetStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | 创建数据库(存储组)。 | `storageGroupId`:数据库(存储组)名称 | +| `DeleteStorageGroup(storageGroupId string) (r common.TSStatus, err error)` | 删除一个数据库(存储组)。 | `storageGroupId`:要删除的数据库(存储组)名称 | +| `DeleteStorageGroups(storageGroupIds ...string) (r common.TSStatus, err error)` | 删除多个数据库(存储组)。 | `storageGroupIds`:要删除的数据库(存储组)名称列表 | +| `CreateTimeseries(path string, dataType TSDataType, encoding TSEncoding, compressor TSCompressionType, attributes map[string]string, tags map[string]string) (r common.TSStatus, err error)` | 创建非对齐时间序列。 | `path`: 时间序列路径
`dataType`: 数据类型
`encoding`: 编码方式
`compressor`: 压缩算法
`attributes`: (可选)序列属性
`tags`: (可选)序列标签 | +| `CreateAlignedTimeseries(prefixPath string, measurements []string, dataTypes []TSDataType, encodings []TSEncoding, compressors []TSCompressionType, measurementAlias []string) (r common.TSStatus, err error)` | 创建一组对齐时间序列。 | `prefixPath`: 时间序列路径前缀
`measurements`: 测点名称列表
`dataTypes, encodings, compressors`: 每个测点对应的数据类型、编码和压缩算法列表
`measurementAlias`: (可选)每个测点的别名列表 | +| `DeleteTimeseries(paths []string) (r common.TSStatus, err error)` | 删除多条时间序列(含数据)。 | `paths`:要删除的时间序列路径列表 | +| `DeleteData(paths []string, startTime int64, endTime int64) (r common.TSStatus, err error)` | 删除指定时间序列在时间段内的数据(保留元数据)。 | `paths`: 要删除的时间序列路径列表
`startTime`: 起始时间戳
`endTime`: 结束时间戳。 | +| `SetTimeZone(timeZone string) (r common.TSStatus, err error)` | 设置当前会话时区。 | `timeZone`: 时区字符串,例如 ”UTC”, ”Asia/Shanghai”, ”GMT+8” | +| `GetTimeZone() (string, error)` | 获取当前会话时区。 | 无 | + +### 3.5 关键配置结构 (PoolConfig) + +| 字段 | 类型 | 必填 | 描述 | +| ----------------- | ---------------- | ------------------- | ----------------------------------------------------------- | +| `Host` | `string` | 与NodeUrls二选一 | 单节点主机地址。 | +| `Port` | `string` | 与NodeUrls二选一 | 单节点端口。 | +| `NodeUrls` | `[]string` | 与Host/Port二选一 | 集群节点地址列表,格式为`”host:port”`。 | +| `UserName` | `string` | 是 | 用户名。 | +| `Password` | `string` | 是 | 密码。 | +| `FetchSize` | `int32` | 否 | 查询结果集获取大小,默认1024。 | +| `TimeZone` | `string` | 否 | 会话时区,如`”Asia/Shanghai”`,默认使用服务端时区。 | +| `Database` | `string` | 否 | 表模型适用,用于设置会话默认数据库。 |