diff --git a/staging/operator-registry/cmd/opm/registry/serve.go b/staging/operator-registry/cmd/opm/registry/serve.go index 74957a2c41..ff402c58b3 100644 --- a/staging/operator-registry/cmd/opm/registry/serve.go +++ b/staging/operator-registry/cmd/opm/registry/serve.go @@ -90,12 +90,16 @@ func serveFunc(cmd *cobra.Command, args []string) error { return err } + if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil { + logger.WithError(err).Warnf("error setting soft heap limit for sqlite") + } + // migrate to the latest version if err := migrate(cmd, db); err != nil { logger.WithError(err).Warnf("couldn't migrate db") } - store := sqlite.NewSQLLiteQuerierFromDb(db) + store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true)) // sanity check that the db is available tables, err := store.ListTables(context.TODO()) diff --git a/staging/operator-registry/cmd/registry-server/main.go b/staging/operator-registry/cmd/registry-server/main.go index 9cd9e63ff3..529a02a9cb 100644 --- a/staging/operator-registry/cmd/registry-server/main.go +++ b/staging/operator-registry/cmd/registry-server/main.go @@ -91,12 +91,16 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { return err } + if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil { + logger.WithError(err).Warnf("error setting soft heap limit for sqlite") + } + // migrate to the latest version if err := migrate(cmd, db); err != nil { logger.WithError(err).Warnf("couldn't migrate db") } - store := sqlite.NewSQLLiteQuerierFromDb(db) + store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true)) // sanity check that the db is available tables, err := store.ListTables(context.TODO()) diff --git a/staging/operator-registry/pkg/registry/empty.go b/staging/operator-registry/pkg/registry/empty.go index fd4676eb6c..936f39ccab 100644 --- a/staging/operator-registry/pkg/registry/empty.go +++ b/staging/operator-registry/pkg/registry/empty.go @@ -100,6 +100,10 @@ func (EmptyQuery) ListBundles(ctx context.Context) ([]*api.Bundle, error) { return nil, errors.New("empty querier: cannot list bundles") } +func (EmptyQuery) SendBundles(ctx context.Context, stream BundleSender) error { + return errors.New("empty querier: cannot stream bundles") +} + func (EmptyQuery) GetDependenciesForBundle(ctx context.Context, name, version, path string) (dependencies []*api.Dependency, err error) { return nil, errors.New("empty querier: cannot get dependencies for bundle") } diff --git a/staging/operator-registry/pkg/registry/interface.go b/staging/operator-registry/pkg/registry/interface.go index fd77636117..454cec3869 100644 --- a/staging/operator-registry/pkg/registry/interface.go +++ b/staging/operator-registry/pkg/registry/interface.go @@ -17,10 +17,17 @@ type Load interface { ClearNonHeadBundles() error } +type BundleSender interface { + Send(*api.Bundle) error +} + type GRPCQuery interface { // List all available package names in the index ListPackages(ctx context.Context) ([]string, error) + // Sends all available bundles in the index + SendBundles(ctx context.Context, stream BundleSender) error + // List all available bundles in the index ListBundles(ctx context.Context) (bundles []*api.Bundle, err error) diff --git a/staging/operator-registry/pkg/registry/query.go b/staging/operator-registry/pkg/registry/query.go index 9acd9d48fa..20f0e9f95d 100644 --- a/staging/operator-registry/pkg/registry/query.go +++ b/staging/operator-registry/pkg/registry/query.go @@ -13,6 +13,14 @@ type Querier struct { pkgs model.Model } +type SliceBundleSender []*api.Bundle + +func (s *SliceBundleSender) Send(b *api.Bundle) error { + + *s = append(*s, b) + return nil +} + var _ GRPCQuery = &Querier{} func NewQuerier(packages model.Model) *Querier { @@ -29,21 +37,40 @@ func (q Querier) ListPackages(_ context.Context) ([]string, error) { return packages, nil } -func (q Querier) ListBundles(_ context.Context) ([]*api.Bundle, error) { - var bundles []*api.Bundle +func (q Querier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { + var bundleSender SliceBundleSender + + err := q.SendBundles(ctx, &bundleSender) + if err != nil { + return nil, err + } + + return bundleSender, nil +} +func (q Querier) SendBundles(_ context.Context, s BundleSender) error { for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { for _, b := range ch.Bundles { apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) + return fmt.Errorf("convert bundle %q: %v", b.Name, err) + } + if apiBundle.BundlePath != "" { + // The SQLite-based server + // configures its querier to + // omit these fields when + // bundle path is set. + apiBundle.CsvJson = "" + apiBundle.Object = nil + } + if err := s.Send(apiBundle); err != nil { + return err } - bundles = append(bundles, apiBundle) } } } - return bundles, nil + return nil } func (q Querier) GetPackage(_ context.Context, name string) (*PackageManifest, error) { diff --git a/staging/operator-registry/pkg/registry/query_test.go b/staging/operator-registry/pkg/registry/query_test.go index 0973230e3c..c5951a73d7 100644 --- a/staging/operator-registry/pkg/registry/query_test.go +++ b/staging/operator-registry/pkg/registry/query_test.go @@ -164,7 +164,11 @@ func TestQuerier_ListBundles(t *testing.T) { bundles, err := testModelQuerier.ListBundles(context.TODO()) require.NoError(t, err) require.NotNil(t, bundles) - require.Equal(t, 12, len(bundles)) + require.Len(t, bundles, 12) + for _, b := range bundles { + require.Zero(t, b.CsvJson) + require.Zero(t, b.Object) + } } func TestQuerier_ListPackages(t *testing.T) { diff --git a/staging/operator-registry/pkg/server/server.go b/staging/operator-registry/pkg/server/server.go index 9fe1592c3e..64b87ce70e 100644 --- a/staging/operator-registry/pkg/server/server.go +++ b/staging/operator-registry/pkg/server/server.go @@ -33,17 +33,7 @@ func (s *RegistryServer) ListPackages(req *api.ListPackageRequest, stream api.Re } func (s *RegistryServer) ListBundles(req *api.ListBundlesRequest, stream api.Registry_ListBundlesServer) error { - bundles, err := s.store.ListBundles(stream.Context()) - if err != nil { - return err - } - for _, b := range bundles { - if err := stream.Send(b); err != nil { - return err - } - } - - return nil + return s.store.SendBundles(stream.Context(), stream) } func (s *RegistryServer) GetPackage(ctx context.Context, req *api.GetPackageRequest) (*api.Package, error) { diff --git a/staging/operator-registry/pkg/sqlite/query.go b/staging/operator-registry/pkg/sqlite/query.go index 5b68340359..fb24204941 100644 --- a/staging/operator-registry/pkg/sqlite/query.go +++ b/staging/operator-registry/pkg/sqlite/query.go @@ -35,25 +35,44 @@ func (a dbQuerierAdapter) QueryContext(ctx context.Context, query string, args . type SQLQuerier struct { db Querier + querierConfig } var _ registry.Query = &SQLQuerier{} -func NewSQLLiteQuerier(dbFilename string) (*SQLQuerier, error) { +type querierConfig struct { + omitManifests bool +} + +type SQLiteQuerierOption func(*querierConfig) + +// If true, ListBundles will omit inline manifests (the object and +// csvJson fields) from response elements that contain a bundle image +// reference. +func OmitManifests(b bool) SQLiteQuerierOption { + return func(c *querierConfig) { + c.omitManifests = b + } +} + +func NewSQLLiteQuerier(dbFilename string, opts ...SQLiteQuerierOption) (*SQLQuerier, error) { db, err := OpenReadOnly(dbFilename) if err != nil { return nil, err } - - return &SQLQuerier{dbQuerierAdapter{db}}, nil + return NewSQLLiteQuerierFromDb(db, opts...), nil } -func NewSQLLiteQuerierFromDb(db *sql.DB) *SQLQuerier { - return &SQLQuerier{dbQuerierAdapter{db}} +func NewSQLLiteQuerierFromDb(db *sql.DB, opts ...SQLiteQuerierOption) *SQLQuerier { + return NewSQLLiteQuerierFromDBQuerier(dbQuerierAdapter{db}, opts...) } -func NewSQLLiteQuerierFromDBQuerier(q Querier) *SQLQuerier { - return &SQLQuerier{q} +func NewSQLLiteQuerierFromDBQuerier(q Querier, opts ...SQLiteQuerierOption) *SQLQuerier { + sq := SQLQuerier{db: q} + for _, opt := range opts { + opt(&sq.querierConfig) + } + return &sq } func (s *SQLQuerier) ListTables(ctx context.Context) ([]string, error) { @@ -970,10 +989,20 @@ tip (depth) AS ( INNER JOIN channel_entry AS skipped_entry ON skips_entry.skips = skipped_entry.entry_id GROUP BY all_entry.operatorbundle_name, all_entry.package_name, all_entry.channel_name +), +merged_properties (bundle_name, merged) AS ( + SELECT operatorbundle_name, json_group_array(json_object('type', properties.type, 'value', properties.value)) + FROM properties + GROUP BY operatorbundle_name +), +merged_dependencies (bundle_name, merged) AS ( + SELECT operatorbundle_name, json_group_array(json_object('type', dependencies.type, 'value', CAST(dependencies.value AS TEXT))) + FROM dependencies + GROUP BY operatorbundle_name ) SELECT replaces_bundle.entry_id, - operatorbundle.bundle, + CASE WHEN :omit_manifests AND length(coalesce(operatorbundle.bundlepath, "")) > 0 THEN NULL ELSE operatorbundle.bundle END, operatorbundle.bundlepath, operatorbundle.name, replaces_bundle.package_name, @@ -982,10 +1011,8 @@ SELECT skips_bundle.skips, operatorbundle.version, operatorbundle.skiprange, - dependencies.type, - dependencies.value, - properties.type, - properties.value + merged_dependencies.merged, + merged_properties.merged FROM replaces_bundle INNER JOIN operatorbundle ON replaces_bundle.operatorbundle_name = operatorbundle.name @@ -993,20 +1020,18 @@ SELECT ON replaces_bundle.operatorbundle_name = skips_bundle.operatorbundle_name AND replaces_bundle.package_name = skips_bundle.package_name AND replaces_bundle.channel_name = skips_bundle.channel_name - LEFT OUTER JOIN dependencies - ON operatorbundle.name = dependencies.operatorbundle_name - LEFT OUTER JOIN properties - ON operatorbundle.name = properties.operatorbundle_name` + LEFT OUTER JOIN merged_dependencies + ON operatorbundle.name = merged_dependencies.bundle_name + LEFT OUTER JOIN merged_properties + ON operatorbundle.name = merged_properties.bundle_name` -func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { - rows, err := s.db.QueryContext(ctx, listBundlesQuery) +func (s *SQLQuerier) SendBundles(ctx context.Context, stream registry.BundleSender) error { + rows, err := s.db.QueryContext(ctx, listBundlesQuery, sql.Named("omit_manifests", s.omitManifests)) if err != nil { - return nil, err + return err } defer rows.Close() - var bundles []*api.Bundle - bundlesMap := map[string]*api.Bundle{} for rows.Next() { var ( entryID sql.NullInt64 @@ -1019,101 +1044,109 @@ func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { skips sql.NullString version sql.NullString skipRange sql.NullString - depType sql.NullString - depValue sql.NullString - propType sql.NullString - propValue sql.NullString + deps sql.NullString + props sql.NullString ) - if err := rows.Scan(&entryID, &bundle, &bundlePath, &bundleName, &pkgName, &channelName, &replaces, &skips, &version, &skipRange, &depType, &depValue, &propType, &propValue); err != nil { - return nil, err + if err := rows.Scan(&entryID, &bundle, &bundlePath, &bundleName, &pkgName, &channelName, &replaces, &skips, &version, &skipRange, &deps, &props); err != nil { + return err } if !bundleName.Valid || !version.Valid || !bundlePath.Valid || !channelName.Valid { continue } - bundleKey := fmt.Sprintf("%s/%s/%s/%s", bundleName.String, version.String, bundlePath.String, channelName.String) - bundleItem, ok := bundlesMap[bundleKey] - if ok { - if depType.Valid && depValue.Valid { - bundleItem.Dependencies = append(bundleItem.Dependencies, &api.Dependency{ - Type: depType.String, - Value: depValue.String, - }) + out := &api.Bundle{} + if bundle.Valid && bundle.String != "" { + out, err = registry.BundleStringToAPIBundle(bundle.String) + if err != nil { + return err } + } + out.CsvName = bundleName.String + out.PackageName = pkgName.String + out.ChannelName = channelName.String + out.BundlePath = bundlePath.String + out.Version = version.String + out.SkipRange = skipRange.String + out.Replaces = replaces.String + + if skips.Valid { + out.Skips = strings.Split(skips.String, ",") + } - if propType.Valid && propValue.Valid { - bundleItem.Properties = append(bundleItem.Properties, &api.Property{ - Type: propType.String, - Value: propValue.String, - }) - } - } else { - // Create new bundle - out := &api.Bundle{} - if bundle.Valid && bundle.String != "" { - out, err = registry.BundleStringToAPIBundle(bundle.String) - if err != nil { - return nil, err - } + if deps.Valid { + if err := json.Unmarshal([]byte(deps.String), &out.Dependencies); err != nil { + return err } + } + buildLegacyRequiredAPIs(out.Dependencies, &out.RequiredApis) + out.Dependencies = uniqueDeps(out.Dependencies) - out.CsvName = bundleName.String - out.PackageName = pkgName.String - out.ChannelName = channelName.String - out.BundlePath = bundlePath.String - out.Version = version.String - out.SkipRange = skipRange.String - out.Replaces = replaces.String - if skips.Valid { - out.Skips = strings.Split(skips.String, ",") + if props.Valid { + if err := json.Unmarshal([]byte(props.String), &out.Properties); err != nil { + return err } + } + buildLegacyProvidedAPIs(out.Properties, &out.ProvidedApis) + out.Properties = uniqueProps(out.Properties) + if err := stream.Send(out); err != nil { + return err + } + } - provided, required, err := s.GetApisForEntry(ctx, entryID.Int64) - if err != nil { - return nil, err - } - if len(provided) > 0 { - out.ProvidedApis = provided - } - if len(required) > 0 { - out.RequiredApis = required - } + return nil +} - if depType.Valid && depValue.Valid { - out.Dependencies = []*api.Dependency{{ - Type: depType.String, - Value: depValue.String, - }} - } +func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { + var bundleSender registry.SliceBundleSender + err := s.SendBundles(ctx, &bundleSender) + if err != nil { + return nil, err + } + return bundleSender, nil - if propType.Valid && propValue.Valid { - out.Properties = []*api.Property{{ - Type: propType.String, - Value: propValue.String, - }} - } +} - bundlesMap[bundleKey] = out +func buildLegacyRequiredAPIs(src []*api.Dependency, dst *[]*api.GroupVersionKind) error { + for _, p := range src { + if p.GetType() != registry.GVKType { + continue + } + var value registry.GVKDependency + if err := json.Unmarshal([]byte(p.GetValue()), &value); err != nil { + return err } + *dst = append(*dst, &api.GroupVersionKind{ + Group: value.Group, + Version: value.Version, + Kind: value.Kind, + }) } + return nil +} - for _, v := range bundlesMap { - if len(v.Dependencies) > 1 { - newDeps := unique(v.Dependencies) - v.Dependencies = newDeps +func buildLegacyProvidedAPIs(src []*api.Property, dst *[]*api.GroupVersionKind) error { + for _, p := range src { + if p.GetType() != registry.GVKType { + continue } - if len(v.Properties) > 1 { - newProps := uniqueProps(v.Properties) - v.Properties = newProps + var value registry.GVKProperty + if err := json.Unmarshal([]byte(p.GetValue()), &value); err != nil { + return err } - bundles = append(bundles, v) + *dst = append(*dst, &api.GroupVersionKind{ + Group: value.Group, + Version: value.Version, + Kind: value.Kind, + }) } - - return bundles, nil + return nil } -func unique(deps []*api.Dependency) []*api.Dependency { +func uniqueDeps(deps []*api.Dependency) []*api.Dependency { + if len(deps) <= 1 { + return deps + } keys := make(map[string]struct{}) var list []*api.Dependency for _, entry := range deps { @@ -1127,6 +1160,9 @@ func unique(deps []*api.Dependency) []*api.Dependency { } func uniqueProps(props []*api.Property) []*api.Property { + if len(props) <= 1 { + return props + } keys := make(map[string]struct{}) var list []*api.Property for _, entry := range props { diff --git a/staging/operator-registry/pkg/sqlite/query_sql_test.go b/staging/operator-registry/pkg/sqlite/query_sql_test.go index 81468231e7..73a19db705 100644 --- a/staging/operator-registry/pkg/sqlite/query_sql_test.go +++ b/staging/operator-registry/pkg/sqlite/query_sql_test.go @@ -12,9 +12,10 @@ import ( func TestListBundlesQuery(t *testing.T) { for _, tt := range []struct { - Name string - Setup func(t *testing.T, db *sql.DB) - Expect func(t *testing.T, rows *sql.Rows) + Name string + Setup func(t *testing.T, db *sql.DB) + Expect func(t *testing.T, rows *sql.Rows) + OmitManfests bool }{ { Name: "replacement comes from channel entry", @@ -42,7 +43,7 @@ func TestListBundlesQuery(t *testing.T) { c interface{} name, actual sql.NullString ) - if err := rows.Scan(&c, &c, &c, &name, &c, &c, &actual, &c, &c, &c, &c, &c, &c, &c); err != nil { + if err := rows.Scan(&c, &c, &c, &name, &c, &c, &actual, &c, &c, &c, &c, &c); err != nil { t.Fatalf("unexpected error during row scan: %v", err) } expected, ok := replacements[name] @@ -107,7 +108,7 @@ func TestListBundlesQuery(t *testing.T) { c interface{} actual result ) - if err := rows.Scan(&c, &c, &c, &actual.Name, &c, &c, &actual.Replaces, &actual.Skips, &c, &c, &c, &c, &c, &c); err != nil { + if err := rows.Scan(&c, &c, &c, &actual.Name, &c, &c, &actual.Replaces, &actual.Skips, &c, &c, &c, &c); err != nil { t.Fatalf("unexpected error during row scan: %v", err) } r, ok := expected[actual.Name] @@ -126,6 +127,62 @@ func TestListBundlesQuery(t *testing.T) { } }, }, + { + Name: "manifests omitted without bundlepath", + OmitManfests: true, + Setup: func(t *testing.T, db *sql.DB) { + for _, stmt := range []string{ + `insert into package (name, default_channel) values ("package", "channel")`, + `insert into channel (name, package_name, head_operatorbundle_name) values ("channel", "package", "bundle")`, + `insert into operatorbundle (name, bundle) values ("bundle-a", "{}")`, + `insert into channel_entry (package_name, channel_name, operatorbundle_name, entry_id, depth) values ("package", "channel", "bundle-a", 1, 0)`, + } { + if _, err := db.Exec(stmt); err != nil { + t.Fatalf("unexpected error executing setup statements: %v", err) + } + } + + }, + Expect: func(t *testing.T, rows *sql.Rows) { + require := require.New(t) + require.True(rows.Next()) + var ( + c interface{} + bundle sql.NullString + ) + require.NoError(rows.Scan(&c, &bundle, &c, &c, &c, &c, &c, &c, &c, &c, &c, &c)) + require.Equal(sql.NullString{Valid: true, String: "{}"}, bundle) + require.False(rows.Next()) + }, + }, + { + Name: "manifests not omitted with bundlepath", + OmitManfests: true, + Setup: func(t *testing.T, db *sql.DB) { + for _, stmt := range []string{ + `insert into package (name, default_channel) values ("package", "channel")`, + `insert into channel (name, package_name, head_operatorbundle_name) values ("channel", "package", "bundle")`, + `insert into operatorbundle (name, bundle, bundlepath) values ("bundle-a", "{}", "path")`, + `insert into channel_entry (package_name, channel_name, operatorbundle_name, entry_id, depth) values ("package", "channel", "bundle-a", 1, 0)`, + } { + if _, err := db.Exec(stmt); err != nil { + t.Fatalf("unexpected error executing setup statements: %v", err) + } + } + + }, + Expect: func(t *testing.T, rows *sql.Rows) { + require := require.New(t) + require.True(rows.Next()) + var ( + c interface{} + bundle sql.NullString + ) + require.NoError(rows.Scan(&c, &bundle, &c, &c, &c, &c, &c, &c, &c, &c, &c, &c)) + require.Equal(sql.NullString{Valid: false, String: ""}, bundle) + require.False(rows.Next()) + }, + }, } { t.Run(tt.Name, func(t *testing.T) { ctx := context.Background() @@ -143,7 +200,7 @@ func TestListBundlesQuery(t *testing.T) { _, err = db.Exec("PRAGMA foreign_keys = ON") require.NoError(t, err) - rows, err := db.QueryContext(ctx, listBundlesQuery) + rows, err := db.QueryContext(ctx, listBundlesQuery, sql.Named("omit_manifests", tt.OmitManfests)) if err != nil { t.Fatalf("unexpected error executing list bundles query: %v", err) } diff --git a/staging/operator-registry/pkg/sqlite/query_test.go b/staging/operator-registry/pkg/sqlite/query_test.go index c657771b73..110c4e14e0 100644 --- a/staging/operator-registry/pkg/sqlite/query_test.go +++ b/staging/operator-registry/pkg/sqlite/query_test.go @@ -15,20 +15,18 @@ import ( func TestListBundles(t *testing.T) { type Columns struct { - EntryID sql.NullInt64 - Bundle sql.NullString - BundlePath sql.NullString - BundleName sql.NullString - PackageName sql.NullString - ChannelName sql.NullString - Replaces sql.NullString - Skips sql.NullString - Version sql.NullString - SkipRange sql.NullString - DependencyType sql.NullString - DependencyValue sql.NullString - PropertyType sql.NullString - PropertyValue sql.NullString + EntryID sql.NullInt64 + Bundle sql.NullString + BundlePath sql.NullString + BundleName sql.NullString + PackageName sql.NullString + ChannelName sql.NullString + Replaces sql.NullString + Skips sql.NullString + Version sql.NullString + SkipRange sql.NullString + Dependencies sql.NullString + Properties sql.NullString } var NoRows sqlitefakes.FakeRowScanner @@ -195,27 +193,14 @@ func TestListBundles(t *testing.T) { q.QueryContextReturns(&NoRows, nil) q.QueryContextReturnsOnCall(0, &r, nil) r.NextReturnsOnCall(0, true) - r.NextReturnsOnCall(1, true) cols := []Columns{ { - BundleName: sql.NullString{Valid: true, String: "BundleName"}, - Version: sql.NullString{Valid: true, String: "Version"}, - ChannelName: sql.NullString{Valid: true, String: "ChannelName"}, - BundlePath: sql.NullString{Valid: true, String: "BundlePath"}, - DependencyType: sql.NullString{Valid: true, String: "Dependency1Type"}, - DependencyValue: sql.NullString{Valid: true, String: "Dependency1Value"}, - PropertyType: sql.NullString{Valid: true, String: "Property1Type"}, - PropertyValue: sql.NullString{Valid: true, String: "Property1Value"}, - }, - { - BundleName: sql.NullString{Valid: true, String: "BundleName"}, - Version: sql.NullString{Valid: true, String: "Version"}, - ChannelName: sql.NullString{Valid: true, String: "ChannelName"}, - BundlePath: sql.NullString{Valid: true, String: "BundlePath"}, - DependencyType: sql.NullString{Valid: true, String: "Dependency2Type"}, - DependencyValue: sql.NullString{Valid: true, String: "Dependency2Value"}, - PropertyType: sql.NullString{Valid: true, String: "Property2Type"}, - PropertyValue: sql.NullString{Valid: true, String: "Property2Value"}, + BundleName: sql.NullString{Valid: true, String: "BundleName"}, + Version: sql.NullString{Valid: true, String: "Version"}, + ChannelName: sql.NullString{Valid: true, String: "ChannelName"}, + BundlePath: sql.NullString{Valid: true, String: "BundlePath"}, + Dependencies: sql.NullString{Valid: true, String: `[{"type":"Dependency1Type","value":"Dependency1Value"},{"type":"Dependency2Type","value":"Dependency2Value"}]`}, + Properties: sql.NullString{Valid: true, String: `[{"type":"Property1Type","value":"Property1Value"},{"type":"Property2Type","value":"Property2Value"}]`}, }, } var i int diff --git a/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go b/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go index 74957a2c41..ff402c58b3 100644 --- a/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go +++ b/vendor/github.com/operator-framework/operator-registry/cmd/opm/registry/serve.go @@ -90,12 +90,16 @@ func serveFunc(cmd *cobra.Command, args []string) error { return err } + if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil { + logger.WithError(err).Warnf("error setting soft heap limit for sqlite") + } + // migrate to the latest version if err := migrate(cmd, db); err != nil { logger.WithError(err).Warnf("couldn't migrate db") } - store := sqlite.NewSQLLiteQuerierFromDb(db) + store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true)) // sanity check that the db is available tables, err := store.ListTables(context.TODO()) diff --git a/vendor/github.com/operator-framework/operator-registry/cmd/registry-server/main.go b/vendor/github.com/operator-framework/operator-registry/cmd/registry-server/main.go index 9cd9e63ff3..529a02a9cb 100644 --- a/vendor/github.com/operator-framework/operator-registry/cmd/registry-server/main.go +++ b/vendor/github.com/operator-framework/operator-registry/cmd/registry-server/main.go @@ -91,12 +91,16 @@ func runCmdFunc(cmd *cobra.Command, args []string) error { return err } + if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil { + logger.WithError(err).Warnf("error setting soft heap limit for sqlite") + } + // migrate to the latest version if err := migrate(cmd, db); err != nil { logger.WithError(err).Warnf("couldn't migrate db") } - store := sqlite.NewSQLLiteQuerierFromDb(db) + store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true)) // sanity check that the db is available tables, err := store.ListTables(context.TODO()) diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/empty.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/empty.go index fd4676eb6c..936f39ccab 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/empty.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/registry/empty.go @@ -100,6 +100,10 @@ func (EmptyQuery) ListBundles(ctx context.Context) ([]*api.Bundle, error) { return nil, errors.New("empty querier: cannot list bundles") } +func (EmptyQuery) SendBundles(ctx context.Context, stream BundleSender) error { + return errors.New("empty querier: cannot stream bundles") +} + func (EmptyQuery) GetDependenciesForBundle(ctx context.Context, name, version, path string) (dependencies []*api.Dependency, err error) { return nil, errors.New("empty querier: cannot get dependencies for bundle") } diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/interface.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/interface.go index fd77636117..454cec3869 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/interface.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/registry/interface.go @@ -17,10 +17,17 @@ type Load interface { ClearNonHeadBundles() error } +type BundleSender interface { + Send(*api.Bundle) error +} + type GRPCQuery interface { // List all available package names in the index ListPackages(ctx context.Context) ([]string, error) + // Sends all available bundles in the index + SendBundles(ctx context.Context, stream BundleSender) error + // List all available bundles in the index ListBundles(ctx context.Context) (bundles []*api.Bundle, err error) diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go index 9acd9d48fa..20f0e9f95d 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go @@ -13,6 +13,14 @@ type Querier struct { pkgs model.Model } +type SliceBundleSender []*api.Bundle + +func (s *SliceBundleSender) Send(b *api.Bundle) error { + + *s = append(*s, b) + return nil +} + var _ GRPCQuery = &Querier{} func NewQuerier(packages model.Model) *Querier { @@ -29,21 +37,40 @@ func (q Querier) ListPackages(_ context.Context) ([]string, error) { return packages, nil } -func (q Querier) ListBundles(_ context.Context) ([]*api.Bundle, error) { - var bundles []*api.Bundle +func (q Querier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { + var bundleSender SliceBundleSender + + err := q.SendBundles(ctx, &bundleSender) + if err != nil { + return nil, err + } + + return bundleSender, nil +} +func (q Querier) SendBundles(_ context.Context, s BundleSender) error { for _, pkg := range q.pkgs { for _, ch := range pkg.Channels { for _, b := range ch.Bundles { apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) + return fmt.Errorf("convert bundle %q: %v", b.Name, err) + } + if apiBundle.BundlePath != "" { + // The SQLite-based server + // configures its querier to + // omit these fields when + // bundle path is set. + apiBundle.CsvJson = "" + apiBundle.Object = nil + } + if err := s.Send(apiBundle); err != nil { + return err } - bundles = append(bundles, apiBundle) } } } - return bundles, nil + return nil } func (q Querier) GetPackage(_ context.Context, name string) (*PackageManifest, error) { diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/server/server.go b/vendor/github.com/operator-framework/operator-registry/pkg/server/server.go index 9fe1592c3e..64b87ce70e 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/server/server.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/server/server.go @@ -33,17 +33,7 @@ func (s *RegistryServer) ListPackages(req *api.ListPackageRequest, stream api.Re } func (s *RegistryServer) ListBundles(req *api.ListBundlesRequest, stream api.Registry_ListBundlesServer) error { - bundles, err := s.store.ListBundles(stream.Context()) - if err != nil { - return err - } - for _, b := range bundles { - if err := stream.Send(b); err != nil { - return err - } - } - - return nil + return s.store.SendBundles(stream.Context(), stream) } func (s *RegistryServer) GetPackage(ctx context.Context, req *api.GetPackageRequest) (*api.Package, error) { diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go b/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go index 5b68340359..fb24204941 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go @@ -35,25 +35,44 @@ func (a dbQuerierAdapter) QueryContext(ctx context.Context, query string, args . type SQLQuerier struct { db Querier + querierConfig } var _ registry.Query = &SQLQuerier{} -func NewSQLLiteQuerier(dbFilename string) (*SQLQuerier, error) { +type querierConfig struct { + omitManifests bool +} + +type SQLiteQuerierOption func(*querierConfig) + +// If true, ListBundles will omit inline manifests (the object and +// csvJson fields) from response elements that contain a bundle image +// reference. +func OmitManifests(b bool) SQLiteQuerierOption { + return func(c *querierConfig) { + c.omitManifests = b + } +} + +func NewSQLLiteQuerier(dbFilename string, opts ...SQLiteQuerierOption) (*SQLQuerier, error) { db, err := OpenReadOnly(dbFilename) if err != nil { return nil, err } - - return &SQLQuerier{dbQuerierAdapter{db}}, nil + return NewSQLLiteQuerierFromDb(db, opts...), nil } -func NewSQLLiteQuerierFromDb(db *sql.DB) *SQLQuerier { - return &SQLQuerier{dbQuerierAdapter{db}} +func NewSQLLiteQuerierFromDb(db *sql.DB, opts ...SQLiteQuerierOption) *SQLQuerier { + return NewSQLLiteQuerierFromDBQuerier(dbQuerierAdapter{db}, opts...) } -func NewSQLLiteQuerierFromDBQuerier(q Querier) *SQLQuerier { - return &SQLQuerier{q} +func NewSQLLiteQuerierFromDBQuerier(q Querier, opts ...SQLiteQuerierOption) *SQLQuerier { + sq := SQLQuerier{db: q} + for _, opt := range opts { + opt(&sq.querierConfig) + } + return &sq } func (s *SQLQuerier) ListTables(ctx context.Context) ([]string, error) { @@ -970,10 +989,20 @@ tip (depth) AS ( INNER JOIN channel_entry AS skipped_entry ON skips_entry.skips = skipped_entry.entry_id GROUP BY all_entry.operatorbundle_name, all_entry.package_name, all_entry.channel_name +), +merged_properties (bundle_name, merged) AS ( + SELECT operatorbundle_name, json_group_array(json_object('type', properties.type, 'value', properties.value)) + FROM properties + GROUP BY operatorbundle_name +), +merged_dependencies (bundle_name, merged) AS ( + SELECT operatorbundle_name, json_group_array(json_object('type', dependencies.type, 'value', CAST(dependencies.value AS TEXT))) + FROM dependencies + GROUP BY operatorbundle_name ) SELECT replaces_bundle.entry_id, - operatorbundle.bundle, + CASE WHEN :omit_manifests AND length(coalesce(operatorbundle.bundlepath, "")) > 0 THEN NULL ELSE operatorbundle.bundle END, operatorbundle.bundlepath, operatorbundle.name, replaces_bundle.package_name, @@ -982,10 +1011,8 @@ SELECT skips_bundle.skips, operatorbundle.version, operatorbundle.skiprange, - dependencies.type, - dependencies.value, - properties.type, - properties.value + merged_dependencies.merged, + merged_properties.merged FROM replaces_bundle INNER JOIN operatorbundle ON replaces_bundle.operatorbundle_name = operatorbundle.name @@ -993,20 +1020,18 @@ SELECT ON replaces_bundle.operatorbundle_name = skips_bundle.operatorbundle_name AND replaces_bundle.package_name = skips_bundle.package_name AND replaces_bundle.channel_name = skips_bundle.channel_name - LEFT OUTER JOIN dependencies - ON operatorbundle.name = dependencies.operatorbundle_name - LEFT OUTER JOIN properties - ON operatorbundle.name = properties.operatorbundle_name` + LEFT OUTER JOIN merged_dependencies + ON operatorbundle.name = merged_dependencies.bundle_name + LEFT OUTER JOIN merged_properties + ON operatorbundle.name = merged_properties.bundle_name` -func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { - rows, err := s.db.QueryContext(ctx, listBundlesQuery) +func (s *SQLQuerier) SendBundles(ctx context.Context, stream registry.BundleSender) error { + rows, err := s.db.QueryContext(ctx, listBundlesQuery, sql.Named("omit_manifests", s.omitManifests)) if err != nil { - return nil, err + return err } defer rows.Close() - var bundles []*api.Bundle - bundlesMap := map[string]*api.Bundle{} for rows.Next() { var ( entryID sql.NullInt64 @@ -1019,101 +1044,109 @@ func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { skips sql.NullString version sql.NullString skipRange sql.NullString - depType sql.NullString - depValue sql.NullString - propType sql.NullString - propValue sql.NullString + deps sql.NullString + props sql.NullString ) - if err := rows.Scan(&entryID, &bundle, &bundlePath, &bundleName, &pkgName, &channelName, &replaces, &skips, &version, &skipRange, &depType, &depValue, &propType, &propValue); err != nil { - return nil, err + if err := rows.Scan(&entryID, &bundle, &bundlePath, &bundleName, &pkgName, &channelName, &replaces, &skips, &version, &skipRange, &deps, &props); err != nil { + return err } if !bundleName.Valid || !version.Valid || !bundlePath.Valid || !channelName.Valid { continue } - bundleKey := fmt.Sprintf("%s/%s/%s/%s", bundleName.String, version.String, bundlePath.String, channelName.String) - bundleItem, ok := bundlesMap[bundleKey] - if ok { - if depType.Valid && depValue.Valid { - bundleItem.Dependencies = append(bundleItem.Dependencies, &api.Dependency{ - Type: depType.String, - Value: depValue.String, - }) + out := &api.Bundle{} + if bundle.Valid && bundle.String != "" { + out, err = registry.BundleStringToAPIBundle(bundle.String) + if err != nil { + return err } + } + out.CsvName = bundleName.String + out.PackageName = pkgName.String + out.ChannelName = channelName.String + out.BundlePath = bundlePath.String + out.Version = version.String + out.SkipRange = skipRange.String + out.Replaces = replaces.String + + if skips.Valid { + out.Skips = strings.Split(skips.String, ",") + } - if propType.Valid && propValue.Valid { - bundleItem.Properties = append(bundleItem.Properties, &api.Property{ - Type: propType.String, - Value: propValue.String, - }) - } - } else { - // Create new bundle - out := &api.Bundle{} - if bundle.Valid && bundle.String != "" { - out, err = registry.BundleStringToAPIBundle(bundle.String) - if err != nil { - return nil, err - } + if deps.Valid { + if err := json.Unmarshal([]byte(deps.String), &out.Dependencies); err != nil { + return err } + } + buildLegacyRequiredAPIs(out.Dependencies, &out.RequiredApis) + out.Dependencies = uniqueDeps(out.Dependencies) - out.CsvName = bundleName.String - out.PackageName = pkgName.String - out.ChannelName = channelName.String - out.BundlePath = bundlePath.String - out.Version = version.String - out.SkipRange = skipRange.String - out.Replaces = replaces.String - if skips.Valid { - out.Skips = strings.Split(skips.String, ",") + if props.Valid { + if err := json.Unmarshal([]byte(props.String), &out.Properties); err != nil { + return err } + } + buildLegacyProvidedAPIs(out.Properties, &out.ProvidedApis) + out.Properties = uniqueProps(out.Properties) + if err := stream.Send(out); err != nil { + return err + } + } - provided, required, err := s.GetApisForEntry(ctx, entryID.Int64) - if err != nil { - return nil, err - } - if len(provided) > 0 { - out.ProvidedApis = provided - } - if len(required) > 0 { - out.RequiredApis = required - } + return nil +} - if depType.Valid && depValue.Valid { - out.Dependencies = []*api.Dependency{{ - Type: depType.String, - Value: depValue.String, - }} - } +func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { + var bundleSender registry.SliceBundleSender + err := s.SendBundles(ctx, &bundleSender) + if err != nil { + return nil, err + } + return bundleSender, nil - if propType.Valid && propValue.Valid { - out.Properties = []*api.Property{{ - Type: propType.String, - Value: propValue.String, - }} - } +} - bundlesMap[bundleKey] = out +func buildLegacyRequiredAPIs(src []*api.Dependency, dst *[]*api.GroupVersionKind) error { + for _, p := range src { + if p.GetType() != registry.GVKType { + continue + } + var value registry.GVKDependency + if err := json.Unmarshal([]byte(p.GetValue()), &value); err != nil { + return err } + *dst = append(*dst, &api.GroupVersionKind{ + Group: value.Group, + Version: value.Version, + Kind: value.Kind, + }) } + return nil +} - for _, v := range bundlesMap { - if len(v.Dependencies) > 1 { - newDeps := unique(v.Dependencies) - v.Dependencies = newDeps +func buildLegacyProvidedAPIs(src []*api.Property, dst *[]*api.GroupVersionKind) error { + for _, p := range src { + if p.GetType() != registry.GVKType { + continue } - if len(v.Properties) > 1 { - newProps := uniqueProps(v.Properties) - v.Properties = newProps + var value registry.GVKProperty + if err := json.Unmarshal([]byte(p.GetValue()), &value); err != nil { + return err } - bundles = append(bundles, v) + *dst = append(*dst, &api.GroupVersionKind{ + Group: value.Group, + Version: value.Version, + Kind: value.Kind, + }) } - - return bundles, nil + return nil } -func unique(deps []*api.Dependency) []*api.Dependency { +func uniqueDeps(deps []*api.Dependency) []*api.Dependency { + if len(deps) <= 1 { + return deps + } keys := make(map[string]struct{}) var list []*api.Dependency for _, entry := range deps { @@ -1127,6 +1160,9 @@ func unique(deps []*api.Dependency) []*api.Dependency { } func uniqueProps(props []*api.Property) []*api.Property { + if len(props) <= 1 { + return props + } keys := make(map[string]struct{}) var list []*api.Property for _, entry := range props {