diff --git a/bql/planner/data_access.go b/bql/planner/data_access.go index 59e5b5ef..9aacd627 100644 --- a/bql/planner/data_access.go +++ b/bql/planner/data_access.go @@ -45,9 +45,10 @@ var _ error = (*skippableError)(nil) // provided graph clause. func updateTimeBounds(lo *storage.LookupOptions, cls *semantic.GraphClause) *storage.LookupOptions { nlo := &storage.LookupOptions{ - MaxElements: lo.MaxElements, - LowerAnchor: lo.LowerAnchor, - UpperAnchor: lo.UpperAnchor, + MaxElements: lo.MaxElements, + LowerAnchor: lo.LowerAnchor, + UpperAnchor: lo.UpperAnchor, + FilterOptions: lo.FilterOptions, } if cls.PLowerBound != nil { if lo.LowerAnchor == nil || (lo.LowerAnchor != nil && cls.PLowerBound.After(*lo.LowerAnchor)) { diff --git a/bql/planner/filter/filter.go b/bql/planner/filter/filter.go new file mode 100644 index 00000000..3311757e --- /dev/null +++ b/bql/planner/filter/filter.go @@ -0,0 +1,88 @@ +// Copyright 2020 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package filter isolates core FILTER related implementation. +package filter + +import ( + "fmt" +) + +// Operation represents a filter operation supported in BadWolf. +type Operation int + +// List of supported filter operations. +const ( + Latest Operation = iota + 1 +) + +// Field represents the position of the semantic.GraphClause that will be operated by the filter at storage level. +type Field int + +// List of filter fields. +const ( + SubjectField Field = iota + 1 + PredicateField + ObjectField +) + +// SupportedOperations maps suported filter operation strings to their correspondant Operation. +// Note that the string keys here must be in lowercase letters only (for compatibility with the WhereFilterClauseHook). +var SupportedOperations = map[string]Operation{ + "latest": Latest, +} + +// StorageOptions represent the storage level specifications for the filtering to be executed. +// Operation below refers to the filter function being applied (eg: Latest), Field refers to the position of the graph clause it +// will be applied to (subject, predicate, or object) and Value, when specified, contains the second argument of the filter +// function (not applicable for all Operations - some like Latest do not use it while others like GreaterThan do, see Issue 129). +type StorageOptions struct { + Operation Operation + Field Field + Value string +} + +// String returns the string representation of Operation. +func (op Operation) String() string { + switch op { + case Latest: + return "latest" + default: + return fmt.Sprintf(`not defined filter operation "%d"`, op) + } +} + +// IsEmpty returns true if the Operation was not set yet. +func (op Operation) IsEmpty() bool { + return op == Operation(0) +} + +// String returns the string representation of Field. +func (f Field) String() string { + switch f { + case SubjectField: + return "subject field" + case PredicateField: + return "predicate field" + case ObjectField: + return "object field" + default: + return fmt.Sprintf(`not defined filter field "%d"`, f) + } +} + +// String returns the string representation of StorageOptions. +func (so *StorageOptions) String() string { + return fmt.Sprintf("%+v", *so) +} diff --git a/bql/planner/planner.go b/bql/planner/planner.go index 33ae06fd..0fb74240 100644 --- a/bql/planner/planner.go +++ b/bql/planner/planner.go @@ -28,6 +28,7 @@ import ( "sync" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/planner/tracer" "github.com/google/badwolf/bql/semantic" "github.com/google/badwolf/bql/table" @@ -266,7 +267,8 @@ type queryPlan struct { bndgs []string grfsNames []string grfs []storage.Graph - cls []*semantic.GraphClause + clauses []*semantic.GraphClause + filters []*semantic.FilterClause tbl *table.Table chanSize int tracer io.Writer @@ -292,7 +294,8 @@ func newQueryPlan(ctx context.Context, store storage.Store, stm *semantic.Statem store: store, bndgs: bs, grfsNames: stm.InputGraphNames(), - cls: stm.GraphPatternClauses(), + clauses: stm.GraphPatternClauses(), + filters: stm.FilterClauses(), tbl: t, chanSize: chanSize, tracer: w, @@ -639,28 +642,119 @@ func (p *queryPlan) filterOnExistence(ctx context.Context, cls *semantic.GraphCl return grp.Wait() } +// organizeClausesByBinding takes the graph clauses received as input and organize them in a map +// on which the keys are the bindings of these clauses. +func organizeClausesByBinding(clauses []*semantic.GraphClause) map[string][]*semantic.GraphClause { + clausesByBinding := map[string][]*semantic.GraphClause{} + for _, cls := range clauses { + for b := range cls.BindingsMap() { + clausesByBinding[b] = append(clausesByBinding[b], cls) + } + } + + return clausesByBinding +} + +// compatibleBindingsInClauseForFilterOperation returns a function that, for each given clause, returns the bindings that are +// compatible with the specified filter operation. +func compatibleBindingsInClauseForFilterOperation(operation filter.Operation) (compatibleBindingsInClause func(cls *semantic.GraphClause) (bindingsByField map[filter.Field]map[string]bool), err error) { + switch operation { + case filter.Latest: + compatibleBindingsInClause = func(cls *semantic.GraphClause) (bindingsByField map[filter.Field]map[string]bool) { + bindingsByField = map[filter.Field]map[string]bool{ + filter.PredicateField: {cls.PBinding: true, cls.PAlias: true}, + filter.ObjectField: {cls.OBinding: true, cls.OAlias: true}, + } + return bindingsByField + } + return compatibleBindingsInClause, nil + default: + return nil, fmt.Errorf("filter function %q has no bindings in clause specified for it (planner level)", operation) + } +} + +// organizeFilterOptionsByClause processes all the given filters and organize them in a map that has as keys the +// clauses to which they must be applied. +func organizeFilterOptionsByClause(filters []*semantic.FilterClause, clauses []*semantic.GraphClause) (map[*semantic.GraphClause]*filter.StorageOptions, error) { + clausesByBinding := organizeClausesByBinding(clauses) + filterOptionsByClause := map[*semantic.GraphClause]*filter.StorageOptions{} + + for _, f := range filters { + if _, ok := clausesByBinding[f.Binding]; !ok { + return nil, fmt.Errorf("binding %q referenced by filter clause %q does not exist in the graph pattern", f.Binding, f) + } + + compatibleBindingsInClause, err := compatibleBindingsInClauseForFilterOperation(f.Operation) + if err != nil { + return nil, err + } + for _, cls := range clausesByBinding[f.Binding] { + if _, ok := filterOptionsByClause[cls]; ok { + return nil, fmt.Errorf("multiple filters for the same graph clause or same binding are not supported at the moment") + } + + compatibleBindingsByField := compatibleBindingsInClause(cls) + filterBindingIsCompatible := false + for field, bndgs := range compatibleBindingsByField { + if bndgs[f.Binding] { + filterBindingIsCompatible = true + filterOptionsByClause[cls] = &filter.StorageOptions{ + Operation: f.Operation, + Field: field, + Value: f.Value, + } + break + } + } + if !filterBindingIsCompatible { + return nil, fmt.Errorf("binding %q occupies a position in graph clause %q that is incompatible with filter function %q", f.Binding, cls, f.Operation) + } + } + } + + return filterOptionsByClause, nil +} + +// addFilterOptions adds FilterOptions to lookup options if the given clause has bindings for which +// filters were defined (organized in filterOptionsByClause). +func addFilterOptions(lo *storage.LookupOptions, cls *semantic.GraphClause, filterOptionsByClause map[*semantic.GraphClause]*filter.StorageOptions) { + if _, ok := filterOptionsByClause[cls]; ok { + lo.FilterOptions = filterOptionsByClause[cls] + } +} + +// resetFilterOptions resets FilterOptions in lookup options to nil. +func resetFilterOptions(lo *storage.LookupOptions) { + lo.FilterOptions = (*filter.StorageOptions)(nil) +} + // processGraphPattern process the query graph pattern to retrieve the // data from the specified graphs. func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupOptions) error { tracer.Trace(p.tracer, func() *tracer.Arguments { var res []string - for i, cls := range p.cls { + for i, cls := range p.clauses { res = append(res, fmt.Sprintf("Clause %d to process: %v", i, cls)) } return &tracer.Arguments{ Msgs: res, } }) - for i, c := range p.cls { - i, cls := i, *c + + filterOptionsByClause, err := organizeFilterOptionsByClause(p.filters, p.clauses) + if err != nil { + return err + } + for i, cls := range p.clauses { tracer.Trace(p.tracer, func() *tracer.Arguments { return &tracer.Arguments{ - Msgs: []string{fmt.Sprintf("Processing clause %d: %v", i, &cls)}, + Msgs: []string{fmt.Sprintf("Processing clause %d: %v", i, cls)}, } }) - // The current planner is based on naively executing clauses by - // specificity. - unresolvable, err := p.processClause(ctx, &cls, lo) + + addFilterOptions(lo, cls, filterOptionsByClause) + unresolvable, err := p.processClause(ctx, cls, lo) + resetFilterOptions(lo) if err != nil { return err } @@ -669,6 +763,7 @@ func (p *queryPlan) processGraphPattern(ctx context.Context, lo *storage.LookupO return nil } } + return nil } @@ -876,11 +971,17 @@ func (p *queryPlan) String(ctx context.Context) string { b.WriteString("using store(\"") b.WriteString(p.store.Name(nil)) b.WriteString(fmt.Sprintf("\") graphs %v\nresolve\n", p.grfsNames)) - for _, c := range p.cls { + for _, c := range p.clauses { b.WriteString("\t") b.WriteString(c.String()) b.WriteString("\n") } + b.WriteString("with filters\n") + for _, f := range p.filters { + b.WriteString("\t") + b.WriteString(f.String()) + b.WriteString("\n") + } b.WriteString("project results using\n") for _, p := range p.stm.Projection() { b.WriteString("\t") diff --git a/bql/planner/planner_test.go b/bql/planner/planner_test.go index 0ed651f8..2135ccfd 100644 --- a/bql/planner/planner_test.go +++ b/bql/planner/planner_test.go @@ -39,6 +39,8 @@ const ( /u "bought"@[2016-02-01T00:00:00-08:00] /c /u "bought"@[2016-03-01T00:00:00-08:00] /c /u "bought"@[2016-04-01T00:00:00-08:00] /c + /u "bought"@[2016-01-01T00:00:00-08:00] /c + /u "bought"@[2016-04-01T00:00:00-08:00] /c /c "is_a"@[] /t /c "is_a"@[] /t /c "is_a"@[] /t @@ -48,6 +50,7 @@ const ( /l "predicate"@[] "turned"@[2016-03-01T00:00:00-08:00] /l "predicate"@[] "turned"@[2016-04-01T00:00:00-08:00] /l "predicate"@[] "immutable_predicate"@[] + /l "predicate"@[] "turned"@[2016-04-01T00:00:00-08:00] /u "height_cm"@[] "174"^^type:int64 /u "tag"@[] "abc"^^type:text /u "height_cm"@[] "151"^^type:int64 @@ -840,7 +843,7 @@ func TestPlannerQuery(t *testing.T) { } HAVING (?p_id < "in"^^type:text) AND (?time > 2016-02-01T00:00:00-08:00);`, nBindings: 4, - nRows: 2, + nRows: 3, }, { q: `SELECT ?p, ?time @@ -958,7 +961,7 @@ func TestPlannerQuery(t *testing.T) { ?s ?p ?o AT ?o_time };`, nBindings: 3, - nRows: 4, + nRows: 5, }, { q: `SELECT ?s, ?o, ?o_time @@ -978,7 +981,131 @@ func TestPlannerQuery(t *testing.T) { ?s ?p "turned"@[?o_time] };`, nBindings: 2, - nRows: 4, + nRows: 5, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER latest(?p) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p_alias, ?o + FROM ?test + WHERE { + /u ?p AS ?p_alias ?o . + FILTER latest(?p_alias) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /l ?p ?o . + FILTER latest(?o) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p, ?o_alias + FROM ?test + WHERE { + /l ?p ?o AS ?o_alias . + FILTER latest(?o_alias) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?p1, ?p2 + FROM ?test + WHERE { + /u ?p1 ?o1 . + /item/book<000> ?p2 ?o2 . + FILTER latest(?p1) . + FILTER latest(?p2) + };`, + nBindings: 2, + nRows: 1, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s ?p ?o . + FILTER latest(?p) + };`, + nBindings: 3, + nRows: 3, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s ?p ?o . + FILTER latest(?o) + };`, + nBindings: 3, + nRows: 2, + }, + { + q: `SELECT ?s, ?p_alias, ?o + FROM ?test + WHERE { + ?s "bought"@[2016-03-01T00:00:00-08:00] AS ?p_alias ?o . + FILTER latest(?p_alias) + };`, + nBindings: 3, + nRows: 1, + }, + { + q: `SELECT ?s, ?p, ?o_alias + FROM ?test + WHERE { + ?s ?p "turned"@[2016-03-01T00:00:00-08:00] AS ?o_alias . + FILTER latest(?o_alias) + };`, + nBindings: 3, + nRows: 1, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s "bought"@[?time] ?o . + OPTIONAL { ?s ?p ?o } . + FILTER latest(?p) + };`, + nBindings: 3, + nRows: 6, + }, + { + q: `SELECT ?p + FROM ?test + WHERE { + /u ?p ?o1 . + /u ?p ?o2 . + FILTER latest(?p) + };`, + nBindings: 1, + nRows: 1, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER lAtEsT(?p) . + };`, + nBindings: 2, + nRows: 1, }, } @@ -1065,6 +1192,40 @@ func TestPlannerQueryError(t *testing.T) { } HAVING ?s > /u;`, }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER latest(?p) . + FILTER latest(?p) + };`, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /l ?p ?o . + FILTER latest(?p) . + FILTER latest(?o) + };`, + }, + { + q: `SELECT ?p, ?o + FROM ?test + WHERE { + /u ?p ?o . + FILTER latest(?b_not_exist) + };`, + }, + { + q: `SELECT ?s, ?p, ?o + FROM ?test + WHERE { + ?s ID ?sID ?p ?o . + FILTER latest(?sID) + };`, + }, } s, ctx := memory.NewStore(), context.Background() diff --git a/bql/semantic/hooks.go b/bql/semantic/hooks.go index 76b5b7dc..f11e52f3 100644 --- a/bql/semantic/hooks.go +++ b/bql/semantic/hooks.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/table" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/literal" @@ -671,13 +672,49 @@ func whereObjectClause() ElementHook { return hook } +// addOperationToWorkingFilter takes the filter operation in its string format and tries to add the +// correspondent filter.Operation to workingFilter. +func addOperationToWorkingFilter(op string, workingFilter *FilterClause) error { + if workingFilter == nil { + return fmt.Errorf("could not add filter function %q to nil filter clause (which is still nil probably because a call to st.ResetWorkingFilterClause was not made before start processing the first filter clause)", op) + } + if !workingFilter.Operation.IsEmpty() { + return fmt.Errorf("invalid filter function %q on filter clause since already set to %q", op, workingFilter.Operation) + } + lowercaseOp := strings.ToLower(op) + if _, ok := filter.SupportedOperations[lowercaseOp]; !ok { + return fmt.Errorf("filter function %q on filter clause is not supported", op) + } + + workingFilter.Operation = filter.SupportedOperations[lowercaseOp] + return nil +} + +// addBindingToWorkingFilter takes the given binding and tries to add it to workingFilter. +func addBindingToWorkingFilter(bndg string, workingFilter *FilterClause) error { + if workingFilter == nil { + return fmt.Errorf("could not add binding %q to nil filter clause (which is still nil probably because a call to st.ResetWorkingFilterClause was not made before start processing the first filter clause)", bndg) + } + if workingFilter.Operation.IsEmpty() { + return fmt.Errorf("could not add binding %q to a filter clause that does not have a filter function previously set", bndg) + } + if workingFilter.Binding != "" { + return fmt.Errorf("invalid binding %q on filter clause since already set to %q", bndg, workingFilter.Binding) + } + + workingFilter.Binding = bndg + return nil +} + +// isValidFilterClause returns true if the given filter clause is valid and complete. +func isValidFilterClause(f *FilterClause) bool { + return f != nil && !f.Operation.IsEmpty() && f.Binding != "" +} + // whereFilterClause returns an element hook that updates the working filter clause and, // if the filter clause is complete, populates the filters list of the statement. func whereFilterClause() ElementHook { var hook ElementHook - supportedFilterFunctions := map[string]bool{ - "latest": true, - } hook = func(st *Statement, ce ConsumedElement) (ElementHook, error) { if ce.IsSymbol() { @@ -685,35 +722,22 @@ func whereFilterClause() ElementHook { } tkn := ce.Token() - currFilter := st.WorkingFilter() switch tkn.Type { case lexer.ItemFilterFunction: - if currFilter == nil { - return nil, fmt.Errorf("could not add filter function %q to nil filter clause", tkn.Text) - } - if currFilter.Operation != "" { - return nil, fmt.Errorf("invalid filter function %q on filter clause since already set to %q", tkn.Text, currFilter.Operation) - } - if !supportedFilterFunctions[tkn.Text] { - return nil, fmt.Errorf("filter function %q on filter clause is not supported", tkn.Text) + err := addOperationToWorkingFilter(tkn.Text, st.WorkingFilter()) + if err != nil { + return nil, err } - currFilter.Operation = tkn.Text return hook, nil case lexer.ItemBinding: - if currFilter == nil { - return nil, fmt.Errorf("could not add binding %q to nil filter clause", tkn.Text) - } - if currFilter.Operation == "" { - return nil, fmt.Errorf("could not add binding %q to a filter clause that does not have a filter function previously set", tkn.Text) - } - if currFilter.Binding != "" { - return nil, fmt.Errorf("invalid binding %q on filter clause since already set to %q", tkn.Text, currFilter.Binding) + err := addBindingToWorkingFilter(tkn.Text, st.WorkingFilter()) + if err != nil { + return nil, err } - currFilter.Binding = tkn.Text return hook, nil case lexer.ItemRPar: - if currFilter == nil || currFilter.Operation == "" || currFilter.Binding == "" { - return nil, fmt.Errorf("could not add invalid working filter %q to the statement filters list", currFilter) + if !isValidFilterClause(st.WorkingFilter()) { + return nil, fmt.Errorf("could not add invalid working filter %q to the statement filters list", st.WorkingFilter()) } st.AddWorkingFilterClause() } diff --git a/bql/semantic/hooks_test.go b/bql/semantic/hooks_test.go index b9dd18fe..4d1a0a3d 100644 --- a/bql/semantic/hooks_test.go +++ b/bql/semantic/hooks_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/table" "github.com/google/badwolf/storage" "github.com/google/badwolf/triple" @@ -223,7 +224,7 @@ func TestWhereFilterClauseHook(t *testing.T) { NewConsumedSymbol("FOO"), }, want: &FilterClause{ - Operation: "latest", + Operation: filter.Latest, Binding: "?p", }, }, @@ -253,7 +254,7 @@ func TestWhereFilterClauseHook(t *testing.T) { NewConsumedSymbol("FOO"), }, want: &FilterClause{ - Operation: "latest", + Operation: filter.Latest, Binding: "?o", }, }, diff --git a/bql/semantic/semantic.go b/bql/semantic/semantic.go index fe556d0a..c753e1be 100644 --- a/bql/semantic/semantic.go +++ b/bql/semantic/semantic.go @@ -27,6 +27,7 @@ import ( "time" "github.com/google/badwolf/bql/lexer" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/bql/table" "github.com/google/badwolf/storage" "github.com/google/badwolf/triple" @@ -146,11 +147,11 @@ type GraphClause struct { } // FilterClause represents a FILTER clause inside WHERE. -// Operation below refers to the filter function being applied (eg: "latest"), Binding refers to the binding it +// Operation below refers to the filter function being applied (eg: Latest), Binding refers to the binding it // will be applied to and Value, when specified, contains the second argument of the filter function (not applicable for all -// Operations - some like "latest" do not use it while others like "greaterThan" do, see Issue 129). +// Operations - some like Latest do not use it while others like GreaterThan do, see Issue 129). type FilterClause struct { - Operation string + Operation filter.Operation Binding string Value string } diff --git a/bql/semantic/semantic_test.go b/bql/semantic/semantic_test.go index 5bcd16c4..0fe5e60a 100644 --- a/bql/semantic/semantic_test.go +++ b/bql/semantic/semantic_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/literal" "github.com/google/badwolf/triple/node" @@ -274,7 +275,7 @@ func TestFilterClauseManipulation(t *testing.T) { t.Run("add workingFilter success", func(t *testing.T) { wf := st.WorkingFilter() - *wf = FilterClause{Operation: "latest", Binding: "?p"} + *wf = FilterClause{Operation: filter.Latest, Binding: "?p"} st.AddWorkingFilterClause() if got, want := len(st.FilterClauses()), 1; got != want { t.Fatalf(`len(semantic.Statement.FilterClauses()) = %d for statement "%v"; want %d`, got, st, want) @@ -360,7 +361,7 @@ func TestIsEmptyFilterClause(t *testing.T) { want: true, }, { - in: &FilterClause{Operation: "latest", Binding: "?p"}, + in: &FilterClause{Operation: filter.Latest, Binding: "?p"}, want: false, }, } diff --git a/storage/memoization/memoization_test.go b/storage/memoization/memoization_test.go index 05d9ffa3..6c2718ef 100644 --- a/storage/memoization/memoization_test.go +++ b/storage/memoization/memoization_test.go @@ -31,7 +31,7 @@ import ( ) func TestCombinedUUID(t *testing.T) { - want := "op:9dae52f4-9b35-5d5f-bd8e-195d4b16fc30:00000000-0000-0000-0000-000000000000:00000000-0000-0000-0000-000000000000" + want := "op:420c34d3-9f56-5191-89de-57468c3e6a93:00000000-0000-0000-0000-000000000000:00000000-0000-0000-0000-000000000000" if got := combinedUUID("op", storage.DefaultLookup, uuid.NIL, uuid.NIL); got != want { t.Errorf("combinedUUID returned the wrong value; got %v, want %v", got, want) } diff --git a/storage/memory/memory.go b/storage/memory/memory.go index bcf3cb40..0fc88821 100644 --- a/storage/memory/memory.go +++ b/storage/memory/memory.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/storage" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/node" @@ -274,6 +275,63 @@ func (c *checker) CheckAndUpdate(p *predicate.Predicate) bool { return true } +// latestFilter executes the latest filter operation over memoryTriples following filterOptions. +func latestFilter(memoryTriples map[string]*triple.Triple, pQuery *predicate.Predicate, filterOptions *filter.StorageOptions) (map[string]*triple.Triple, error) { + if filterOptions.Field != filter.PredicateField && filterOptions.Field != filter.ObjectField { + return nil, fmt.Errorf("invalid field %q for %q filter operation, can accept only %q or %q", filterOptions.Field, filter.Latest, filter.PredicateField, filter.ObjectField) + } + + lastTA := make(map[string]*time.Time) + trps := make(map[string]map[string]*triple.Triple) + for _, t := range memoryTriples { + if pQuery != nil && pQuery.String() != t.Predicate().String() { + continue + } + + var p *predicate.Predicate + if filterOptions.Field == filter.PredicateField { + p = t.Predicate() + } else if pObj, err := t.Object().Predicate(); filterOptions.Field == filter.ObjectField && err == nil { + p = pObj + } else { + continue + } + if p.Type() != predicate.Temporal { + continue + } + + ppUUID := p.PartialUUID().String() + ta, err := p.TimeAnchor() + if err != nil { + return nil, err + } + if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { + trps[ppUUID] = map[string]*triple.Triple{t.UUID().String(): t} + lastTA[ppUUID] = ta + } else if ta.Sub(*lta) == 0 { + trps[ppUUID][t.UUID().String()] = t + } + } + + trpsByUUID := make(map[string]*triple.Triple) + for _, m := range trps { + for tUUID, t := range m { + trpsByUUID[tUUID] = t + } + } + return trpsByUUID, nil +} + +// executeFilter executes the proper filter operation over memoryTriples following the specifications given in filterOptions. +func executeFilter(memoryTriples map[string]*triple.Triple, pQuery *predicate.Predicate, filterOptions *filter.StorageOptions) (map[string]*triple.Triple, error) { + switch filterOptions.Operation { + case filter.Latest: + return latestFilter(memoryTriples, pQuery, filterOptions) + default: + return nil, fmt.Errorf("filter operation %q not supported in the driver", filterOptions.Operation) + } +} + // Objects published the objects for the give object and predicate to the // provided channel. func (m *memory) Objects(ctx context.Context, s *node.Node, p *predicate.Predicate, lo *storage.LookupOptions, objs chan<- *triple.Object) error { @@ -289,21 +347,22 @@ func (m *memory) Objects(ctx context.Context, s *node.Node, p *predicate.Predica defer close(objs) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxSP[spIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxSP[spIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -318,6 +377,7 @@ func (m *memory) Objects(ctx context.Context, s *node.Node, p *predicate.Predica objs <- t.Object() } } + return nil } @@ -327,6 +387,7 @@ func (m *memory) Subjects(ctx context.Context, p *predicate.Predicate, o *triple if subjs == nil { return fmt.Errorf("cannot provide an empty channel") } + pUUID := UUIDToByteString(p.PartialUUID()) oUUID := UUIDToByteString(o.UUID()) poIdx := pUUID + oUUID @@ -335,21 +396,22 @@ func (m *memory) Subjects(ctx context.Context, p *predicate.Predicate, o *triple defer close(subjs) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxPO[poIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxPO[poIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -364,6 +426,7 @@ func (m *memory) Subjects(ctx context.Context, p *predicate.Predicate, o *triple subjs <- t.Subject() } } + return nil } @@ -373,6 +436,7 @@ func (m *memory) PredicatesForSubjectAndObject(ctx context.Context, s *node.Node if prds == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) oUUID := UUIDToByteString(o.UUID()) soIdx := sUUID + oUUID @@ -381,21 +445,22 @@ func (m *memory) PredicatesForSubjectAndObject(ctx context.Context, s *node.Node defer close(prds) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxSO[soIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxSO[soIdx], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -410,6 +475,7 @@ func (m *memory) PredicatesForSubjectAndObject(ctx context.Context, s *node.Node prds <- t.Predicate() } } + return nil } @@ -419,27 +485,29 @@ func (m *memory) PredicatesForSubject(ctx context.Context, s *node.Node, lo *sto if prds == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(prds) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxS[sUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxS[sUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -454,6 +522,7 @@ func (m *memory) PredicatesForSubject(ctx context.Context, s *node.Node, lo *sto prds <- t.Predicate() } } + return nil } @@ -463,27 +532,29 @@ func (m *memory) PredicatesForObject(ctx context.Context, o *triple.Object, lo * if prds == nil { return fmt.Errorf("cannot provide an empty channel") } + oUUID := UUIDToByteString(o.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(prds) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxO[oUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxO[oUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -498,6 +569,7 @@ func (m *memory) PredicatesForObject(ctx context.Context, o *triple.Object, lo * prds <- t.Predicate() } } + return nil } @@ -507,27 +579,29 @@ func (m *memory) TriplesForSubject(ctx context.Context, s *node.Node, lo *storag if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxS[sUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxS[sUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -542,6 +616,7 @@ func (m *memory) TriplesForSubject(ctx context.Context, s *node.Node, lo *storag trpls <- t } } + return nil } @@ -551,27 +626,29 @@ func (m *memory) TriplesForPredicate(ctx context.Context, p *predicate.Predicate if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + pUUID := UUIDToByteString(p.PartialUUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxP[pUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxP[pUUID], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -586,6 +663,7 @@ func (m *memory) TriplesForPredicate(ctx context.Context, p *predicate.Predicate trpls <- t } } + return nil } @@ -595,27 +673,29 @@ func (m *memory) TriplesForObject(ctx context.Context, o *triple.Object, lo *sto if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + oUUID := UUIDToByteString(o.UUID()) m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxO[oUUID] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxO[oUUID], nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -630,6 +710,7 @@ func (m *memory) TriplesForObject(ctx context.Context, o *triple.Object, lo *sto trpls <- t } } + return nil } @@ -639,6 +720,7 @@ func (m *memory) TriplesForSubjectAndPredicate(ctx context.Context, s *node.Node if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + sUUID := UUIDToByteString(s.UUID()) pUUID := UUIDToByteString(p.PartialUUID()) spIdx := sUUID + pUUID @@ -647,21 +729,22 @@ func (m *memory) TriplesForSubjectAndPredicate(ctx context.Context, s *node.Node defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxSP[spIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxSP[spIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -676,6 +759,7 @@ func (m *memory) TriplesForSubjectAndPredicate(ctx context.Context, s *node.Node trpls <- t } } + return nil } @@ -685,6 +769,7 @@ func (m *memory) TriplesForPredicateAndObject(ctx context.Context, p *predicate. if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + pUUID := UUIDToByteString(p.PartialUUID()) oUUID := UUIDToByteString(o.UUID()) poIdx := pUUID + oUUID @@ -693,21 +778,22 @@ func (m *memory) TriplesForPredicateAndObject(ctx context.Context, p *predicate. defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idxPO[poIdx] { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idxPO[poIdx], p, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -722,6 +808,7 @@ func (m *memory) TriplesForPredicateAndObject(ctx context.Context, p *predicate. trpls <- t } } + return nil } @@ -740,26 +827,28 @@ func (m *memory) Triples(ctx context.Context, lo *storage.LookupOptions, trpls c if trpls == nil { return fmt.Errorf("cannot provide an empty channel") } + m.rwmu.RLock() defer m.rwmu.RUnlock() defer close(trpls) if lo.LatestAnchor { - lastTA := make(map[string]*time.Time) - trps := make(map[string]*triple.Triple) - for _, t := range m.idx { - p := t.Predicate() - ppUUID := p.PartialUUID().String() - if p.Type() == predicate.Temporal { - ta, err := p.TimeAnchor() - if err != nil { - return err - } - if lta := lastTA[ppUUID]; lta == nil || ta.Sub(*lta) > 0 { - trps[ppUUID] = t - lastTA[ppUUID] = ta - } - } + if lo.FilterOptions != nil { + return fmt.Errorf("cannot have LatestAnchor and FilterOptions used at the same time inside lookup options") + } + lo.FilterOptions = &filter.StorageOptions{ + Operation: filter.Latest, + Field: filter.PredicateField, + } + // To guarantee that "lo.FilterOptions" will be cleaned at the driver level, since it was artificially created at the driver level for "LatestAnchor". + defer func() { + lo.FilterOptions = (*filter.StorageOptions)(nil) + }() + } + if lo.FilterOptions != nil { + trps, err := executeFilter(m.idx, nil, lo.FilterOptions) + if err != nil { + return err } for _, trp := range trps { if trp != nil { @@ -774,5 +863,6 @@ func (m *memory) Triples(ctx context.Context, lo *storage.LookupOptions, trpls c trpls <- t } } + return nil } diff --git a/storage/memory/memory_test.go b/storage/memory/memory_test.go index 8decd9ff..760a7c0b 100644 --- a/storage/memory/memory_test.go +++ b/storage/memory/memory_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/storage" "github.com/google/badwolf/tools/testutil" "github.com/google/badwolf/triple" @@ -237,6 +238,19 @@ func getTestTemporalTriples(t *testing.T) []*triple.Triple { }) } +func getTestTriplesFilter(t *testing.T) []*triple.Triple { + return createTriples(t, []string{ + "/u\t\"meet\"@[2012-04-10T04:21:00Z]\t/u", + "/u\t\"meet\"@[2013-04-10T04:21:00Z]\t/u", + "/u\t\"meet\"@[2014-04-10T04:21:00Z]\t/u", + "/u\t\"meet\"@[2014-04-10T04:21:00Z]\t/u", + "/u\t\"parent_of\"@[]\t/u", + "/_\t\"_predicate\"@[]\t\"meet\"@[2020-04-10T04:21:00Z]", + "/_\t\"_predicate\"@[]\t\"meet\"@[2021-04-10T04:21:00Z]", + "/_\t\"_predicate\"@[]\t\"height_cm\"@[]", + }) +} + func TestAddRemoveTriples(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -275,7 +289,7 @@ func TestObjects(t *testing.T) { } } -func TestObjectsLastestTemporal(t *testing.T) { +func TestObjectsLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -303,6 +317,71 @@ func TestObjectsLastestTemporal(t *testing.T) { } } +func TestObjectsFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + s *node.Node + p *predicate.Predicate + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + want: map[string]int{"/u": 1}, + }, + { + id: "FILTER latest predicate duplicate timestamp", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + want: map[string]int{"/u": 1, "/u": 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + want: map[string]int{`"meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + os := make(chan *triple.Object, 100) + s := entry.s + p := entry.p + if err := g.Objects(ctx, s, p, entry.lo, os); err != nil { + t.Fatalf("g.Objects(%s, %s, %s) = %v; want nil", s, p, entry.lo, err) + } + for o := range os { + oStr := o.String() + if _, ok := entry.want[oStr]; !ok { + t.Fatalf("g.Objects(%s, %s, %s) retrieved unexpected %s", s, p, entry.lo, oStr) + } + entry.want[oStr] = entry.want[oStr] - 1 + if entry.want[oStr] == 0 { + delete(entry.want, oStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.Objects(%s, %s, %s) failed to retrieve some expected elements: %v", s, p, entry.lo, entry.want) + } + }) + } +} + func TestSubjects(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -329,7 +408,7 @@ func TestSubjects(t *testing.T) { } } -func TestSubjectsLatestTemporal(t *testing.T) { +func TestSubjectsLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -356,6 +435,71 @@ func TestSubjectsLatestTemporal(t *testing.T) { } } +func TestSubjectsFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + p *predicate.Predicate + o *triple.Object + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{"/u": 1}, + }, + { + id: "FILTER latest predicate duplicate timestamp", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{"/u": 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{"/_": 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + ss := make(chan *node.Node, 100) + p := entry.p + o := entry.o + if err := g.Subjects(ctx, p, o, entry.lo, ss); err != nil { + t.Fatalf("g.Subjects(%s, %s, %s) = %v; want nil", p, o, entry.lo, err) + } + for s := range ss { + sStr := s.String() + if _, ok := entry.want[sStr]; !ok { + t.Fatalf("g.Subjects(%s, %s, %s) retrieved unexpected %s", p, o, entry.lo, sStr) + } + entry.want[sStr] = entry.want[sStr] - 1 + if entry.want[sStr] == 0 { + delete(entry.want, sStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.Subjects(%s, %s, %s) failed to retrieve some expected elements: %v", p, o, entry.lo, entry.want) + } + }) + } +} + func TestPredicatesForSubjectAndObject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -380,7 +524,8 @@ func TestPredicatesForSubjectAndObject(t *testing.T) { t.Errorf("g.PredicatesForSubjectAndObject(%s, %s) failed to retrieve 1 predicate, got %d instead", ts[0].Subject(), ts[0].Object(), cnt) } } -func TestPredicatesForSubjectAndObjectLatestTemporal(t *testing.T) { + +func TestPredicatesForSubjectAndObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -406,6 +551,64 @@ func TestPredicatesForSubjectAndObjectLatestTemporal(t *testing.T) { } } +func TestPredicatesForSubjectAndObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + s *node.Node + o *triple.Object + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`"_predicate"@[]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + pp := make(chan *predicate.Predicate, 100) + s := entry.s + o := entry.o + if err := g.PredicatesForSubjectAndObject(ctx, s, o, entry.lo, pp); err != nil { + t.Fatalf("g.PredicatesForSubjectAndObject(%s, %s, %s) = %v; want nil", s, o, entry.lo, err) + } + for p := range pp { + pStr := p.String() + if _, ok := entry.want[pStr]; !ok { + t.Fatalf("g.PredicatesForSubjectAndObject(%s, %s, %s) retrieved unexpected %s", s, o, entry.lo, pStr) + } + entry.want[pStr] = entry.want[pStr] - 1 + if entry.want[pStr] == 0 { + delete(entry.want, pStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.PredicatesForSubjectAndObject(%s, %s, %s) failed to retrieve some expected elements: %v", s, o, entry.lo, entry.want) + } + }) + } +} + func TestPredicatesForSubject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -431,7 +634,7 @@ func TestPredicatesForSubject(t *testing.T) { } } -func TestPredicatesForSubjectLatestTemporal(t *testing.T) { +func TestPredicatesForSubjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -457,6 +660,60 @@ func TestPredicatesForSubjectLatestTemporal(t *testing.T) { } } +func TestPredicatesForSubjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + s *node.Node + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 2}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + want: map[string]int{`"_predicate"@[]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + pp := make(chan *predicate.Predicate, 100) + s := entry.s + if err := g.PredicatesForSubject(ctx, s, entry.lo, pp); err != nil { + t.Fatalf("g.PredicatesForSubject(%s, %s) = %v; want nil", s, entry.lo, err) + } + for p := range pp { + pStr := p.String() + if _, ok := entry.want[pStr]; !ok { + t.Fatalf("g.PredicatesForSubject(%s, %s) retrieved unexpected %s", s, entry.lo, pStr) + } + entry.want[pStr] = entry.want[pStr] - 1 + if entry.want[pStr] == 0 { + delete(entry.want, pStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.PredicatesForSubject(%s, %s) failed to retrieve some expected elements: %v", s, entry.lo, entry.want) + } + }) + } +} + func TestPredicatesForObject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -481,7 +738,8 @@ func TestPredicatesForObject(t *testing.T) { t.Errorf("g.PredicatesForObject(%s) failed to retrieve 1 predicate, got %d instead", ts[0].Object(), cnt) } } -func TestPredicatesForObjectLatestTemporal(t *testing.T) { + +func TestPredicatesForObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -507,6 +765,60 @@ func TestPredicatesForObjectLatestTemporal(t *testing.T) { } } +func TestPredicatesForObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + o *triple.Object + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`"meet"@[2014-04-10T04:21:00Z]`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`"_predicate"@[]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + pp := make(chan *predicate.Predicate, 100) + o := entry.o + if err := g.PredicatesForObject(ctx, o, entry.lo, pp); err != nil { + t.Fatalf("g.PredicatesForObject(%s, %s) = %v; want nil", o, entry.lo, err) + } + for p := range pp { + pStr := p.String() + if _, ok := entry.want[pStr]; !ok { + t.Fatalf("g.PredicatesForObject(%s, %s) retrieved unexpected %s", o, entry.lo, pStr) + } + entry.want[pStr] = entry.want[pStr] - 1 + if entry.want[pStr] == 0 { + delete(entry.want, pStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.PredicatesForObject(%s, %s) failed to retrieve some expected elements: %v", o, entry.lo, entry.want) + } + }) + } +} + func TestTriplesForSubject(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -529,7 +841,7 @@ func TestTriplesForSubject(t *testing.T) { } } -func TestTriplesForSubjectLatestTemporal(t *testing.T) { +func TestTriplesForSubjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -555,6 +867,60 @@ func TestTriplesForSubjectLatestTemporal(t *testing.T) { } } +func TestTriplesForSubjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + s *node.Node + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + s := entry.s + if err := g.TriplesForSubject(ctx, s, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForSubject(%s, %s) = %v; want nil", s, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForSubject(%s, %s) retrieved unexpected %s", s, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForSubject(%s, %s) failed to retrieve some expected elements: %v", s, entry.lo, entry.want) + } + }) + } +} + func TestTriplesForPredicate(t *testing.T) { ts, ctx := getTestTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") @@ -576,7 +942,8 @@ func TestTriplesForPredicate(t *testing.T) { t.Errorf("g.triplesForPredicate(%s) failed to retrieve 3 predicates, got %d instead", ts[0].Predicate(), cnt) } } -func TestTriplesForPredicateLatestTemporal(t *testing.T) { + +func TestTriplesForPredicateLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -593,12 +960,72 @@ func TestTriplesForPredicateLatestTemporal(t *testing.T) { cnt := 0 for rts := range trpls { cnt++ - if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) { - t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate()) + if !reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) { + t.Errorf("g.TriplesForPredicate(%s) = %s for LatestAnchor; want %s", ts[0].Predicate(), rts.Predicate(), ts[0].Predicate()) } } if cnt != 1 { - t.Errorf("g.triplesForPredicate(%s) failed to retrieve 3 predicates, got %d instead", ts[0].Predicate(), cnt) + t.Errorf("g.triplesForPredicate(%s) retrieved %d predicates; want 1", ts[0].Predicate(), cnt) + } +} + +func TestTriplesForPredicateFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + p *predicate.Predicate + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2012-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest predicate duplicate timestamp", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + p := entry.p + if err := g.TriplesForPredicate(ctx, p, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForPredicate(%s, %s) = %v; want nil", p, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForPredicate(%s, %s) retrieved unexpected %s", p, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForPredicate(%s, %s) failed to retrieve some expected elements: %v", p, entry.lo, entry.want) + } + }) } } @@ -624,7 +1051,7 @@ func TestTriplesForObject(t *testing.T) { } } -func TestTriplesForObjectLatestTemporal(t *testing.T) { +func TestTriplesForObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -650,6 +1077,60 @@ func TestTriplesForObjectLatestTemporal(t *testing.T) { } } +func TestTriplesForObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + o *triple.Object + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2020-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + o := entry.o + if err := g.TriplesForObject(ctx, o, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForObject(%s, %s) = %v; want nil", o, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForObject(%s, %s) retrieved unexpected %s", o, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForObject(%s, %s) failed to retrieve some expected elements: %v", o, entry.lo, entry.want) + } + }) + } +} + func TestTriplesForObjectWithLimit(t *testing.T) { ts := createTriples(t, []string{ "/u\t\"kissed\"@[2015-01-01T00:00:00-09:00]\t/u", @@ -714,7 +1195,8 @@ func TestTriplesForSubjectAndPredicate(t *testing.T) { t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) failed to retrieve 3 predicates, got %d instead", ts[0].Subject(), ts[0].Predicate(), cnt) } } -func TestTriplesForSubjectAndPredicateLatestTemporal(t *testing.T) { + +func TestTriplesForSubjectAndPredicateLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -731,12 +1213,77 @@ func TestTriplesForSubjectAndPredicateLatestTemporal(t *testing.T) { cnt := 0 for rts := range trpls { cnt++ - if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) { - t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate()) + if !reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) { + t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) = %s for LatestAnchor; want %s", ts[0].Subject(), ts[0].Predicate(), rts.Predicate(), ts[0].Predicate()) } } if cnt != 1 { - t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) failed to retrieve 3 predicates, got %d instead", ts[0].Subject(), ts[0].Predicate(), cnt) + t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s) retrieved %d predicates; want 1", ts[0].Subject(), ts[0].Predicate(), cnt) + } +} + +func TestTriplesForSubjectAndPredicateFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + s *node.Node + p *predicate.Predicate + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2012-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest predicate duplicate timestamp", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + s: testutil.MustBuildNodeFromStrings(t, "/u", "john"), + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + s: testutil.MustBuildNodeFromStrings(t, "/_", "bn"), + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + s := entry.s + p := entry.p + if err := g.TriplesForSubjectAndPredicate(ctx, s, p, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForSubjectAndPredicate(%s, %s, %s) = %v; want nil", s, p, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForSubjectAndPredicate(%s, %s, %s) retrieved unexpected %s", s, p, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForSubjectAndPredicate(%s, %s, %s) failed to retrieve some expected elements: %v", s, p, entry.lo, entry.want) + } + }) } } @@ -762,7 +1309,7 @@ func TestTriplesForPredicateAndObject(t *testing.T) { } } -func TestTriplesForPredicateAndObjectLatestTemporal(t *testing.T) { +func TestTriplesForPredicateAndObjectLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -779,12 +1326,77 @@ func TestTriplesForPredicateAndObjectLatestTemporal(t *testing.T) { cnt := 0 for rts := range trpls { cnt++ - if !reflect.DeepEqual(rts.Predicate().UUID(), ts[len(ts)-1].Predicate().UUID()) { - t.Errorf("g.PredicatesForObject(%s) failed to return a valid predicate; returned %s instead", ts[0].Object(), rts.Predicate()) + if !reflect.DeepEqual(rts.Predicate().UUID(), ts[0].Predicate().UUID()) { + t.Errorf("g.TriplesForPredicateAndObject(%s, %s) = %s for LatestAnchor; want %s", ts[0].Predicate(), ts[0].Object(), rts.Predicate(), ts[0].Predicate()) } } if cnt != 1 { - t.Errorf("g.TriplesForPredicateAndObject(%s, %s) failed to retrieve 1 predicates, got %d instead", ts[0].Predicate(), ts[0].Object(), cnt) + t.Errorf("g.TriplesForPredicateAndObject(%s, %s) retrieved %d predicates; want 1", ts[0].Predicate(), ts[0].Object(), cnt) + } +} + +func TestTriplesForPredicateAndObjectFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + p *predicate.Predicate + o *triple.Object + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2012-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`/u "meet"@[2012-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest predicate duplicate timestamp", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + p: testutil.MustBuildPredicate(t, `"meet"@[2014-04-10T04:21:00Z]`), + o: triple.NewNodeObject(testutil.MustBuildNodeFromStrings(t, "/u", "mary")), + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + p: testutil.MustBuildPredicate(t, `"_predicate"@[]`), + o: triple.NewPredicateObject(testutil.MustBuildPredicate(t, `"meet"@[2020-04-10T04:21:00Z]`)), + want: map[string]int{`/_ "_predicate"@[] "meet"@[2020-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + p := entry.p + o := entry.o + if err := g.TriplesForPredicateAndObject(ctx, p, o, entry.lo, trpls); err != nil { + t.Fatalf("g.TriplesForPredicateAndObject(%s, %s, %s) = %v; want nil", p, o, entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.TriplesForPredicateAndObject(%s, %s, %s) retrieved unexpected %s", p, o, entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.TriplesForPredicateAndObject(%s, %s, %s) failed to retrieve some expected elements: %v", p, o, entry.lo, entry.want) + } + }) } } @@ -827,7 +1439,7 @@ func TestTriples(t *testing.T) { } } -func TestTriplesLastestTemporal(t *testing.T) { +func TestTriplesLatestAnchor(t *testing.T) { ts, ctx := getTestTemporalTriples(t), context.Background() g, _ := NewStore().NewGraph(ctx, "test") if err := g.AddTriples(ctx, ts); err != nil { @@ -852,3 +1464,53 @@ func TestTriplesLastestTemporal(t *testing.T) { t.Errorf("g.TriplesForPredicateAndObject(%s, %s) failed to retrieve 1 predicates, got %d instead", ts[0].Predicate(), ts[0].Object(), cnt) } } + +func TestTriplesFilter(t *testing.T) { + ts, ctx := getTestTriplesFilter(t), context.Background() + g, _ := NewStore().NewGraph(ctx, "test") + if err := g.AddTriples(ctx, ts); err != nil { + t.Fatalf("g.AddTriples(_) failed to add test triples with error: %v", err) + } + + testTable := []struct { + id string + lo *storage.LookupOptions + want map[string]int + }{ + { + id: "FILTER latest predicate", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.PredicateField}}, + want: map[string]int{`/u "meet"@[2014-04-10T04:21:00Z] /u`: 1, `/u "meet"@[2014-04-10T04:21:00Z] /u`: 1}, + }, + { + id: "FILTER latest object", + lo: &storage.LookupOptions{FilterOptions: &filter.StorageOptions{Operation: filter.Latest, Field: filter.ObjectField}}, + want: map[string]int{`/_ "_predicate"@[] "meet"@[2021-04-10T04:21:00Z]`: 1}, + }, + } + + for _, entry := range testTable { + t.Run(entry.id, func(t *testing.T) { + // To avoid blocking on the test we use a buffered channel of size 100. On a real + // usage of the driver you would like to call the graph operation on a separated + // goroutine using a sync.WaitGroup to collect the error code eventually. + trpls := make(chan *triple.Triple, 100) + if err := g.Triples(ctx, entry.lo, trpls); err != nil { + t.Fatalf("g.Triples(%s) = %v; want nil", entry.lo, err) + } + for trpl := range trpls { + tStr := trpl.String() + if _, ok := entry.want[tStr]; !ok { + t.Fatalf("g.Triples(%s) retrieved unexpected %s", entry.lo, tStr) + } + entry.want[tStr] = entry.want[tStr] - 1 + if entry.want[tStr] == 0 { + delete(entry.want, tStr) + } + } + if len(entry.want) != 0 { + t.Errorf("g.Triples(%s) failed to retrieve some expected elements: %v", entry.lo, entry.want) + } + }) + } +} diff --git a/storage/storage.go b/storage/storage.go index e787fbd0..de903e63 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -22,6 +22,7 @@ import ( "strconv" "time" + "github.com/google/badwolf/bql/planner/filter" "github.com/google/badwolf/triple" "github.com/google/badwolf/triple/node" "github.com/google/badwolf/triple/predicate" @@ -43,6 +44,9 @@ type LookupOptions struct { // LatestAnchor only. If set, it will ignore the time boundaries provided and // just use the last available anchor. LatestAnchor bool + + // FilterOptions, if provided, represent the specifications for the filtering to be executed. + FilterOptions *filter.StorageOptions } // String returns a readable version of the LookupOptions instance. @@ -62,7 +66,7 @@ func (l *LookupOptions) String() string { b.WriteString("nil") } b.WriteString(fmt.Sprintf(", LatestAnchor=%v", l.LatestAnchor)) - b.WriteString(">") + b.WriteString(fmt.Sprintf(", FilterOptions=%s>", l.FilterOptions)) return b.String() }