Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 120 additions & 10 deletions internal/operator/internal/index_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,31 @@ package internal

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/operator-framework/operator-sdk/internal/operator"
registryutil "github.com/operator-framework/operator-sdk/internal/registry"
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
)

const (
defaultSourceType = "grpc"
)

type IndexImageCatalogCreator struct {
IndexImage string
InjectBundles []string
InjectBundleMode string
BundleImage string

cfg *operator.Configuration
}
Expand All @@ -50,22 +62,60 @@ func (c IndexImageCatalogCreator) CreateCatalog(ctx context.Context, name string
fmt.Printf("IndexImageCatalogCreator.InjectBundles: %q\n", strings.Join(c.InjectBundles, ","))
fmt.Printf("IndexImageCatalogCreator.InjectBundleMode: %q\n", c.InjectBundleMode)

// Create barebones catalog source
// create a basic catalog source type
cs := newCatalogSource(name, c.cfg.Namespace)

// Create registry pod, assigning its owner as the catalog source
// initialize and create the registry pod with provided index image
registryPod, err := c.createRegistryPod(ctx, dbPath)
if err != nil {
return nil, fmt.Errorf("error in creating registry pod: %v", err)
}

// Wait for registry pod to be ready
// get registry pod i.e corev1.Pod type
pod, err := registryPod.GetPod()
if err != nil {
return nil, fmt.Errorf("error in getting registry pod: %v", err)
}

// Update catalog source with `spec.Address = pod.status.podIP`
// make catalog source the owner of registry pod object
if err := controllerutil.SetOwnerReference(cs, pod, c.cfg.Scheme); err != nil {
return nil, fmt.Errorf("error in setting registry pod owner reference: %v", err)
}

// Update catalog source with annotations for index image,
// injected bundle, and registry add mode
// wait for registry pod to be running
if err := registryPod.VerifyPodRunning(ctx); err != nil {
return nil, fmt.Errorf("registry pod is not running: %v", err)
}

// Wait for catalog source status to indicate a successful
// connection with the registry pod
// update catalog source with source type, address and annotations
if err := c.updateCatalogSource(pod.Status.PodIP, cs); err != nil {
return nil, fmt.Errorf("error in updating catalog source: %v", err)
}

// wait for catalog source to be ready
if err := c.waitForCatalogSource(ctx, cs); err != nil {
return nil, err
}

return cs, nil
}

// Return the catalog source
return nil, nil
// newCatalogSource creates a new catalog source with name and namespace
func newCatalogSource(name, namespace string) *v1alpha1.CatalogSource {
return &v1alpha1.CatalogSource{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-cs", k8sutil.FormatOperatorNameDNS1123(name)),
Namespace: namespace,
},
Spec: v1alpha1.CatalogSourceSpec{
DisplayName: "CatalogSource",
Publisher: "operator-sdk",
},
TypeMeta: metav1.TypeMeta{
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Kind: v1alpha1.CatalogSourceKind,
},
}
}

const defaultDBPath = "/database/index.db"
Expand All @@ -80,3 +130,63 @@ func (c IndexImageCatalogCreator) getDBPath(ctx context.Context) (string, error)
}
return defaultDBPath, nil
}

func (c IndexImageCatalogCreator) createRegistryPod(ctx context.Context, dbPath string) (*RegistryPod, error) {
// Create registry pod, assigning its owner as the catalog source
registryPod, err := NewRegistryPod(c.cfg.Client, dbPath, c.BundleImage, c.cfg.Namespace)
if err != nil {
return nil, fmt.Errorf("error in initializing registry pod")
}

if err = registryPod.Create(ctx); err != nil {
return nil, fmt.Errorf("error in creating registry pod")
}

return registryPod, nil
}

func (c IndexImageCatalogCreator) updateCatalogSource(podAddr string, cs *v1alpha1.CatalogSource) error {
// Update catalog source with source type as grpc and address to point to the pod IP
cs.Spec.SourceType = defaultSourceType
cs.Spec.Address = fmt.Sprintf("%s:%v", podAddr, defaultGRPCPort)

// Update catalog source with annotations for index image,
// injected bundle, and registry add mode
injectedBundlesJSON, err := json.Marshal(c.InjectBundles)
if err != nil {
return fmt.Errorf("error in json marshal injected bundles: %v", err)
}
cs.ObjectMeta.Annotations = map[string]string{
"operators.operatorframework.io/index-image": c.IndexImage,
"operators.operatorframework.io/inject-bundle-mode": c.InjectBundleMode,
"operators.operatorframework.io/injected-bundles": string(injectedBundlesJSON),
}

return nil
}

