Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 36 additions & 65 deletions internal/olm/operator/internal/registry_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ import (
"text/template"
"time"

"github.com/operator-framework/operator-sdk/internal/flags"
"github.com/operator-framework/operator-sdk/internal/util/k8sutil"

"github.com/spf13/viper"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
Comment thread
rashmigottipati marked this conversation as resolved.

"github.com/operator-framework/operator-sdk/internal/util/k8sutil"
)

// BundleAddModeType - type of BundleAddMode in RegistryPod struct
Expand All @@ -51,6 +49,11 @@ const (
defaultGRPCPort = 50051
)

var (
// Internal error
errPodNotInit = errors.New("internal error: RegistryPod not initialized")
)

// RegistryPod holds resources necessary for creation of a registry server
type RegistryPod struct {
// BundleAddMode specifies the graph update mode that defines how channel graphs are updated
Expand All @@ -71,18 +74,18 @@ type RegistryPod struct {
// Namespace refers to the specific namespace in which the registry pod will be created and scoped to
Namespace string

// Kubeclient refers to a Kubernetes clientset that implements kubernetes.Interface.
Kubeclient kubernetes.Interface

// GRPCPort is the container grpc port which is defaulted to 50051
GRPCPort int32

// client refers to a controller runtime client
client client.Client

// pod represents a kubernetes *corev1.pod that will be created on a cluster using an index image
pod *corev1.Pod
}

// NewRegistryPod initializes the RegistryPod struct and sets defaults for empty fields
func NewRegistryPod(kubeclient kubernetes.Interface, dbPath, bundleImage, namespace string) (*RegistryPod, error) {
func NewRegistryPod(client client.Client, dbPath, bundleImage, namespace string) (*RegistryPod, error) {
rp := &RegistryPod{}

if rp.GRPCPort == 0 {
Expand All @@ -101,7 +104,7 @@ func NewRegistryPod(kubeclient kubernetes.Interface, dbPath, bundleImage, namesp
}
}

rp.Kubeclient = kubeclient
rp.client = client
rp.DBPath = dbPath
rp.BundleImage = bundleImage
rp.Namespace = namespace
Expand All @@ -125,58 +128,49 @@ func NewRegistryPod(kubeclient kubernetes.Interface, dbPath, bundleImage, namesp
// and returns error
func (rp *RegistryPod) Create(ctx context.Context) error {
if rp.pod == nil {
return errors.New("internal error: uninitialized RegistryPod cannot be used")
return errPodNotInit
}
var (
pod *corev1.Pod
err error
)

// Check if registry pod already exists
if pod, err = rp.Kubeclient.CoreV1().Pods(rp.pod.Namespace).Get(ctx,
rp.pod.Name, metav1.GetOptions{}); err != nil {
// if error exists and the error is due to pod not found, then create a new pod

podKey, err := client.ObjectKeyFromObject(rp.pod)
if err != nil {
return fmt.Errorf("error in getting object key from the registry pod name %s: %v", rp.pod.Name, err)
}

if err := rp.client.Get(ctx, podKey, rp.pod); err != nil {
if k8serrors.IsNotFound(err) {
// create registry pod in kubernetes cluster
if pod, err = rp.Kubeclient.CoreV1().Pods(rp.pod.Namespace).Create(ctx,
rp.pod, metav1.CreateOptions{}); err != nil {
if err = rp.client.Create(ctx, rp.pod); err != nil {
return fmt.Errorf("error creating registry pod: %v", err)
}
// assign rp.pod to the newly created pod
rp.pod = pod
} else {
return fmt.Errorf("error getting existing registry pod: %v", err)
return fmt.Errorf("registry pod name %s already exists: %v", rp.pod.Name, err)
}
} else {
// if an existing pod matching rp.pod.Name is found, assign rp.pod to the existing pod
rp.pod = pod
}

return nil
}

// VerifyPodRunning calls checkPodStatus to verify pod status
// and returns error if pod is not running
func (rp *RegistryPod) VerifyPodRunning(ctx context.Context) error {
if rp.pod == nil {
return errPodNotInit
}

podKey, err := client.ObjectKeyFromObject(rp.pod)
if err != nil {
return fmt.Errorf("error in getting object key from the registry pod name %s: %v", rp.pod.Name, err)
}

// upon creation of new pod, poll and verify that pod status is running
podCheck := wait.ConditionFunc(func() (done bool, err error) {
p, err := rp.Kubeclient.CoreV1().Pods(rp.pod.Namespace).Get(ctx,
rp.pod.Name, metav1.GetOptions{})
err = rp.client.Get(ctx, podKey, rp.pod)
if err != nil {
return false, fmt.Errorf("error getting pod %s: %w", rp.pod.Name, err)
}
return p.Status.Phase == corev1.PodRunning, nil
return rp.pod.Status.Phase == corev1.PodRunning, nil
})

// check pod status to be Running
if err := rp.checkPodStatus(ctx, podCheck); err != nil {
podLogs, logErr := rp.GetLogs(ctx)
if logErr != nil {
return fmt.Errorf("error verifying pod creation: %v: and fetching logs: %v", err, logErr)
}
if viper.GetBool(flags.VerboseOpt) && podLogs != "" {
fmt.Println(podLogs)
}
return fmt.Errorf("registry pod did not become ready: %w", err)
}
return nil
Expand Down Expand Up @@ -237,7 +231,7 @@ func (rp *RegistryPod) podForBundleRegistry() (*corev1.Pod, error) {
}

// make the pod definition
pod := &corev1.Pod{
rp.pod = &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: getPodName(rp.BundleImage),
Namespace: rp.Namespace,
Expand All @@ -260,7 +254,7 @@ func (rp *RegistryPod) podForBundleRegistry() (*corev1.Pod, error) {
},
}

return pod, nil
return rp.pod, nil
}

// getContainerCmd uses templating to construct the container command
Expand Down Expand Up @@ -296,26 +290,3 @@ func (rp *RegistryPod) getContainerCmd() (string, error) {

return out.String(), nil
}

// GetLogs gets the logs for the registry pod
// and throws error if failed to get pod logs
func (rp *RegistryPod) GetLogs(ctx context.Context) (string, error) {
if rp.pod == nil {
return "", errors.New("a registry pod must be created before getting pod logs")
}

// get the logs of rp.pod.Name
req := rp.Kubeclient.CoreV1().Pods(rp.pod.Namespace).GetLogs(rp.pod.Name, &corev1.PodLogOptions{})
podLogs, err := req.Stream(ctx)
if err != nil {
return "", fmt.Errorf("failed to get logs: %v", err)
}
defer podLogs.Close()

buf := new(bytes.Buffer)
_, err = buf.ReadFrom(podLogs)
if err != nil {
return "", fmt.Errorf("failed to read pod logs: %v", err)
}
return buf.String(), nil
}
26 changes: 8 additions & 18 deletions internal/olm/operator/internal/registry_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"sigs.k8s.io/controller-runtime/pkg/client"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
)

// newFakeClient() returns a clientset
func newFakeClient() kubernetes.Interface {
return fake.NewSimpleClientset()
// newFakeClient() returns a fake controller runtime client
func newFakeClient() client.Client {
return fakeclient.NewFakeClient()
}

func TestCreateRegistryPod(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Test Registry Pod Suite")
Expand Down Expand Up @@ -186,22 +188,10 @@ var _ = Describe("RegistryPod", func() {

It("Create should fail when registry pod is not initialized", func() {
rp := RegistryPod{}
expectedErr := "internal error: uninitialized RegistryPod cannot be used"

err := rp.Create(context.Background())

Expect(err).NotTo(BeNil())
Expect(err.Error()).Should(ContainSubstring(expectedErr))
})

It("should not be able to get pod logs if pod is not initialized", func() {
rp := RegistryPod{}
expectedErr := "a registry pod must be created before getting pod logs"

_, err := rp.GetLogs(context.Background())

Expect(err).ToNot(BeNil())
Expect(err.Error()).Should(ContainSubstring(expectedErr))
Expect(err).To(MatchError(errPodNotInit))
})

// todo(rashmigottipati): add test to check VerifyPodRunning returning error
Expand Down