diff --git a/bql/grammar/grammar.go b/bql/grammar/grammar.go index ec85d8d7..fa71d5d4 100644 --- a/bql/grammar/grammar.go +++ b/bql/grammar/grammar.go @@ -327,6 +327,7 @@ func moreClauses() []*Clause { Elements: []Element{ NewTokenType(lexer.ItemDot), NewSymbol("CLAUSES"), + NewSymbol("FILTER_CLAUSES"), }, }, {}, @@ -365,6 +366,33 @@ func clauses() []*Clause { } } +func moreFilterClauses() []*Clause { + return []*Clause{ + { + Elements: []Element{ + NewTokenType(lexer.ItemDot), + NewSymbol("FILTER_CLAUSES"), + }, + }, + {}, + } +} +func filterClauses() []*Clause { + return []*Clause{ + { + Elements: []Element{ + NewTokenType(lexer.ItemFilter), + NewTokenType(lexer.ItemFilterFunction), + NewTokenType(lexer.ItemLPar), + NewTokenType(lexer.ItemBinding), + NewTokenType(lexer.ItemRPar), + NewSymbol("MORE_FILTER_CLAUSES"), + }, + }, + {}, + } +} + func optionalClauses() []*Clause { return []*Clause{ { @@ -1266,6 +1294,8 @@ func BQL() *Grammar { "MORE_CLAUSES": moreClauses(), "CLAUSES": clauses(), "OPTIONAL_CLAUSE": optionalClauses(), + "FILTER_CLAUSES": filterClauses(), + "MORE_FILTER_CLAUSES": moreFilterClauses(), "SUBJECT_EXTRACT": subjectExtractClauses(), "SUBJECT_TYPE": subjectTypeClauses(), "SUBJECT_ID": subjectIDClauses(), @@ -1400,6 +1430,12 @@ func SemanticBQL() *Grammar { } setElementHook(semanticBQL, objSymbols, semantic.WhereObjectClauseHook(), nil) + // Filter clause hook. + filterSymbols := []semantic.Symbol{ + "FILTER_CLAUSES", + } + setElementHook(semanticBQL, filterSymbols, semantic.WhereFilterClauseHook(), nil) + // Collect binding variables variables. varSymbols := []semantic.Symbol{ "VARS", "VARS_AS", "MORE_VARS", "COUNT_DISTINCT", diff --git a/bql/grammar/grammar_test.go b/bql/grammar/grammar_test.go index 53f761c6..614bfc67 100644 --- a/bql/grammar/grammar_test.go +++ b/bql/grammar/grammar_test.go @@ -194,6 +194,20 @@ func TestAcceptByParse(t *testing.T) { ?n "_object"@[] ?o};`, // Show the graphs. 
`show graphs;`, + // Test FILTER clause inside WHERE. + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER latest(?p) + };`, + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER latest(?p) . + FILTER latest(?o) + };`, // Test optional trailing dot after the last clause inside WHERE. `select ?a from ?b @@ -229,6 +243,19 @@ func TestAcceptByParse(t *testing.T) { ?s ?p ?o . /u ?p ?o };`, + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER latest(?p) . + };`, + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER latest(?p) . + FILTER latest(?o) . + };`, } p, err := NewParser(BQL()) if err != nil { @@ -386,6 +413,46 @@ func TestRejectByParse(t *testing.T) { where {?n "_subject"@[] ?s. ?n "_predicate"@[] ?p. ?n "_object"@[] ?o};`, + // Test invalid FILTER clause inside WHERE. + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER latest ?p + };`, + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER latest (?p) + };`, + `select ?a + from ?b + where { + FILTER latest(?p) . + ?s ?p ?o + };`, + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER latest(?p) . + /u ?p ?o + };`, + `select ?a + from ?b + where { + ?s ?p ?o . + ?s ?p ?o . + FILTER latest(?p) . + /u ?p ?o + };`, + `select ?a + from ?b + where { + ?s ?p ?o . + FILTER late^st(?p) + };`, // Test invalid trailing dot use inside WHERE. `select ?a from ?b @@ -411,6 +478,19 @@ func TestRejectByParse(t *testing.T) { ?s ?p ?o /u ?p ?o };`, + `select ?a + from ?b + where { + ?s ?p ?o + FILTER latest(?p) + };`, + `select ?a + from ?b + where { + ?s ?p ?o . 
+ FILTER latest(?p) + FILTER latest(?o) + };`, } p, err := NewParser(BQL()) if err != nil { @@ -565,6 +645,13 @@ func TestAcceptQueryBySemanticParse(t *testing.T) { `select ?s from ?g where{/_ as ?s ?p "id"@[?foo, ?bar] as ?o} order by ?s;`, `select ?s as ?a, ?o as ?b, ?o as ?c from ?g where{?s ?p ?o} order by ?a ASC, ?b DESC;`, `select ?s as ?a, ?o as ?b, ?o as ?c from ?g where{?s ?p ?o} order by ?a ASC, ?b DESC, ?a ASC, ?b DESC, ?c;`, + // Test valid FILTER clause for grammar with hooks. + `select ?p + from ?b + where { + ?s ?p ?o . + FILTER latest(?p) + };`, } p, err := NewParser(SemanticBQL()) if err != nil { @@ -596,6 +683,13 @@ func TestRejectByParseAndSemantic(t *testing.T) { `select ?s as ?a, ?o as ?b, ?o as ?c from ?g where{?s ?p ?o} order by ?a ASC, ?a DESC;`, // Wrong limit literal. `select ?s as ?a, ?o as ?b, ?o as ?c from ?g where{?s ?p ?o} LIMIT "true"^^type:bool;`, + // Reject not supported FILTER function. + `select ?p, ?o + from ?test + where { + /u ?p ?o . + FILTER notSupportedFilterFunction(?p) + };`, } p, err := NewParser(SemanticBQL()) if err != nil { diff --git a/bql/lexer/lexer.go b/bql/lexer/lexer.go index 0c45a69b..1cbd5570 100644 --- a/bql/lexer/lexer.go +++ b/bql/lexer/lexer.go @@ -139,6 +139,10 @@ const ( ItemGraphs // ItemOptional identifies optional graph pattern clauses. ItemOptional + // ItemFilter represents the filter keyword in BQL. + ItemFilter + // ItemFilterFunction represents a filter function in BQL. 
+ ItemFilterFunction ) func (tt TokenType) String() string { @@ -253,6 +257,10 @@ func (tt TokenType) String() string { return "GRAPHS" case ItemOptional: return "OPTIONAL" + case ItemFilter: + return "FILTER" + case ItemFilterFunction: + return "FILTER_FUNCTION" default: return "UNKNOWN" } @@ -294,6 +302,7 @@ const ( from = "from" where = "where" optional = "optional" + filter = "filter" as = "as" before = "before" after = "after" @@ -402,6 +411,9 @@ func lexToken(l *lexer) stateFn { return lexPredicateOrLiteral } if unicode.IsLetter(r) { + if l.lastTokenType == ItemFilter { + return lexFilterFunction + } return lexKeyword } } @@ -617,6 +629,10 @@ func lexKeyword(l *lexer) stateFn { consumeKeyword(l, ItemOptional) return lexSpace } + if strings.EqualFold(input, filter) { + consumeKeyword(l, ItemFilter) + return lexSpace + } if strings.EqualFold(input, typeKeyword) { consumeKeyword(l, ItemType) return lexSpace @@ -648,6 +664,25 @@ func lexKeyword(l *lexer) stateFn { return nil } +// lexFilterFunction lexes a filter function out of the input (used in FILTER clauses). 
+func lexFilterFunction(l *lexer) stateFn { + l.next() + var nr rune + for { + nr = l.next() + if nr == leftPar { + l.backup() + break + } + if !unicode.IsLetter(nr) { + l.emitError(`invalid rune in filter function: "` + string(nr) + `"; filter functions should be formed only by letters`) + return nil + } + } + l.emit(ItemFilterFunction) + return lexSpace +} + func lexNode(l *lexer) stateFn { ltID := false for done := false; !done; { diff --git a/bql/lexer/lexer_test.go b/bql/lexer/lexer_test.go index 55a5c0e1..d30fd594 100644 --- a/bql/lexer/lexer_test.go +++ b/bql/lexer/lexer_test.go @@ -76,6 +76,8 @@ func TestTokenTypeString(t *testing.T) { {ItemShow, "SHOW"}, {ItemGraphs, "GRAPHS"}, {ItemOptional, "OPTIONAL"}, + {ItemFilter, "FILTER"}, + {ItemFilterFunction, "FILTER_FUNCTION"}, {TokenType(-1), "UNKNOWN"}, } @@ -388,6 +390,29 @@ func TestIndividualTokens(t *testing.T) { {Type: ItemEOF}, }, }, + { + `FILTER latest(?p)`, + []Token{ + {Type: ItemFilter, Text: "FILTER"}, + {Type: ItemFilterFunction, Text: "latest"}, + {Type: ItemLPar, Text: "("}, + {Type: ItemBinding, Text: "?p"}, + {Type: ItemRPar, Text: ")"}, + {Type: ItemEOF}, + }, + }, + { + `FILTER latest(?p) .`, + []Token{ + {Type: ItemFilter, Text: "FILTER"}, + {Type: ItemFilterFunction, Text: "latest"}, + {Type: ItemLPar, Text: "("}, + {Type: ItemBinding, Text: "?p"}, + {Type: ItemRPar, Text: ")"}, + {Type: ItemDot, Text: "."}, + {Type: ItemEOF}, + }, + }, } for _, test := range table { @@ -578,6 +603,36 @@ func TestValidTokenQuery(t *testing.T) { ItemHaving, ItemBinding, ItemEQ, ItemTime, ItemSemicolon, ItemEOF, }, }, + { + `select ?s ?p ?o + from ?foo + where { + ?s ?p ?o . 
+ FILTER latest(?p) + };`, + []TokenType{ + ItemQuery, ItemBinding, ItemBinding, ItemBinding, ItemFrom, ItemBinding, + ItemWhere, ItemLBracket, ItemBinding, ItemBinding, ItemBinding, ItemDot, + ItemFilter, ItemFilterFunction, ItemLPar, ItemBinding, ItemRPar, + ItemRBracket, ItemSemicolon, ItemEOF, + }, + }, + { + `select ?s ?p ?o + from ?foo + where { + ?s ?p ?o . + FILTER latest(?p) . + FILTER latest(?o) + };`, + []TokenType{ + ItemQuery, ItemBinding, ItemBinding, ItemBinding, ItemFrom, ItemBinding, + ItemWhere, ItemLBracket, ItemBinding, ItemBinding, ItemBinding, ItemDot, + ItemFilter, ItemFilterFunction, ItemLPar, ItemBinding, ItemRPar, ItemDot, + ItemFilter, ItemFilterFunction, ItemLPar, ItemBinding, ItemRPar, + ItemRBracket, ItemSemicolon, ItemEOF, + }, + }, } for _, test := range table { diff --git a/bql/planner/data_access.go b/bql/planner/data_access.go index 04bba8d2..324e18e0 100644 --- a/bql/planner/data_access.go +++ b/bql/planner/data_access.go @@ -45,9 +45,10 @@ var _ error = (*skippableError)(nil) // provided graph clause. func updateTimeBounds(lo *storage.LookupOptions, cls *semantic.GraphClause) *storage.LookupOptions { nlo := &storage.LookupOptions{ - MaxElements: lo.MaxElements, - LowerAnchor: lo.LowerAnchor, - UpperAnchor: lo.UpperAnchor, + MaxElements: lo.MaxElements, + LowerAnchor: lo.LowerAnchor, + UpperAnchor: lo.UpperAnchor, + FilterOptions: lo.FilterOptions, } if cls.PLowerBound != nil { if lo.LowerAnchor == nil || (lo.LowerAnchor != nil && cls.PLowerBound.After(*lo.LowerAnchor)) { diff --git a/bql/planner/filter/filter.go b/bql/planner/filter/filter.go new file mode 100644 index 00000000..c18b2631 --- /dev/null +++ b/bql/planner/filter/filter.go @@ -0,0 +1,88 @@ +// Copyright 2020 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package filter isolates core FILTER related implementation. +package filter + +import ( + "fmt" +) + +// Operation represents a filter operation supported in BadWolf. +type Operation int + +// List of supported filter operations. +const ( + Latest Operation = iota + 1 +) + +// Field represents the position of the semantic.GraphClause that will be operated on by the filter at storage level. +type Field int + +// List of filter fields. +const ( + SubjectField Field = iota + 1 + PredicateField + ObjectField +) + +// SupportedOperations maps supported filter operation strings to their corresponding Operation. +// Note that the string keys here must be in lowercase letters only (for compatibility with the WhereFilterClauseHook). +var SupportedOperations = map[string]Operation{ + "latest": Latest, +} + +// StorageOptions represents the storage level specifications for the filtering to be executed. +// Operation below refers to the filter function being applied (e.g. Latest), Field refers to the position of the graph clause it +// will be applied to (subject, predicate or object) and Value, when specified, contains the second argument of the filter +// function (not applicable for all Operations - some like Latest do not use it while others like GreaterThan do, see Issue 129). +type StorageOptions struct { + Operation Operation + Field Field + Value string +} + +// String returns the string representation of Operation. 
+func (op Operation) String() string { + switch op { + case Latest: + return "latest" + default: + return fmt.Sprintf(`not defined filter operation "%d"`, op) + } +} + +// IsEmpty returns true if the Operation was not set yet. +func (op Operation) IsEmpty() bool { + return op == Operation(0) +} + +// String returns the string representation of Field. +func (f Field) String() string { + switch f { + case SubjectField: + return "subject field" + case PredicateField: + return "predicate field" + case ObjectField: + return "object field" + default: + return fmt.Sprintf(`not defined filter field "%d"`, f) + } +} + +// String returns the string representation of StorageOptions. +func (so *StorageOptions) String() string { + return fmt.Sprintf("%+v", *so) +} diff --git a/bql/planner/planner.go b/bql/planner/planner.go index d3e05eff..aa3384d8 100644 --- a/bql/planner/planner.go +++ b/bql/planner/planner.go @@ -28,6 +28,7 @@ import ( "sync" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/planner/tracer" "github.com/google/badwolf/bql/semantic" "github.com/google/badwolf/bql/table" @@ -258,7 +259,8 @@ type queryPlan struct { bndgs []string grfsNames []string grfs []storage.Graph - cls []*semantic.GraphClause + clauses []*semantic.GraphClause + filters []*semantic.FilterClause tbl *table.Table chanSize int tracer io.Writer @@ -284,7 +286,8 @@ func newQueryPlan(ctx context.Context, store storage.Store, stm *semantic.Statem store: store, bndgs: bs, grfsNames: stm.InputGraphNames(), - cls: stm.GraphPatternClauses(), + clauses: stm.GraphPatternClauses(), + filters: stm.FilterClauses(), tbl: t, chanSize: chanSize, tracer: w, @@ -620,24 +623,116 @@ func (p *queryPlan) filterOnExistence(ctx context.Context, cls *semantic.GraphCl return grp.Wait() } +// organizeClausesByBinding takes the graph clauses received as input and organize them in a map +// on which the keys are the bindings of these clauses. 
+func organizeClausesByBinding(clauses []*semantic.GraphClause) map[string][]*semantic.GraphClause { + clausesByBinding := map[string][]*semantic.GraphClause{} + for _, cls := range clauses { + for b := range cls.BindingsMap() { + clausesByBinding[b] = append(clausesByBinding[b], cls) + } + } + + return clausesByBinding +} + +// compatibleBindingsInClauseForFilterOperation returns a function that, for each given clause, returns the bindings that are +// compatible with the specified filter operation. +func compatibleBindingsInClauseForFilterOperation(operation filter.Operation) (compatibleBindingsInClause func(cls *semantic.GraphClause) (bindingsByField map[filter.Field]map[string]bool), err error) { + switch operation { + case filter.Latest: + compatibleBindingsInClause = func(cls *semantic.GraphClause) (bindingsByField map[filter.Field]map[string]bool) { + bindingsByField = map[filter.Field]map[string]bool{ + filter.PredicateField: {cls.PBinding: true, cls.PAlias: true}, + filter.ObjectField: {cls.OBinding: true, cls.OAlias: true}, + } + return + } + default: + err = fmt.Errorf("filter function %q has no bindings in clause specified for it (planner level)", operation) + } + + return +} + +// organizeFilterOptionsByClause processes all the given filters and organize them in a map that has as keys the +// clauses to which they must be applied. 
+func organizeFilterOptionsByClause(filters []*semantic.FilterClause, clauses []*semantic.GraphClause) (map[*semantic.GraphClause]*filter.StorageOptions, error) { + clausesByBinding := organizeClausesByBinding(clauses) + filterOptionsByClause := map[*semantic.GraphClause]*filter.StorageOptions{} + + for _, f := range filters { + if _, ok := clausesByBinding[f.Binding]; !ok { + return nil, fmt.Errorf("binding %q referenced by filter clause %q does not exist in the graph pattern", f.Binding, f) + } + + compatibleBindingsInClause, err := compatibleBindingsInClauseForFilterOperation(f.Operation) + if err != nil { + return nil, err + } + for _, cls := range clausesByBinding[f.Binding] { + if _, ok := filterOptionsByClause[cls]; ok { + return nil, fmt.Errorf("multiple filters for the same graph clause or same binding are not supported at the moment") + } + + compatibleBindingsByField := compatibleBindingsInClause(cls) + filterBindingIsCompatible := false + for _, field := range []filter.Field{filter.SubjectField, filter.PredicateField, filter.ObjectField} { // fixed order: ranging over the map could pick a different Field on each run + if compatibleBindingsByField[field][f.Binding] { // a field missing from the map yields a nil inner map, which reads as false + filterBindingIsCompatible = true + filterOptionsByClause[cls] = &filter.StorageOptions{ + Operation: f.Operation, + Field: field, + Value: f.Value, + } + break + } + } + if !filterBindingIsCompatible { + return nil, fmt.Errorf("binding %q occupies a position in graph clause %q that is incompatible with filter function %q", f.Binding, cls, f.Operation) + } + } + } + + return filterOptionsByClause, nil +} + +// addFilterOptions adds FilterOptions to lookup options if the given clause has bindings for which +// filters were defined (organized in filterOptionsByClause). +func addFilterOptions(lo *storage.LookupOptions, cls *semantic.GraphClause, filterOptionsByClause map[*semantic.GraphClause]*filter.StorageOptions) { + if fo, ok := filterOptionsByClause[cls]; ok { + lo.FilterOptions = fo + } +} + +// resetFilterOptions resets FilterOptions in lookup options to nil. 
+func resetFilterOptions(lo *storage.LookupOptions) { + lo.FilterOptions = (*filter.StorageOptions)(nil) +} + // processGraphPattern process the query graph pattern to retrieve the // data from the specified graphs. func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupOptions) error { tracer.Trace(p.tracer, func() []string { var res []string - for i, cls := range p.cls { + for i, cls := range p.clauses { res = append(res, fmt.Sprintf("Clause %d to process: %v", i, cls)) } return res }) - for i, c := range p.cls { - i, cls := i, *c + + filterOptionsByClause, err := organizeFilterOptionsByClause(p.filters, p.clauses) + if err != nil { + return err + } + for i, cls := range p.clauses { tracer.Trace(p.tracer, func() []string { - return []string{fmt.Sprintf("Processing clause %d: %v", i, &cls)} + return []string{fmt.Sprintf("Processing clause %d: %v", i, cls)} }) - // The current planner is based on naively executing clauses by - // specificity. - unresolvable, err := p.processClause(ctx, &cls, lo) + + addFilterOptions(lo, cls, filterOptionsByClause) + unresolvable, err := p.processClause(ctx, cls, lo) + resetFilterOptions(lo) if err != nil { return err } @@ -646,6 +741,7 @@ func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupO return nil } } + return nil } @@ -831,11 +927,17 @@ func (p *queryPlan) String(ctx context.Context) string { b.WriteString("using store(\"") b.WriteString(p.store.Name(nil)) b.WriteString(fmt.Sprintf("\") graphs %v\nresolve\n", p.grfsNames)) - for _, c := range p.cls { + for _, c := range p.clauses { b.WriteString("\t") b.WriteString(c.String()) b.WriteString("\n") } + b.WriteString("with filters\n") + for _, f := range p.filters { + b.WriteString("\t") + b.WriteString(f.String()) + b.WriteString("\n") + } b.WriteString("project results using\n") for _, p := range p.stm.Projection() { b.WriteString("\t") diff --git a/bql/planner/planner_test.go b/bql/planner/planner_test.go index 0ed651f8..2135ccfd 100644 --- a/bql/planner/planner_test.go +++ 
b/bql/planner/planner_test.go @@ -39,6 +39,8 @@ const ( /u "bought"@[2016-02-01T00:00:00-08:00] /c /u "bought"@[2016-03-01T00:00:00-08:00] /c /u "bought"@[2016-04-01T00:00:00-08:00] /c + /u "bought"@[2016-01-01T00:00:00-08:00] /c + /u "bought"@[2016-04-01T00:00:00-08:00] /c /c "is_a"@[] /t /c "is_a"@[] /t /c "is_a"@[] /t @@ -48,6 +50,7 @@ const ( /l "predicate"@[] "turned"@[2016-03-01T00:00:00-08:00] /l "predicate"@[] "turned"@[2016-04-01T00:00:00-08:00] /l "predicate"@[] "immutable_predicate"@[] + /l "predicate"@[] "turned"@[2016-04-01T00:00:00-08:00] /u "height_cm"@[] "174"^^type:int64 /u "tag"@[] "abc"^^type:text /u "height_cm"@[] "151"^^type:int64 @@ -840,7 +843,7 @@ func TestPlannerQuery(t *testing.T) { } HAVING (?p_id < "in"^^type:text) AND (?time > 2016-02-01T00:00:00-08:00);`, nBindings: 4, - nRows: 2, + nRows: 3, }, { q: `SELECT ?p, ?time @@ -958,7 +961,7 @@ func TestPlannerQuery(t *testing.T) { ?s ?p ?o AT ?o_time };`, nBindings: 3, - nRows: 4, + nRows: 5, }, { q: `SELECT ?s, ?o, ?o_time @@ -978,7 +981,131 @@ func TestPlannerQuery(t *testing.T) { ?s ?p "turned"@[?o_time] };`, nBindings: 2, - nRows: 4, + nRows: 5, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER latest(?p) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p_alias, ?o + FROM ?test + WHERE { + /u ?p AS ?p_alias ?o . + FILTER latest(?p_alias) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /l ?p ?o . + FILTER latest(?o) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p, ?o_alias + FROM ?test + WHERE { + /l ?p ?o AS ?o_alias . + FILTER latest(?o_alias) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p1, ?p2 + FROM ?test + WHERE { + /u ?p1 ?o1 . + /item/book<000> ?p2 ?o2 . + FILTER latest(?p1) . + FILTER latest(?p2) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s ?p ?o . 
+ FILTER latest(?p) + };`, + nBindings: 3, + nRows: 3, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s ?p ?o . + FILTER latest(?o) + };`, + nBindings: 3, + nRows: 2, + }, + { + q: `SELECT ?s, ?p_alias, ?o + FROM ?test + WHERE { + ?s "bought"@[2016-03-01T00:00:00-08:00] AS ?p_alias ?o . + FILTER latest(?p_alias) + };`, + nBindings: 3, + nRows: 1, + }, + { + q: `SELECT ?s, ?p, ?o_alias + FROM ?test + WHERE { + ?s ?p "turned"@[2016-03-01T00:00:00-08:00] AS ?o_alias . + FILTER latest(?o_alias) + };`, + nBindings: 3, + nRows: 1, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s "bought"@[?time] ?o . + OPTIONAL { ?s ?p ?o } . + FILTER latest(?p) + };`, + nBindings: 3, + nRows: 6, + }, + { + q: `SELECT ?p + FROM ?test + WHERE { + /u ?p ?o1 . + /u ?p ?o2 . + FILTER latest(?p) + };`, + nBindings: 1, + nRows: 1, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER lAtEsT(?p) . + };`, + nBindings: 2, + nRows: 1, }, } @@ -1065,6 +1192,40 @@ func TestPlannerQueryError(t *testing.T) { } HAVING ?s > /u;`, }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER latest(?p) . + FILTER latest(?p) + };`, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /l ?p ?o . + FILTER latest(?p) . + FILTER latest(?o) + };`, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER latest(?b_not_exist) + };`, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s ID ?sID ?p ?o . + FILTER latest(?sID) + };`, + }, } s, ctx := memory.NewStore(), context.Background() diff --git a/bql/semantic/convert.go b/bql/semantic/convert.go index af62fc54..70e17beb 100644 --- a/bql/semantic/convert.go +++ b/bql/semantic/convert.go @@ -69,6 +69,11 @@ func (c ConsumedElement) Token() *lexer.Token { return c.token } +// String returns a string representation of the consumed element. 
+func (c ConsumedElement) String() string { + return fmt.Sprintf("{isSymbol=%v, symbol=%s, token=%s}", c.isSymbol, c.symbol, c.token) +} + // ToNode converts the node found by the lexer and converts it into a BadWolf // node. func ToNode(ce ConsumedElement) (*node.Node, error) { diff --git a/bql/semantic/hooks.go b/bql/semantic/hooks.go index d836be8d..10aa9894 100644 --- a/bql/semantic/hooks.go +++ b/bql/semantic/hooks.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/table" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/literal" @@ -92,6 +93,12 @@ func WhereObjectClauseHook() ElementHook { return whereObjectClause() } +// WhereFilterClauseHook returns the singleton for the working filter clause hook that +// populates the filters list. +func WhereFilterClauseHook() ElementHook { + return whereFilterClause() +} + // VarAccumulatorHook returns the singleton for accumulating variable // projections. func VarAccumulatorHook() ElementHook { @@ -336,6 +343,7 @@ func whereInitWorkingClause() ClauseHook { var f ClauseHook f = func(s *Statement, _ Symbol) (ClauseHook, error) { s.ResetWorkingGraphClause() + s.ResetWorkingFilterClause() return f, nil } return f @@ -664,6 +672,57 @@ func whereObjectClause() ElementHook { return f } +// whereFilterClause returns an element hook that updates the working filter clause and, +// if the filter clause is complete, populates the filters list of the statement. 
+func whereFilterClause() ElementHook { + var hook ElementHook + + hook = func(st *Statement, ce ConsumedElement) (ElementHook, error) { + if ce.IsSymbol() { + return hook, nil + } + + tkn := ce.Token() + currFilter := st.WorkingFilter() + switch tkn.Type { + case lexer.ItemFilterFunction: + if currFilter == nil { + return nil, fmt.Errorf("could not add filter function %q to nil filter clause", tkn.Text) + } + if !currFilter.Operation.IsEmpty() { + return nil, fmt.Errorf("invalid filter function %q on filter clause since already set to %q", tkn.Text, currFilter.Operation) + } + op, ok := filter.SupportedOperations[strings.ToLower(tkn.Text)] // keys are lowercase only, so normalize before the lookup + if !ok { + return nil, fmt.Errorf("filter function %q on filter clause is not supported", tkn.Text) + } + currFilter.Operation = op + return hook, nil + case lexer.ItemBinding: + if currFilter == nil { + return nil, fmt.Errorf("could not add binding %q to nil filter clause", tkn.Text) + } + if currFilter.Operation.IsEmpty() { + return nil, fmt.Errorf("could not add binding %q to a filter clause that does not have a filter function previously set", tkn.Text) + } + if currFilter.Binding != "" { + return nil, fmt.Errorf("invalid binding %q on filter clause since already set to %q", tkn.Text, currFilter.Binding) + } + currFilter.Binding = tkn.Text + return hook, nil + case lexer.ItemRPar: + if currFilter == nil || currFilter.Operation.IsEmpty() || currFilter.Binding == "" { + return nil, fmt.Errorf("could not add invalid working filter %q to the statement filters list", currFilter) + } + st.AddWorkingFilterClause() + } + + return hook, nil + } + + return hook +} + // varAccumulator returns an element hook that updates the object // modifiers on the working graph clause. 
func varAccumulator() ElementHook { diff --git a/bql/semantic/hooks_test.go b/bql/semantic/hooks_test.go index 4494449d..de172379 100644 --- a/bql/semantic/hooks_test.go +++ b/bql/semantic/hooks_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/table" "github.com/google/badwolf/storage" "github.com/google/badwolf/triple" @@ -179,8 +180,220 @@ func TestWhereInitClauseHook(t *testing.T) { f := whereInitWorkingClause() st := &Statement{} f(st, Symbol("FOO")) - if st.WorkingClause() == nil { - t.Errorf("semantic.WhereInitWorkingClause should have returned a valid working clause for statement %v", st) + if wc := st.WorkingClause(); wc == nil || !wc.IsEmpty() { + t.Errorf(`semantic.Statement.WorkingClause() = %q for statement "%v" after call to semantic.WhereInitWorkingClause; want empty GraphClause`, wc, st) + } + if wf := st.WorkingFilter(); wf == nil || !wf.IsEmpty() { + t.Errorf(`semantic.Statement.WorkingFilter() = %q for statement "%v" after call to semantic.WhereInitWorkingClause; want empty FilterClause`, wf, st) + } +} + +func TestWhereFilterClauseHook(t *testing.T) { + st := &Statement{} + f := whereFilterClause() + st.ResetWorkingFilterClause() + + testTable := []struct { + id string + ces []ConsumedElement + want *FilterClause + }{ + { + id: "FILTER latest(?p)", + ces: []ConsumedElement{ + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilter, + Text: "FILTER", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilterFunction, + Text: "latest", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemLPar, + Text: "(", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemBinding, + Text: "?p", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemRPar, + Text: ")", + }), + NewConsumedSymbol("FOO"), + }, + want: &FilterClause{ + Operation: filter.Latest, + Binding: "?p", + }, + }, + { + id: "FILTER latest(?o)", + ces: []ConsumedElement{ 
+ NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilter, + Text: "FILTER", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilterFunction, + Text: "latest", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemLPar, + Text: "(", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemBinding, + Text: "?o", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemRPar, + Text: ")", + }), + NewConsumedSymbol("FOO"), + }, + want: &FilterClause{ + Operation: filter.Latest, + Binding: "?o", + }, + }, + } + for i, entry := range testTable { + for _, ce := range entry.ces { + if _, err := f(st, ce); err != nil { + t.Errorf("%q: semantic.WhereFilterClauseHook(%s) = _, %v; want _, nil", entry.id, ce, err) + } + } + if got, want := st.FilterClauses()[i], entry.want; !reflect.DeepEqual(got, want) { + t.Errorf("%q: semantic.Statement.FilterClauses()[%d] = %s; want %s", entry.id, i, got, want) + } + } + + if got, want := len(st.FilterClauses()), len(testTable); got != want { + t.Errorf("len(semantic.Statement.FilterClauses()) = %d after consuming %d valid FILTER clauses; want %d", got, want, want) + } +} + +func TestWhereFilterClauseHookError(t *testing.T) { + st := &Statement{} + f := whereFilterClause() + st.ResetWorkingFilterClause() + + testTable := []struct { + id string + ces []ConsumedElement + ceIndexError int + }{ + { + id: "FILTER notSupportedFilterFunction(?p)", + ces: []ConsumedElement{ + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilter, + Text: "FILTER", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilterFunction, + Text: "notSupportedFilterFunction", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemLPar, + Text: "(", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemBinding, + Text: "?p", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemRPar, + Text: ")", + }), + }, + ceIndexError: 1, + }, + { + id: "FILTER latest latest(?p)", + ces: []ConsumedElement{ + NewConsumedToken(&lexer.Token{ + Type: 
lexer.ItemFilter, + Text: "FILTER", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilterFunction, + Text: "latest", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilterFunction, + Text: "latest", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemLPar, + Text: "(", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemBinding, + Text: "?p", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemRPar, + Text: ")", + }), + }, + ceIndexError: 2, + }, + { + id: "FILTER latest(?p, ?o)", + ces: []ConsumedElement{ + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilter, + Text: "FILTER", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemFilterFunction, + Text: "latest", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemLPar, + Text: "(", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemBinding, + Text: "?p", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemComma, + Text: ",", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemBinding, + Text: "?o", + }), + NewConsumedToken(&lexer.Token{ + Type: lexer.ItemRPar, + Text: ")", + }), + }, + ceIndexError: 5, + }, + } + for _, entry := range testTable { + var err error + for i, ce := range entry.ces { + if _, err = f(st, ce); err != nil { + if i != entry.ceIndexError { + t.Errorf("%q: semantic.WhereFilterClauseHook(%v) = _, %v; want _, nil since the expected error should be when consuming %v", entry.id, ce, err, entry.ces[entry.ceIndexError]) + } + break + } + } + if err == nil { + t.Errorf("%q: semantic.WhereFilterClauseHook(%v) = _, nil; want _, error", entry.id, entry.ces[entry.ceIndexError]) + } + st.ResetWorkingFilterClause() + } + + if got, want := len(st.FilterClauses()), 0; got != want { + t.Errorf("len(semantic.Statement.FilterClauses()) = %d; want %d", got, want) } } diff --git a/bql/semantic/semantic.go b/bql/semantic/semantic.go index 4f6175aa..c753e1be 100644 --- a/bql/semantic/semantic.go +++ b/bql/semantic/semantic.go @@ -27,6 +27,7 @@ import ( 
"time" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/table" "github.com/google/badwolf/storage" "github.com/google/badwolf/triple" @@ -103,6 +104,8 @@ type Statement struct { limitSet bool limit int64 lookupOptions storage.LookupOptions + filters []*FilterClause + workingFilter *FilterClause } // GraphClause represents a clause of a graph pattern in a where clause. @@ -143,6 +146,16 @@ type GraphClause struct { OTemporal bool } +// FilterClause represents a FILTER clause inside WHERE. +// Operation below refers to the filter function being applied (eg: Latest), Binding refers to the binding it +// will be applied to and Value, when specified, contains the second argument of the filter function (not applicable for all +// Operations - some like Latest do not use it while others like GreaterThan do, see Issue 129). +type FilterClause struct { + Operation filter.Operation + Binding string + Value string +} + // ConstructClause represents a singular clause within a construct statement. type ConstructClause struct { S *node.Node @@ -399,6 +412,16 @@ func (c *GraphClause) IsEmpty() bool { return reflect.DeepEqual(c, &GraphClause{}) } +// IsEmpty will return true if there are no set values in the filter clause. +func (f *FilterClause) IsEmpty() bool { + return reflect.DeepEqual(f, &FilterClause{}) +} + +// String returns a string representation of the filter clause. +func (f *FilterClause) String() string { + return fmt.Sprintf("%+v", *f) +} + // String returns a readable representation of a construct clause. func (c *ConstructClause) String() string { b := bytes.NewBufferString("{ ") @@ -584,16 +607,31 @@ func (s *Statement) GraphPatternClauses() []*GraphClause { return s.pattern } +// FilterClauses returns the list of FILTER clauses. +func (s *Statement) FilterClauses() []*FilterClause { + return s.filters +} + // ResetWorkingGraphClause resets the current working graph clause. 
func (s *Statement) ResetWorkingGraphClause() { s.workingClause = &GraphClause{} } +// ResetWorkingFilterClause resets the current working filter clause. +func (s *Statement) ResetWorkingFilterClause() { + s.workingFilter = &FilterClause{} +} + // WorkingClause returns the current working clause. func (s *Statement) WorkingClause() *GraphClause { return s.workingClause } +// WorkingFilter returns the current working filter. +func (s *Statement) WorkingFilter() *FilterClause { + return s.workingFilter +} + // AddWorkingGraphClause adds the current working graph clause to the set of // clauses that form the graph pattern. func (s *Statement) AddWorkingGraphClause() { @@ -603,6 +641,12 @@ func (s *Statement) AddWorkingGraphClause() { s.ResetWorkingGraphClause() } +// AddWorkingFilterClause adds the current working filter clause to the filters list. +func (s *Statement) AddWorkingFilterClause() { + s.filters = append(s.filters, s.workingFilter) + s.ResetWorkingFilterClause() +} + // Projection returns the available projections in the statement. 
func (s *Statement) Projection() []*Projection { return s.projection diff --git a/bql/semantic/semantic_test.go b/bql/semantic/semantic_test.go index 6528276b..f69deeb7 100644 --- a/bql/semantic/semantic_test.go +++ b/bql/semantic/semantic_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/literal" "github.com/google/badwolf/triple/node" @@ -259,6 +260,32 @@ func TestGraphClauseManipulation(t *testing.T) { } } +func TestFilterClauseManipulation(t *testing.T) { + st := &Statement{} + + t.Run("test workingFilter initial states", func(t *testing.T) { + if wf := st.WorkingFilter(); wf != nil { + t.Fatalf(`semantic.Statement.WorkingFilter() = %q for statement "%v" without initialization; want nil`, wf, st) + } + st.ResetWorkingFilterClause() + if wf := st.WorkingFilter(); wf == nil || !wf.IsEmpty() { + t.Fatalf(`semantic.Statement.WorkingFilter() = %q for statement "%v" after call to ResetWorkingFilterClause; want empty FilterClause`, wf, st) + } + }) + + t.Run("test call to add workingFilter", func(t *testing.T) { + wf := st.WorkingFilter() + *wf = FilterClause{Operation: filter.Latest, Binding: "?p"} + st.AddWorkingFilterClause() + if got, want := len(st.FilterClauses()), 1; got != want { + t.Fatalf(`len(semantic.Statement.FilterClauses()) = %d for statement "%v"; want %d`, got, st, want) + } + if wf = st.WorkingFilter(); wf == nil || !wf.IsEmpty() { + t.Fatalf(`semantic.Statement.WorkingFilter() = %q for statement "%v"; want empty FilterClause`, wf, st) + } + }) +} + func TestBindingListing(t *testing.T) { stm := Statement{} stm.ResetWorkingGraphClause() @@ -324,6 +351,27 @@ func TestIsEmptyClause(t *testing.T) { } +func TestIsEmptyFilterClause(t *testing.T) { + testTable := []struct { + in *FilterClause + want bool + }{ + { + in: &FilterClause{}, + want: true, + }, + { + in: &FilterClause{Operation: filter.Latest, Binding: "?p"}, + want: false, + }, 
+ } + for _, entry := range testTable { + if got := entry.in.IsEmpty(); got != entry.want { + t.Errorf("FilterClause.IsEmpty(%q) = %v; want %v", entry.in, got, entry.want) + } + } +} + func TestSortedGraphPatternClauses(t *testing.T) { s := &Statement{ pattern: []*GraphClause{ diff --git a/storage/memoization/memoization_test.go b/storage/memoization/memoization_test.go index 05d9ffa3..6c2718ef 100644 --- a/storage/memoization/memoization_test.go +++ b/storage/memoization/memoization_test.go @@ -31,7 +31,7 @@ import ( ) func TestCombinedUUID(t *testing.T) { - want := "op:9dae52f4-9b35-5d5f-bd8e-195d4b16fc30:00000000-0000-0000-0000-000000000000:00000000-0000-0000-0000-000000000000" + want := "op:420c34d3-9f56-5191-89de-57468c3e6a93:00000000-0000-0000-0000-000000000000:00000000-0000-0000-0000-000000000000" if got := combinedUUID("op", storage.DefaultLookup, uuid.NIL, uuid.NIL); got != want { t.Errorf("combinedUUID returned the wrong value; got %v, want %v", got, want) } diff --git a/storage/memory/memory.go b/storage/memory/memory.go index bcf3cb40..2b054305 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/storage" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/node" @@ -274,6 +275,63 @@ func (c *checker) CheckAndUpdate(p *predicate.Predicate) bool { return true } +// latestFilter executes the latest filter operation over memoryTriples following filterOptions. 
+func latestFilter(memoryTriples map[string]*triple.Triple, pQuery *predicate.Predicate, filterOptions *filter.StorageOptions) (map[string]*triple.Triple, error) { + if filterOptions.Field != filter.PredicateField && filterOptions.Field != filter.ObjectField { + return nil, fmt.Errorf(`invalid field %q for %q filter operation, can accept only %q or %q`, filterOptions.Field, filter.Latest, filter.PredicateField, filter.ObjectField) + } + + lastTA := make(map[string]*time.Time) + trps := make(map[string]map[string]*triple.Triple) + for _, t := range memoryTriples { + if pQuery != nil && pQuery.String() != t.Predicate().String() { + continue + } + + var p *predicate.Predicate + if filterOptions.Field == filter.PredicateField { + p = t.Predicate() + } else if pObj, err := t.Object().Predicate(); filterOptions.Field == filter.ObjectField && err == nil { + p = pObj + } else { + continue + } + if p.Type() != predicate.Temporal { + continue + } + + ppUUID := p.PartialUUID().String() + ta, err := p.TimeAnchor() + if err != nil { + return nil, err + } + if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { + trps[ppUUID] = map[string]*triple.Triple{t.UUID().String(): t} + lastTA[ppUUID] = ta + } else if ta.Sub(*lta) == 0 { + trps[ppUUID][t.UUID().String()] = t + } + } + + trpsByUUID := make(map[string]*triple.Triple) + for _, m := range trps { + for tUUID, t := range m { + trpsByUUID[tUUID] = t + } + } + return trpsByUUID, nil +} + +// executeFilter executes the proper filter operation over memoryTriples following the specifications given in filterOptions. 
+func executeFilter(memoryTriples map[string]*triple.Triple, pQuery *predicate.Predicate, filterOptions *filter.StorageOptions) (map[string]*triple.Triple, error) { + switch filterOptions.Operation { + case filter.Latest: + return latestFilter(memoryTriples, pQuery, filterOptions) + default: + return nil, fmt.Errorf("filter operation %q not supported in the driver", filterOptions.Operation) + } +} + // Objects published the objects for the give object and predicate to the // provided channel. func (m *memory) Objects(ctx context.Context, s *node.Node, p *predicate.Predicate, lo *storage.LookupOptions, objs chan<- *triple.Object) error { @@ -289,21 +347,22 @@ func (m *memory) Objects(ctx context.Context, s *node.Node, p *predicate.Predica defer close(objs) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxSP[spIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxSP[spIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -318,6 +377,7 @@ func (m *memory) Objects(ctx context.Context, s *node.Node, p *predicate.Predica objs <- t.Object() } } + return nil } @@ -327,6 +387,7 @@ func (m *memory) Subjects(ctx context.Context, p *predicate.Predicate, o *triple if subjs == nil { return fmt.Errorf("cannot provide an empty channel") } + pUUID := UUIDToByteString(p.PartialUUID()) oUUID := UUIDToByteString(o.UUID()) poIdx := pUUID + oUUID @@ -335,21 +396,22 @@ func (m *memory) Subjects(ctx context.Context, p *predicate.Predicate, o *triple defer close(subjs) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxPO[poIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxPO[poIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -364,6 +426,7 @@ func (m *memory) Subjects(ctx context.Context, p *predicate.Predicate, o *triple subjs <- t.Subject() } } + return nil } @@ -373,6 +436,7 @@ func (m *memory) PredicatesForSubjectAndObject(ctx context.Context, s *node.Node if prds == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) oUUID := UUIDToByteString(o.UUID()) soIdx := sUUID + oUUID @@ -381,21 +445,22 @@ func (m *memory) PredicatesForSubjectAndObject(ctx context.Context, s *node.Node defer close(prds) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxSO[soIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxSO[soIdx], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -410,6 +475,7 @@ func (m *memory) PredicatesForSubjectAndObject(ctx context.Context, s *node.Node prds <- t.Predicate() } } + return nil } @@ -419,27 +485,29 @@ func (m *memory) PredicatesForSubject(ctx context.Context, s *node.Node, lo *sto if prds == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(prds) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxS[sUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxS[sUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -454,6 +522,7 @@ func (m *memory) PredicatesForSubject(ctx context.Context, s *node.Node, lo *sto prds <- t.Predicate() } } + return nil } @@ -463,27 +532,29 @@ func (m *memory) PredicatesForObject(ctx context.Context, o *triple.Object, lo * if prds == nil { return fmt.Errorf("cannot provide an empty channel") } + oUUID := UUIDToByteString(o.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(prds) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxO[oUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxO[oUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -498,6 +569,7 @@ func (m *memory) PredicatesForObject(ctx context.Context, o *triple.Object, lo * prds <- t.Predicate() } } + return nil } @@ -507,27 +579,29 @@ func (m *memory) TriplesForSubject(ctx context.Context, s *node.Node, lo *storag if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxS[sUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxS[sUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -542,6 +616,7 @@ func (m *memory) TriplesForSubject(ctx context.Context, s *node.Node, lo *storag trpls <- t } } + return nil } @@ -551,27 +626,29 @@ func (m *memory) TriplesForPredicate(ctx context.Context, p *predicate.Predicate if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + pUUID := UUIDToByteString(p.PartialUUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxP[pUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxP[pUUID], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -586,6 +663,7 @@ func (m *memory) TriplesForPredicate(ctx context.Context, p *predicate.Predicate trpls <- t } } + return nil } @@ -595,27 +673,29 @@ func (m *memory) TriplesForObject(ctx context.Context, o *triple.Object, lo *sto if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + oUUID := UUIDToByteString(o.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxO[oUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxO[oUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -630,6 +710,7 @@ func (m *memory) TriplesForObject(ctx context.Context, o *triple.Object, lo *sto trpls <- t } } + return nil } @@ -639,6 +720,7 @@ func (m *memory) TriplesForSubjectAndPredicate(ctx context.Context, s *node.Node if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) pUUID := UUIDToByteString(p.PartialUUID()) spIdx := sUUID + pUUID @@ -647,21 +729,22 @@ func (m *memory) TriplesForSubjectAndPredicate(ctx context.Context, s *node.Node defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxSP[spIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxSP[spIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -676,6 +759,7 @@ func (m *memory) TriplesForSubjectAndPredicate(ctx context.Context, s *node.Node trpls <- t } } + return nil } @@ -685,6 +769,7 @@ func (m *memory) TriplesForPredicateAndObject(ctx context.Context, p *predicate. if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + pUUID := UUIDToByteString(p.PartialUUID()) oUUID := UUIDToByteString(o.UUID()) poIdx := pUUID + oUUID @@ -693,21 +778,22 @@ func (m *memory) TriplesForPredicateAndObject(ctx context.Context, p *predicate. defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxPO[poIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxPO[poIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -722,6 +808,7 @@ func (m *memory) TriplesForPredicateAndObject(ctx context.Context, p *predicate. 
trpls <- t } } + return nil } @@ -740,26 +827,28 @@ func (m *memory) Triples(ctx context.Context, lo *storage.LookupOptions, trpls c if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idx { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". 
+ defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idx, nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -774,5 +863,6 @@ func (m *memory) Triples(ctx context.Context, lo *storage.LookupOptions, trpls c trpls <- t } } + return nil } diff --git a/storage/memory/memory_test.go b/storage/memory/memory_test.go index 8decd9ff..6e4b4815 100644 --- a/storage/memory/memory_test.go +++ b/storage/memory/memory_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/storage" "github.com/google/badwolf/tools/testutil" "github.com/google/badwolf/triple" @@ -237,6 +238,19 @@ func getTestTemporalTriples(t *testing.T) []*triple.Triple { }) } +func getTestTriplesFilter(t *testing.T) []*triple.Triple { + return createTriples(t, []string{ + "/u\t\"meet\"@[2012-04-10T04:21:00Z]\t/u", + "/u\t\"meet\"@[2013-04-10T04:21:00Z]\t/u", + "/u\t\"meet\"@[2014-04-10T04:21:00Z]\t/u", + "/u\t\"meet\"@[2014-04-10T04:21:00Z]\t/u", + "/u\t\"parent_of\"@[]\t/u", + "/_\t\"_predicate\"@[]\t\"meet\"@[2020-04-10T04:21:00Z]", + "/_\t\"_predicate\"@[]\t\"meet\"@[2021-04-10T04:21:00Z]", + "/_\t\"_predicate\"@[]\t\"height_cm\"@[]", + }) +} + func TestAddRemoveTriples(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -275,7 +289,7 @@ func TestObjects(t *testing.T) { } } -func TestObjectsLastestTemporal(t *testing.T) { +func TestObjectsLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -303,6 +317,65 @@ func TestObjectsLastestTemporal(t *testing.T) { } } +func TestObjectsFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := 
g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + s *node.Node + p *predicate.Predicate + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + want: map[string]int{"/u": 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + want: map[string]int{"/u": 1, "/u": 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + want: map[string]int{`"meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. 
+ os := make(chan *triple.Object, 100) + s := entry.s + p := entry.p + if err := g.Objects(ctx, s, p, entry.lo, os); err != nil { + t.Fatalf("g.Objects(%s, %s, %s) = %v; want nil", s, p, entry.lo, err) + } + for o := range os { + oStr := o.String() + if _, ok := entry.want[oStr]; !ok { + t.Fatalf("g.Objects(%s, %s, %s) retrieved unexpected %s", s, p, entry.lo, oStr) + } + entry.want[oStr] = entry.want[oStr] - 1 + if entry.want[oStr] == 0 { + delete(entry.want, oStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.Objects(%s, %s, %s) failed to retrieve some expected elements: %v", s, p, entry.lo, entry.want) + } + } +} + func TestSubjects(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -329,7 +402,7 @@ func TestSubjects(t *testing.T) { } } -func TestSubjectsLatestTemporal(t *testing.T) { +func TestSubjectsLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -356,6 +429,65 @@ func TestSubjectsLatestTemporal(t *testing.T) { } } +func TestSubjectsFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + p *predicate.Predicate + o *triple.Object + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{"/u": 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: 
testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{"/u": 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{"/_": 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + ss := make(chan *node.Node, 100) + p := entry.p + o := entry.o + if err := g.Subjects(ctx, p, o, entry.lo, ss); err != nil { + t.Fatalf("g.Subjects(%s, %s, %s) = %v; want nil", p, o, entry.lo, err) + } + for s := range ss { + sStr := s.String() + if _, ok := entry.want[sStr]; !ok { + t.Fatalf("g.Subjects(%s, %s, %s) retrieved unexpected %s", p, o, entry.lo, sStr) + } + entry.want[sStr] = entry.want[sStr] - 1 + if entry.want[sStr] == 0 { + delete(entry.want, sStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.Subjects(%s, %s, %s) failed to retrieve some expected elements: %v", p, o, entry.lo, entry.want) + } + } +} + func TestPredicatesForSubjectAndObject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -380,7 +512,8 @@ func TestPredicatesForSubjectAndObject(t *testing.T) { t.Errorf("g.PredicatesForSubjectAndObject(%s, %s) failed to retrieve 1 predicate, got %d instead", ts[0].Subject(), ts[0].Object(), cnt) } } -func TestPredicatesForSubjectAndObjectLatestTemporal(t *testing.T) { + +func TestPredicatesForSubjectAndObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := 
NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -406,6 +539,65 @@ func TestPredicatesForSubjectAndObjectLatestTemporal(t *testing.T) { } } +func TestPredicatesForSubjectAndObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + s *node.Node + o *triple.Object + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "bob")), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`"_predicate"@[]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. 
+ pp := make(chan *predicate.Predicate, 100) + s := entry.s + o := entry.o + if err := g.PredicatesForSubjectAndObject(ctx, s, o, entry.lo, pp); err != nil { + t.Fatalf("g.PredicatesForSubjectAndObject(%s, %s, %s) = %v; want nil", s, o, entry.lo, err) + } + for p := range pp { + pStr := p.String() + if _, ok := entry.want[pStr]; !ok { + t.Fatalf("g.PredicatesForSubjectAndObject(%s, %s, %s) retrieved unexpected %s", s, o, entry.lo, pStr) + } + entry.want[pStr] = entry.want[pStr] - 1 + if entry.want[pStr] == 0 { + delete(entry.want, pStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.PredicatesForSubjectAndObject(%s, %s, %s) failed to retrieve some expected elements: %v", s, o, entry.lo, entry.want) + } + } +} + func TestPredicatesForSubject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -431,7 +623,7 @@ func TestPredicatesForSubject(t *testing.T) { } } -func TestPredicatesForSubjectLatestTemporal(t *testing.T) { +func TestPredicatesForSubjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -457,6 +649,55 @@ func TestPredicatesForSubjectLatestTemporal(t *testing.T) { } } +func TestPredicatesForSubjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + s *node.Node + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 2}, + }, + { + lo: &storage.LookupOptions{FilterOptions: 
&filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + want: map[string]int{`"_predicate"@[]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + pp := make(chan *predicate.Predicate, 100) + s := entry.s + if err := g.PredicatesForSubject(ctx, s, entry.lo, pp); err != nil { + t.Fatalf("g.PredicatesForSubject(%s, %s) = %v; want nil", s, entry.lo, err) + } + for p := range pp { + pStr := p.String() + if _, ok := entry.want[pStr]; !ok { + t.Fatalf("g.PredicatesForSubject(%s, %s) retrieved unexpected %s", s, entry.lo, pStr) + } + entry.want[pStr] = entry.want[pStr] - 1 + if entry.want[pStr] == 0 { + delete(entry.want, pStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.PredicatesForSubject(%s, %s) failed to retrieve some expected elements: %v", s, entry.lo, entry.want) + } + } +} + func TestPredicatesForObject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -481,7 +722,8 @@ func TestPredicatesForObject(t *testing.T) { t.Errorf("g.PredicatesForObject(%s) failed to retrieve 1 predicate, got %d instead", ts[0].Object(), cnt) } } -func TestPredicatesForObjectLatestTemporal(t *testing.T) { + +func TestPredicatesForObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -507,6 +749,60 @@ func TestPredicatesForObjectLatestTemporal(t *testing.T) { } } +func TestPredicatesForObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + 
t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + o *triple.Object + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "bob")), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`"_predicate"@[]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. 
+ pp := make(chan *predicate.Predicate, 100) + o := entry.o + if err := g.PredicatesForObject(ctx, o, entry.lo, pp); err != nil { + t.Fatalf("g.PredicatesForObject(%s, %s) = %v; want nil", o, entry.lo, err) + } + for p := range pp { + pStr := p.String() + if _, ok := entry.want[pStr]; !ok { + t.Fatalf("g.PredicatesForObject(%s, %s) retrieved unexpected %s", o, entry.lo, pStr) + } + entry.want[pStr] = entry.want[pStr] - 1 + if entry.want[pStr] == 0 { + delete(entry.want, pStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.PredicatesForObject(%s, %s) failed to retrieve some expected elements: %v", o, entry.lo, entry.want) + } + } +} + func TestTriplesForSubject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -529,7 +825,7 @@ func TestTriplesForSubject(t *testing.T) { } } -func TestTriplesForSubjectLatestTemporal(t *testing.T) { +func TestTriplesForSubjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -555,6 +851,55 @@ func TestTriplesForSubjectLatestTemporal(t *testing.T) { } } +func TestTriplesForSubjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + s *node.Node + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, 
+ s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + s := entry.s + if err := g.TriplesForSubject(ctx, s, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForSubject(%s, %s) = %v; want nil", s, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForSubject(%s, %s) retrieved unexpected %s", s, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForSubject(%s, %s) failed to retrieve some expected elements: %v", s, entry.lo, entry.want) + } + } +} + func TestTriplesForPredicate(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -576,7 +921,8 @@ func TestTriplesForPredicate(t *testing.T) { t.Errorf("g.triplesForPredicate(%s) failed to retrieve 3 predicates, got %d instead", ts[0].Predicate(), cnt) } } -func TestTriplesForPredicateLatestTemporal(t *testing.T) { + +func TestTriplesForPredicateLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -593,12 +939,66 @@ func TestTriplesForPredicateLatestTemporal(t *testing.T) { cnt := 0 for rts := range trpls { cnt++ - if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) { - t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate()) + if 
!reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) { + t.Errorf("g.TriplesForPredicate(%s) = %s for LatestAnchor; want %s", ts[0].Predicate(), rts.Predicate(), ts[0].Predicate()) } } if cnt != 1 { - t.Errorf("g.triplesForPredicate(%s) failed to retrieve 3 predicates, got %d instead", ts[0].Predicate(), cnt) + t.Errorf("g.triplesForPredicate(%s) retrieved %d predicates; want 1", ts[0].Predicate(), cnt) + } +} + +func TestTriplesForPredicateFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + p *predicate.Predicate + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2012-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. 
+ trpls := make(chan *triple.Triple, 100) + p := entry.p + if err := g.TriplesForPredicate(ctx, p, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForPredicate(%s, %s) = %v; want nil", p, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForPredicate(%s, %s) retrieved unexpected %s", p, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForPredicate(%s, %s) failed to retrieve some expected elements: %v", p, entry.lo, entry.want) + } } } @@ -624,7 +1024,7 @@ func TestTriplesForObject(t *testing.T) { } } -func TestTriplesForObjectLatestTemporal(t *testing.T) { +func TestTriplesForObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -650,6 +1050,60 @@ func TestTriplesForObjectLatestTemporal(t *testing.T) { } } +func TestTriplesForObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + o *triple.Object + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "bob")), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + 
}, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2020-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + o := entry.o + if err := g.TriplesForObject(ctx, o, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForObject(%s, %s) = %v; want nil", o, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForObject(%s, %s) retrieved unexpected %s", o, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForObject(%s, %s) failed to retrieve some expected elements: %v", o, entry.lo, entry.want) + } + } +} + func TestTriplesForObjectWithLimit(t *testing.T) { ts := createTriples(t, []string{ "/u\t\"kissed\"@[2015-01-01T00:00:00-09:00]\t/u", @@ -714,7 +1168,8 @@ func TestTriplesForSubjectAndPredicate(t *testing.T) { t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) failed to retrieve 3 predicates, got %d instead", ts[0].Subject(), ts[0].Predicate(), cnt) } } -func TestTriplesForSubjectAndPredicateLatestTemporal(t *testing.T) { + +func TestTriplesForSubjectAndPredicateLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -731,12 +1186,71 @@ func TestTriplesForSubjectAndPredicateLatestTemporal(t *testing.T) { cnt := 0 for rts := range 
trpls { cnt++ - if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) { - t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate()) + if !reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) { + t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) = %s for LatestAnchor; want %s", ts[0].Subject(), ts[0].Predicate(), rts.Predicate(), ts[0].Predicate()) } } if cnt != 1 { - t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) failed to retrieve 3 predicates, got %d instead", ts[0].Subject(), ts[0].Predicate(), cnt) + t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) retrieved %d predicates; want 1", ts[0].Subject(), ts[0].Predicate(), cnt) + } +} + +func TestTriplesForSubjectAndPredicateFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + s *node.Node + p *predicate.Predicate + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2012-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: 
filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + s := entry.s + p := entry.p + if err := g.TriplesForSubjectAndPredicate(ctx, s, p, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForSubjectAndPredicate(%s, %s, %s) = %v; want nil", s, p, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForSubjectAndPredicate(%s, %s, %s) retrieved unexpected %s", s, p, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s, %s) failed to retrieve some expected elements: %v", s, p, entry.lo, entry.want) + } } } @@ -762,7 +1276,7 @@ func TestTriplesForPredicateAndObject(t *testing.T) { } } -func TestTriplesForPredicateAndObjectLatestTemporal(t *testing.T) { +func TestTriplesForPredicateAndObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -779,12 +1293,71 @@ func TestTriplesForPredicateAndObjectLatestTemporal(t *testing.T) { cnt := 0 for rts := range trpls { cnt++ - if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) { - t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate()) + if !reflect.DeepEqual(rts.Predicate().UUID(), 
ts[0].Predicate().UUID()) { + t.Errorf("g.TriplesForPredicateAndObject(%s, %s) = %s for LatestAnchor; want %s", ts[0].Predicate(), ts[0].Object(), rts.Predicate(), ts[0].Predicate()) } } if cnt != 1 { - t.Errorf("g.TriplesForPredicateAndObject(%s, %s) failed to retrieve 1 predicates, got %d instead", ts[0].Predicate(), ts[0].Object(), cnt) + t.Errorf("g.TriplesForPredicateAndObject(%s, %s) retrieved %d predicates; want 1", ts[0].Predicate(), ts[0].Object(), cnt) + } +} + +func TestTriplesForPredicateAndObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + p *predicate.Predicate + o *triple.Object + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`/u "meet"@[2012-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2020-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range 
testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + p := entry.p + o := entry.o + if err := g.TriplesForPredicateAndObject(ctx, p, o, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForPredicateAndObject(%s, %s, %s) = %v; want nil", p, o, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForPredicateAndObject(%s, %s, %s) retrieved unexpected %s", p, o, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForPredicateAndObject(%s, %s, %s) failed to retrieve some expected elements: %v", p, o, entry.lo, entry.want) + } } } @@ -827,7 +1400,7 @@ func TestTriples(t *testing.T) { } } -func TestTriplesLastestTemporal(t *testing.T) { +func TestTriplesLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -852,3 +1425,48 @@ func TestTriplesLastestTemporal(t *testing.T) { t.Errorf("g.TriplesForPredicateAndObject(%s, %s) failed to retrieve 1 predicates, got %d instead", ts[0].Predicate(), ts[0].Object(), cnt) } } + +func TestTriplesFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed failed to add test triples with error: %v", err) + } + + testTable := []struct { + lo *storage.LookupOptions + want map[string]int + }{ + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + want: 
map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + if err := g.Triples(ctx, entry.lo, trpls); err != nil { + t.Fatalf("g.Triples(%s) = %v; want nil", entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.Triples(%s) retrieved unexpected %s", entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.Triples(%s) failed to retrieve some expected elements: %v", entry.lo, entry.want) + } + } +} diff --git a/storage/storage.go b/storage/storage.go index e787fbd0..de903e63 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -22,6 +22,7 @@ import ( "strconv" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/node" "github.com/google/badwolf/triple/predicate" @@ -43,6 +44,9 @@ type LookupOptions struct { // LatestAnchor only. If set, it will ignore the time boundaries provided and // just use the last available anchor. LatestAnchor bool + + // FilterOptions, if provided, represent the specifications for the filtering to be executed. + FilterOptions *filter.StorageOptions } // String returns a readable version of the LookupOptions instance. 
@@ -62,7 +66,7 @@ func (l *LookupOptions) String() string { b.WriteString("nil") } b.WriteString(fmt.Sprintf(", LatestAnchor=%v", l.LatestAnchor)) - b.WriteString(">") + b.WriteString(fmt.Sprintf(", FilterOptions=%s>", l.FilterOptions)) return b.String() }