func (c IndexImageCatalogCreator) waitForCatalogSource(ctx context.Context, cs *v1alpha1.CatalogSource) error {
catSrcKey, err := client.ObjectKeyFromObject(cs)
if err != nil {
return fmt.Errorf("error in getting catalog source key: %v", err)
}

// verify that catalog source connection status is READY
catSrcCheck := wait.ConditionFunc(func() (done bool, err error) {
if err := c.cfg.Client.Get(ctx, catSrcKey, cs); err != nil {
return false, err
}
if cs.Status.GRPCConnectionState != nil {
if cs.Status.GRPCConnectionState.LastObservedState == "READY" {
return true, nil
}
}
return false, nil
})

if err := wait.PollImmediateUntil(200*time.Millisecond, catSrcCheck, ctx.Done()); err != nil {
return fmt.Errorf("catalog source connection is not ready: %v", err)
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package olm
package internal

import (
"bytes"
"context"
"errors"
"fmt"
"path"
"strings"
"text/template"
"time"
Expand All @@ -43,10 +42,11 @@ const (
ReplacesBundleAddMode BundleAddModeType = "replaces"
)
const (
// defaultGRPCPort is the default grpc container port that the registry pod exposes
defaultGRPCPort = 50051
defaultIndexImage = "quay.io/operator-framework/upstream-opm-builder:latest"
defaultContainerName = "registry-grpc"
defaultContainerPortName = "grpc"
defaultGRPCPort = 50051
)

var (
Expand Down Expand Up @@ -74,7 +74,7 @@ type RegistryPod struct {
// Namespace refers to the specific namespace in which the registry pod will be created and scoped to
Namespace string

// GRPCPort is the container grpc port which is defaulted to 50051
// GRPCPort is the container grpc port
GRPCPort int32

// client refers to a controller runtime client
Expand Down Expand Up @@ -259,9 +259,9 @@ func (rp *RegistryPod) podForBundleRegistry() (*corev1.Pod, error) {
// getContainerCmd uses templating to construct the container command
// and throws error if unable to parse and execute the container command
func (rp *RegistryPod) getContainerCmd() (string, error) {
const containerCommand = "/bin/mkdir -p {{ .DBPath | basename }} &&" +
"/bin/opm registry add -d {{ .DBPath | basename }} -b {{.BundleImage}} --mode={{.BundleAddMode}} &&" +
"/bin/opm registry serve -d {{ .DBPath | basename }} -p {{.GRPCPort}}"
const containerCommand = "/bin/mkdir -p {{ .DBPath }} &&" +
"/bin/opm registry add -d {{ .DBPath }} -b {{.BundleImage}} --mode={{.BundleAddMode}} &&" +
"/bin/opm registry serve -d {{ .DBPath }} -p {{.GRPCPort}}"
type bundleCmd struct {
BundleImage, DBPath, BundleAddMode string
GRPCPort int32
Expand All @@ -272,14 +272,9 @@ func (rp *RegistryPod) getContainerCmd() (string, error) {

out := &bytes.Buffer{}

// create a custom basename template function
funcMap := template.FuncMap{
"basename": path.Base,
}

// add the custom basename template function to the
// template's FuncMap and parse the containerCommand
tmp := template.Must(template.New("containerCommand").Funcs(funcMap).Parse(containerCommand))
tmp := template.Must(template.New("containerCommand").Parse(containerCommand))

// execute the command by applying the parsed tmp to command
// and write command output to out
Expand All @@ -289,3 +284,11 @@ func (rp *RegistryPod) getContainerCmd() (string, error) {

return out.String(), nil
}

// GetPod returns the registry pod
func (rp *RegistryPod) GetPod() (*corev1.Pod, error) {
if rp == nil {
return nil, errPodNotInit
}
return rp.pod, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package olm
package internal

import (
"context"
Expand Down Expand Up @@ -43,9 +43,9 @@ var _ = Describe("RegistryPod", func() {

Context("with valid registry pod values", func() {
expectedPodName := "quay-io-example-example-operator-bundle-0-2-0"
expectedOutput := "/bin/mkdir -p index.db &&" +
"/bin/opm registry add -d index.db -b quay.io/example/example-operator-bundle:0.2.0 --mode=semver &&" +
"/bin/opm registry serve -d index.db -p 50051"
expectedOutput := "/bin/mkdir -p /database/index.db &&" +
"/bin/opm registry add -d /database/index.db -b quay.io/example/example-operator-bundle:0.2.0 --mode=semver &&" +
"/bin/opm registry serve -d /database/index.db -p 50051"

var rp *RegistryPod
var err error
Expand Down