Skip to content
Merged
202 changes: 117 additions & 85 deletions ocp_resources/datavolume.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from ocp_resources.utils.constants import (
TIMEOUT_1MINUTE,
TIMEOUT_2MINUTES,
Expand All @@ -10,6 +12,11 @@
from timeout_sampler import TimeoutExpiredError, TimeoutSampler
from warnings import warn

from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
from ocp_resources.secret import Secret


class DataVolume(NamespacedResource):
"""
Expand Down Expand Up @@ -70,69 +77,58 @@ class Status(Resource.Condition.Status):

def __init__(
self,
name=None,
namespace=None,
source=None,
size=None,
storage_class=None,
url=None,
content_type=ContentType.KUBEVIRT,
access_modes=None,
cert_configmap=None,
secret=None,
client=None,
volume_mode=None,
hostpath_node=None,
source_pvc=None,
source_namespace=None,
multus_annotation=None,
bind_immediate_annotation=None,
preallocation=None,
teardown=True,
yaml_file=None,
delete_timeout=TIMEOUT_4MINUTES,
api_name="pvc",
delete_after_completion=None,
**kwargs,
):
source: str | None = None,
source_dict: dict[str, Any] | None = None,
size: str | None = None,
storage_class: str | None = None,
url: str | None = None,
content_type: str | None = None,
access_modes: str | None = None,
volume_mode: str | None = None,
cert_configmap: str | None = None,
secret: Secret | None = None,
hostpath_node: str | None = None,
source_pvc: str | None = None,
source_namespace: str | None = None,
source_ref: dict[str, Any] | None = None,
multus_annotation: str | None = None,
bind_immediate_annotation: bool | None = None,
preallocation: bool | None = None,
api_name: str | None = "pvc",
checkpoints: list[Any] | None = None,
final_checkpoint: bool | None = None,
priority_class_name: str | None = None,
**kwargs: Any,
) -> None:
"""
DataVolume object

Args:
name (str): DataVolume name.
namespace (str): DataVolume namespace.
source (str): source of DV - upload/http/pvc/registry.
size (str): DataVolume size - format size+size unit, for example: "5Gi".
source (str, default: None): source of DV - upload/http/pvc/registry/blank.
source_dict (dict[str, Any], default: None): DataVolume.source dictionary.
size (str, default: None): DataVolume size - format size+size unit, for example: "5Gi".
storage_class (str, default: None): storage class name for DataVolume.
url (str, default: None): url for importing DV, when source is http/registry.
content_type (str, default: "kubevirt"): DataVolume content type.
access_modes (str, default: None): DataVolume access mode.
content_type (str, default: None): DataVolume content type (e.g., "kubevirt", "archive").
access_modes (str, default: None): DataVolume access mode (e.g., "ReadWriteOnce", "ReadWriteMany").
volume_mode (str, default: None): DataVolume volume mode (e.g., "Filesystem", "Block").
cert_configmap (str, default: None): name of config map for TLS certificates.
secret (Secret, default: None): to be set as secretRef.
client (DynamicClient): DynamicClient to use.
volume_mode (str, default: None): DataVolume volume mode.
hostpath_node (str, default: None): Node name to provision the DV on.
source_pvc (str, default: None): PVC name for when cloning the DV.
source_namespace (str, default: None): PVC namespace for when cloning the DV.
source_ref (dict[str, Any], default: None): SourceRef is an indirect reference to the source of data for the
requested DataVolume. Currently only "DataSource" is supported. Fields: kind (str), name (str), namespace (str)
multus_annotation (str, default: None): network nad name.
bind_immediate_annotation (bool, default: None): when WaitForFirstConsumer is set in StorageClass and DV
should be bound immediately.
bind_immediate_annotation (bool, default: None): when WaitForFirstConsumer is set in StorageClass and DV
should be bound immediately.
preallocation (bool, default: None): preallocate disk space.
teardown (bool, default: True): Indicates if this resource would need to be deleted.
yaml_file (yaml, default: None): yaml file for the resource.
delete_timeout (int, default: 4 minutes): timeout associated with delete action.
api_name (str, default: "pvc"): api used for DV, pvc/storage
delete_after_completion (str, default: None): annotation for garbage collector - "true"/"false"
api_name (str, default: "pvc"): api used for DV (e.g., "storage", "pvc").
checkpoints (list[Any], default: None): list of DataVolumeCheckpoints for snapshot operations.
final_checkpoint (bool, default: None): indicates whether the current DataVolumeCheckpoint is the final one.
priority_class_name (str, default: None): priority class name for the DataVolume pod.
"""
super().__init__(
name=name,
namespace=namespace,
client=client,
teardown=teardown,
yaml_file=yaml_file,
delete_timeout=delete_timeout,
**kwargs,
)
super().__init__(**kwargs)
self.source = source
self.url = url
self.cert_configmap = cert_configmap
Expand All @@ -145,62 +141,98 @@ def __init__(
self.hostpath_node = hostpath_node
self.source_pvc = source_pvc
self.source_namespace = source_namespace
self.source_dict = source_dict
self.source_ref = source_ref
self.multus_annotation = multus_annotation
self.bind_immediate_annotation = bind_immediate_annotation
self.preallocation = preallocation
self.api_name = api_name
self.delete_after_completion = delete_after_completion
self.checkpoints = checkpoints
self.final_checkpoint = final_checkpoint
self.priority_class_name = priority_class_name

def to_dict(self) -> None:
super().to_dict()
if not self.kind_dict and not self.yaml_file:
self.res.update({
"spec": {
"source": {self.source: {"url": self.url}},
self.api_name: {
"resources": {"requests": {"storage": self.size}},
},
}
})
if self.access_modes:
self.res["spec"][self.api_name]["accessModes"] = [self.access_modes]
if self.content_type:
self.res["spec"]["contentType"] = self.content_type
if self.storage_class:
self.res["spec"][self.api_name]["storageClassName"] = self.storage_class
if self.secret:
self.res["spec"]["source"][self.source]["secretRef"] = self.secret.name
if self.volume_mode:
self.res["spec"][self.api_name]["volumeMode"] = self.volume_mode
if self.source == "http" or "registry":
self.res["spec"]["source"][self.source]["url"] = self.url
if self.cert_configmap:
self.res["spec"]["source"][self.source]["certConfigMap"] = self.cert_configmap
if self.source == "upload" or self.source == "blank":
self.res["spec"]["source"][self.source] = {}
self.res["spec"] = {}
_spec = self.res["spec"]

if self.checkpoints is not None:
_spec["checkpoints"] = self.checkpoints

if self.content_type is not None:
_spec["contentType"] = self.content_type

if self.final_checkpoint is not None:
_spec["finalCheckpoint"] = self.final_checkpoint

if self.preallocation is not None:
_spec["preallocation"] = self.preallocation

if self.priority_class_name is not None:
_spec["priorityClassName"] = self.priority_class_name

Comment thread
jpeimer marked this conversation as resolved.
# Set api_name spec fields (pvc/storage)
if self.api_name is not None:
_spec[self.api_name] = {}

if self.access_modes is not None:
_spec[self.api_name]["accessModes"] = [self.access_modes]

if self.volume_mode is not None:
_spec[self.api_name]["volumeMode"] = self.volume_mode

if self.storage_class is not None:
_spec[self.api_name]["storageClassName"] = self.storage_class

if self.size is not None:
_spec[self.api_name]["resources"] = {"requests": {"storage": self.size}}

# Handle source configuration
if self.source_dict is not None:
_spec["source"] = self.source_dict
elif self.source is not None:
warn(
"source is deprecated and will be removed in the next version. Use source_dict instead.",
DeprecationWarning,
stacklevel=2,
)

_spec["source"] = {}
source_spec = _spec["source"]

if self.source in ["http", "registry"]:
source_spec[self.source] = {"url": self.url}
elif self.source in ["upload", "blank"]:
source_spec[self.source] = {}
elif self.source == "pvc":
source_spec[self.source] = {
"name": self.source_pvc,
"namespace": self.source_namespace or self.namespace,
}

if self.secret is not None:
source_spec[self.source]["secretRef"] = self.secret.name
if self.cert_configmap is not None:
source_spec[self.source]["certConfigMap"] = self.cert_configmap

if self.source_ref is not None:
_spec["sourceRef"] = self.source_ref

if self.hostpath_node:
self.res["metadata"].setdefault("annotations", {}).update({
f"{NamespacedResource.ApiGroup.KUBEVIRT_IO}/provisionOnNode": (self.hostpath_node)
})

if self.multus_annotation:
self.res["metadata"].setdefault("annotations", {}).update({
f"{NamespacedResource.ApiGroup.K8S_V1_CNI_CNCF_IO}/networks": (self.multus_annotation)
})

if self.bind_immediate_annotation:
self.res["metadata"].setdefault("annotations", {}).update({
f"{self.api_group}/storage.bind.immediate.requested": "true"
})
if self.source == "pvc":
self.res["spec"]["source"]["pvc"] = {
"name": self.source_pvc or "dv-source",
"namespace": self.source_namespace or self.namespace,
}
if self.preallocation is not None:
self.res["spec"]["preallocation"] = self.preallocation
if self.delete_after_completion:
self.res["metadata"].setdefault("annotations", {}).update({
f"{self.api_group}/storage.deleteAfterCompletion": (self.delete_after_completion)
})

def wait_deleted(self, timeout=TIMEOUT_4MINUTES):
"""
Expand Down