Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 94 additions & 13 deletions api/seqproxyapi/v1/seq_proxy_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ service SeqProxyApi {
// The server processes the request in the background and returns a search ID.
rpc StartAsyncSearch(StartAsyncSearchRequest) returns (StartAsyncSearchResponse) {
option (google.api.http) = {
post: "/async-search"
post: "/async-searches"
body: "*"
};
}
Expand All @@ -85,15 +85,30 @@ service SeqProxyApi {
// Clients should use the search ID returned by StartAsyncSearch.
rpc FetchAsyncSearchResult(FetchAsyncSearchResultRequest) returns (FetchAsyncSearchResultResponse) {
option (google.api.http) = {
get: "/async-search/{search_id}"
post: "/async-searches/fetch"
body: "*"
};
}

// Cancels an ongoing asynchronous search operation if it hasn't completed yet.
// Useful for freeing up resources if the result is no longer needed.
rpc CancelAsyncSearch(CancelAsyncSearchRequest) returns (CancelAsyncSearchResponse) {
option (google.api.http) = {
delete: "/async-search/{search_id}"
post: "/async-searches/{search_id}/cancel"
};
}

// Frees up resources if the result is no longer needed.
rpc DeleteAsyncSearch(DeleteAsyncSearchRequest) returns (DeleteAsyncSearchResponse) {
option (google.api.http) = {
delete: "/async-searches/{search_id}"
};
}

// Fetch list of async searches.
rpc GetAsyncSearchesList(GetAsyncSearchesListRequest) returns (GetAsyncSearchesListResponse) {
option (google.api.http) = {
post: "/async-searches/list"
body: "*"
};
}
}
Expand Down Expand Up @@ -229,28 +244,61 @@ message ComplexSearchResponse {
}

message StartAsyncSearchRequest {
// Duration to retain the result of an asynchronous query.
// After this period, the result will be deleted.
google.protobuf.Duration retention = 1;
SearchQuery query = 2; // Search query.
repeated AggQuery aggs = 3; // List of aggregation queries.
optional HistQuery hist = 4; // Histogram query.
Order order = 5; // Document order ORDER_DESC/ORDER_ASC.
// Search query to execute.
SearchQuery query = 2;
// List of aggregation queries.
repeated AggQuery aggs = 3;
// Optional histogram query.
optional HistQuery hist = 4;
// Set this to true to enable document retrieval via FetchAsyncSearch.
// Note: enabling this may significantly increase disk space usage.
bool with_docs = 5;
}

message StartAsyncSearchResponse {
// Unique ID used to retrieve search results with FetchAsyncSearchResult.
string search_id = 1;
}

message FetchAsyncSearchResultRequest {
string search_id = 1;
bool with_docs = 2;
int32 size = 3;
int32 offset = 4;
// Maximum number of documents to fetch (pagination).
// Ignored if with_docs was set to false, since documents are not stored in that case.
int32 size = 2;
// Document offset (pagination).
// Ignored if with_docs was set to false, since documents are not stored in that case.
int32 offset = 3;
// Documents sort order.
Order order = 4;
}
Comment thread
forshev marked this conversation as resolved.

enum AsyncSearchStatus {
Comment thread
forshev marked this conversation as resolved.
// The asynchronous search is still in progress.
// See the 'progress' field for completion percentage.
AsyncSearchStatusInProgress = 0;
// The asynchronous search completed successfully.
AsyncSearchStatusDone = 1;
// The asynchronous search was canceled, possibly via the CancelAsyncSearch handler.
AsyncSearchStatusCanceled = 2;
// The asynchronous search encountered errors in some shards.
// See ComplexSearchResponse.Error for details.
AsyncSearchStatusError = 3;
}

message FetchAsyncSearchResultResponse {
bool done = 1;
google.protobuf.Timestamp expiration = 2;
AsyncSearchStatus status = 1;
StartAsyncSearchRequest request = 2;
ComplexSearchResponse response = 3;
google.protobuf.Timestamp started_at = 4;
google.protobuf.Timestamp expires_at = 5;
optional google.protobuf.Timestamp canceled_at = 6;
// Search progress in range [0, 1].
double progress = 7;
// The size of data stored on disk, in bytes.
uint64 disk_usage = 8;
}

message CancelAsyncSearchRequest{
Expand All @@ -259,6 +307,39 @@ message CancelAsyncSearchRequest{

message CancelAsyncSearchResponse{}

message DeleteAsyncSearchRequest {
string search_id = 1;
}

message DeleteAsyncSearchResponse {}

message GetAsyncSearchesListRequest {
optional AsyncSearchStatus status = 1;
// Maximum number of searches to fetch (pagination).
int32 size = 2;
// Searches offset (pagination).
int32 offset = 3;
// list of async search ids to filter out result
repeated string ids = 4;
}

message GetAsyncSearchesListResponse {
repeated AsyncSearchesListItem searches = 1;
}

message AsyncSearchesListItem {
string search_id = 1;
AsyncSearchStatus status = 2;
StartAsyncSearchRequest request = 3;
google.protobuf.Timestamp started_at = 4;
google.protobuf.Timestamp expires_at = 5;
optional google.protobuf.Timestamp canceled_at = 6;
// Search progress in range [0, 1].
double progress = 7;
// The size of data stored on disk, in bytes.
uint64 disk_usage = 8;
}

message GetAggregationRequest {
SearchQuery query = 1; // Search query.
repeated AggQuery aggs = 2; // List of aggregation queries.
Expand Down
95 changes: 79 additions & 16 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ service StoreApi {

rpc FetchAsyncSearchResult(FetchAsyncSearchResultRequest) returns (FetchAsyncSearchResultResponse) {}

rpc CancelAsyncSearch(CancelAsyncSearchRequest) returns (CancelAsyncSearchResponse) {}

rpc DeleteAsyncSearch(DeleteAsyncSearchRequest) returns (DeleteAsyncSearchResponse) {}

rpc GetAsyncSearchesList(GetAsyncSearchesListRequest) returns (GetAsyncSearchesListResponse) {}

rpc Fetch(FetchRequest) returns (stream BinaryData) {}

rpc Status(StatusRequest) returns (StatusResponse) {}
Expand Down Expand Up @@ -135,33 +141,90 @@ enum SearchErrorCode {

message StartAsyncSearchRequest {
string search_id = 1;

string query = 2;
int64 from = 3;
int64 to = 4;
repeated AggQuery aggs = 5;
int64 histogram_interval = 6;
Order order = 7;
google.protobuf.Duration retention = 2;
string query = 3;
int64 from = 4;
int64 to = 5;
repeated AggQuery aggs = 6;
int64 histogram_interval = 7;
bool with_docs = 8;
}
Comment thread
forshev marked this conversation as resolved.

message StartAsyncSearchResponse {}

message FetchAsyncSearchResultRequest {
string search_id = 1;
bool with_docs = 2;
int32 size = 3;
int32 offset = 4;
int32 size = 2;
int32 offset = 3;
Order order = 4;
}

message FetchAsyncSearchResultResponse {
bool done = 1;
enum AsyncSearchStatus {
Comment thread
forshev marked this conversation as resolved.
AsyncSearchStatusInProgress = 0;
AsyncSearchStatusDone = 1;
AsyncSearchStatusCanceled = 2;
AsyncSearchStatusError = 3;
}

message FetchAsyncSearchResultResponse {
AsyncSearchStatus status = 1;
SearchResponse response = 2;
google.protobuf.Timestamp expiration = 3;
google.protobuf.Timestamp started_at = 3;
google.protobuf.Timestamp expires_at = 4;
optional google.protobuf.Timestamp canceled_at = 5;
uint64 fracs_done = 6;
uint64 fracs_queue = 7;
uint64 disk_usage = 8;

repeated AggQuery aggs = 9;
int64 histogram_interval = 10;

string query = 11;
google.protobuf.Timestamp from = 12;
google.protobuf.Timestamp to = 13;
google.protobuf.Duration retention = 14;
bool with_docs = 15;
}
Comment thread
forshev marked this conversation as resolved.

message CancelAsyncSearchRequest{
string search_id = 1;
}

message CancelAsyncSearchResponse{}

repeated AggQuery aggs = 5;
int64 histogram_interval = 6;
Order order = 7;
message DeleteAsyncSearchRequest {
string search_id = 1;
}

message DeleteAsyncSearchResponse {}

message GetAsyncSearchesListRequest {
optional AsyncSearchStatus status = 1;
repeated string ids = 2;
}

message GetAsyncSearchesListResponse {
repeated AsyncSearchesListItem searches = 1;
}

message AsyncSearchesListItem {
string search_id = 1;
AsyncSearchStatus status = 2;
google.protobuf.Timestamp started_at = 3;
google.protobuf.Timestamp expires_at = 4;
optional google.protobuf.Timestamp canceled_at = 5;
uint64 fracs_done = 6;
uint64 fracs_queue = 7;
uint64 disk_usage = 8;

repeated AggQuery aggs = 9;
int64 histogram_interval = 10;

string query = 11;
google.protobuf.Timestamp from = 12;
google.protobuf.Timestamp to = 13;
google.protobuf.Duration retention = 14;
bool with_docs = 15;
}

message IdWithHint {
Expand Down
6 changes: 4 additions & 2 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,10 @@ func startStore(
RequestsLimit: uint64(cfg.Limits.SearchRequests),
LogThreshold: cfg.SlowLogs.SearchThreshold,
Async: fracmanager.AsyncSearcherConfig{
DataDir: cfg.AsyncSearch.DataDir,
Parallelism: cfg.AsyncSearch.Concurrency,
DataDir: cfg.AsyncSearch.DataDir,
Workers: cfg.AsyncSearch.Concurrency,
MaxSize: int(cfg.AsyncSearch.MaxTotalSize),
MaxSizePerRequest: int(cfg.AsyncSearch.MaxSizePerRequest),
},
},
Fetch: storeapi.FetchConfig{
Expand Down
6 changes: 4 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,10 @@ type Config struct {
AsyncSearch struct {
// DataDir specifies directory that contains data for asynchronous searches.
// By default will be subdirectory in [Config.Storage.DataDir].
DataDir string `config:"data_dir"`
Concurrency int `config:"concurrency"`
DataDir string `config:"data_dir"`
Concurrency int `config:"concurrency"`
MaxTotalSize Bytes `config:"max_total_size" default:"1GiB"`
MaxSizePerRequest Bytes `config:"max_size_per_request" default:"100MiB"`
} `config:"async_search"`

API struct {
Expand Down
2 changes: 1 addition & 1 deletion frac/meta_data_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (m *MetaData) MarshalBinaryTo(b []byte) []byte {
b = binary.LittleEndian.AppendUint64(b, uint64(m.ID.MID))
b = binary.LittleEndian.AppendUint64(b, uint64(m.ID.RID))

// Encode Size.
// Encode BlockLength.
b = binary.LittleEndian.AppendUint32(b, m.Size)

// Encode tokens.
Expand Down
2 changes: 1 addition & 1 deletion frac/processor/search_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type AggQuery struct {
}

type SearchParams struct {
AST *parser.ASTNode
AST *parser.ASTNode `json:"-"`

AggQ []AggQuery
HistInterval uint64
Expand Down
Loading
Loading