Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion staging/operator-registry/cmd/opm/registry/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,16 @@ func serveFunc(cmd *cobra.Command, args []string) error {
return err
}

if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil {
logger.WithError(err).Warnf("error setting soft heap limit for sqlite")
}

// migrate to the latest version
if err := migrate(cmd, db); err != nil {
logger.WithError(err).Warnf("couldn't migrate db")
}

store := sqlite.NewSQLLiteQuerierFromDb(db)
store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true))

// sanity check that the db is available
tables, err := store.ListTables(context.TODO())
Expand Down
6 changes: 5 additions & 1 deletion staging/operator-registry/cmd/registry-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,16 @@ func runCmdFunc(cmd *cobra.Command, args []string) error {
return err
}

if _, err := db.ExecContext(context.TODO(), `PRAGMA soft_heap_limit=1`); err != nil {
logger.WithError(err).Warnf("error setting soft heap limit for sqlite")
}

// migrate to the latest version
if err := migrate(cmd, db); err != nil {
logger.WithError(err).Warnf("couldn't migrate db")
}

store := sqlite.NewSQLLiteQuerierFromDb(db)
store := sqlite.NewSQLLiteQuerierFromDb(db, sqlite.OmitManifests(true))

// sanity check that the db is available
tables, err := store.ListTables(context.TODO())
Expand Down
4 changes: 4 additions & 0 deletions staging/operator-registry/pkg/registry/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (EmptyQuery) ListBundles(ctx context.Context) ([]*api.Bundle, error) {
return nil, errors.New("empty querier: cannot list bundles")
}

func (EmptyQuery) SendBundles(ctx context.Context, stream BundleSender) error {
return errors.New("empty querier: cannot stream bundles")
}

func (EmptyQuery) GetDependenciesForBundle(ctx context.Context, name, version, path string) (dependencies []*api.Dependency, err error) {
return nil, errors.New("empty querier: cannot get dependencies for bundle")
}
Expand Down
7 changes: 7 additions & 0 deletions staging/operator-registry/pkg/registry/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@ type Load interface {
ClearNonHeadBundles() error
}

type BundleSender interface {
Send(*api.Bundle) error
}

type GRPCQuery interface {
// List all available package names in the index
ListPackages(ctx context.Context) ([]string, error)

// Sends all available bundles in the index
SendBundles(ctx context.Context, stream BundleSender) error

// List all available bundles in the index
ListBundles(ctx context.Context) (bundles []*api.Bundle, err error)

Expand Down
37 changes: 32 additions & 5 deletions staging/operator-registry/pkg/registry/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ type Querier struct {
pkgs model.Model
}

type SliceBundleSender []*api.Bundle

func (s *SliceBundleSender) Send(b *api.Bundle) error {

*s = append(*s, b)
return nil
}

var _ GRPCQuery = &Querier{}

func NewQuerier(packages model.Model) *Querier {
Expand All @@ -29,21 +37,40 @@ func (q Querier) ListPackages(_ context.Context) ([]string, error) {
return packages, nil
}

func (q Querier) ListBundles(_ context.Context) ([]*api.Bundle, error) {
var bundles []*api.Bundle
func (q Querier) ListBundles(ctx context.Context) ([]*api.Bundle, error) {
var bundleSender SliceBundleSender

err := q.SendBundles(ctx, &bundleSender)
if err != nil {
return nil, err
}

return bundleSender, nil
}

func (q Querier) SendBundles(_ context.Context, s BundleSender) error {
for _, pkg := range q.pkgs {
for _, ch := range pkg.Channels {
for _, b := range ch.Bundles {
apiBundle, err := api.ConvertModelBundleToAPIBundle(*b)
if err != nil {
return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err)
return fmt.Errorf("convert bundle %q: %v", b.Name, err)
}
if apiBundle.BundlePath != "" {
// The SQLite-based server
// configures its querier to
// omit these fields when
// bundle path is set.
apiBundle.CsvJson = ""
apiBundle.Object = nil
}
if err := s.Send(apiBundle); err != nil {
return err
}
bundles = append(bundles, apiBundle)
}
}
}
return bundles, nil
return nil
}

func (q Querier) GetPackage(_ context.Context, name string) (*PackageManifest, error) {
Expand Down
6 changes: 5 additions & 1 deletion staging/operator-registry/pkg/registry/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ func TestQuerier_ListBundles(t *testing.T) {
bundles, err := testModelQuerier.ListBundles(context.TODO())
require.NoError(t, err)
require.NotNil(t, bundles)
require.Equal(t, 12, len(bundles))
require.Len(t, bundles, 12)
for _, b := range bundles {
require.Zero(t, b.CsvJson)
require.Zero(t, b.Object)
}
}

func TestQuerier_ListPackages(t *testing.T) {
Expand Down
12 changes: 1 addition & 11 deletions staging/operator-registry/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,7 @@ func (s *RegistryServer) ListPackages(req *api.ListPackageRequest, stream api.Re
}

func (s *RegistryServer) ListBundles(req *api.ListBundlesRequest, stream api.Registry_ListBundlesServer) error {
bundles, err := s.store.ListBundles(stream.Context())
if err != nil {
return err
}
for _, b := range bundles {
if err := stream.Send(b); err != nil {
return err
}
}

return nil
return s.store.SendBundles(stream.Context(), stream)
}

func (s *RegistryServer) GetPackage(ctx context.Context, req *api.GetPackageRequest) (*api.Package, error) {
Expand Down
Loading