diff --git a/test/orchestratord/mzcompose.py b/test/orchestratord/mzcompose.py index 045b2e64a7968..8139309fd31b1 100644 --- a/test/orchestratord/mzcompose.py +++ b/test/orchestratord/mzcompose.py @@ -1566,32 +1566,10 @@ def workflow_documentation_defaults( with open(os.path.join(dir, file), "wb") as f: f.write(response.content) - spawn.runv( - [ - "helm", - "repo", - "add", - "materialize", - "https://materializeinc.github.io/materialize", - ] - ) - spawn.runv(["helm", "repo", "update", "materialize"]) - spawn.runv( - [ - "helm", - "install", - "my-materialize-operator", - MZ_ROOT / "misc" / "helm-charts" / "operator", - "--namespace=materialize", - "--create-namespace", - "--version", - "v26.0.0", - "--set", - "observability.podMetrics.enabled=true", - "-f", - os.path.join(dir, "sample-values.yaml"), - ] - ) + with open(os.path.join(dir, "sample-values.yaml")) as f: + sample_values = yaml.load(f, Loader=yaml.Loader) + helm_install_operator(sample_values, upgrade=False) + spawn.runv( ["kubectl", "apply", "-f", os.path.join(dir, "sample-postgres.yaml")] ) @@ -1619,28 +1597,8 @@ def workflow_documentation_defaults( "args={--kubelet-insecure-tls,--kubelet-preferred-address-types=InternalIP,Hostname,ExternalIP}", ] ) - for i in range(120): - try: - spawn.capture( - [ - "kubectl", - "get", - "crd", - "materializes.materialize.cloud", - "-n", - "materialize", - "-o", - "name", - ], - stderr=subprocess.DEVNULL, - ) - break - except subprocess.CalledProcessError: - pass - time.sleep(1) - else: - raise ValueError("Never completed") + wait_for_crd_established() with open(os.path.join(dir, "sample-materialize.yaml")) as f: materialize_setup = list(yaml.load_all(f, Loader=yaml.Loader)) @@ -2113,24 +2071,7 @@ def check(): for version in versions[-2:]: print(f"running orchestratord {version}") definition["operator"]["operator"]["image"]["tag"] = str(version) - spawn.runv( - [ - "helm", - "upgrade", - "operator", - MZ_ROOT / "misc" / "helm-charts" / "operator", - "--namespace=materialize", - "--create-namespace", - "--version", - "v26.0.0", - "--wait", - "-f", - "-", - ], - stdin=yaml.dump(definition["operator"]).encode(), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) + helm_install_operator(definition["operator"], upgrade=True) check_orchestratord_version(version) print(f"running environmentd {version}") @@ -2165,24 +2106,7 @@ def check(): print(f"running orchestratord {versions[-1]}") definition["operator"]["operator"]["image"]["tag"] = str(versions[-1]) - spawn.runv( - [ - "helm", - "upgrade", - "operator", - MZ_ROOT / "misc" / "helm-charts" / "operator", - "--namespace=materialize", - "--create-namespace", - "--version", - "v26.0.0", - "--wait", - "-f", - "-", - ], - stdin=yaml.dump(definition["operator"]).encode(), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) + helm_install_operator(definition["operator"], upgrade=True) check_orchestratord_version(versions[-1]) print(f"running environmentd {versions[-1]}") @@ -2583,7 +2507,10 @@ def run_scenario( run(definition, expect_fail) initialize = False # only initialize once else: - upgrade_operator_helm_chart(definition, expect_fail) + helm_install_operator( + values=definition["operator"], + upgrade=True, + ) definition["materialize"]["spec"]["requestRollout"] = str(uuid.uuid4()) run(definition, expect_fail) mod_dict = {mod.__class__: mod.value for mod in mods} @@ -2604,6 +2531,58 @@ def run_scenario( run_mz_debug() +def is_prerelease_tag(tag: str) -> bool: + version = Version.parse(tag.removeprefix("v")) + if version.prerelease is not None: + return True + if version.build is not None: + return True + return False + + +def helm_install_operator( + values: dict[str, Any], + upgrade: bool, +): + tag = values.get("operator", {}).get("image", {}).get("tag") + helm_release_version = tag or "dev" + chart_path = MZ_ROOT / "misc" / "helm-charts" / "operator" + + # If installing existing released versions of orchestratord, + # we should use the corresponding helm chart. + if not is_prerelease_tag(tag or "v26.0.0"): + chart_path = "materialize/materialize-operator" + spawn.runv( + [ + "helm", + "repo", + "add", + "materialize", + "https://materializeinc.github.io/materialize", + ] + ) + spawn.runv(["helm", "repo", "update", "materialize"]) + + operation = "upgrade" if upgrade else "install" + + spawn.runv( + [ + "helm", + operation, + "operator", + chart_path, + "--namespace=materialize", + "--create-namespace", + "--version", + helm_release_version, + "--wait", + "-f", + "-", + ], + stdin=yaml.dump(values).encode(), + ) + + def init(definition: dict[str, Any]) -> None: try: spawn.capture( @@ -2619,66 +2598,50 @@ def init(definition: dict[str, Any]) -> None: ) except subprocess.CalledProcessError: pass - spawn.runv( - [ - "helm", - "install", - "operator", - MZ_ROOT / "misc" / "helm-charts" / "operator", - "--namespace=materialize", - "--create-namespace", - "--version", - "v26.0.0", - "--wait", - "-f", - "-", - ], - stdin=yaml.dump(definition["operator"]).encode(), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, + + helm_install_operator( + values=definition["operator"], + upgrade=False, ) - for i in range(240): + wait_for_crd_established() + + +def wait_for_crd_established(): + for _ in range(240): try: - spawn.capture( - [ - "kubectl", - "get", - "crd", - "materializes.materialize.cloud", - "-n", - "materialize", - "-o", - "name", - ], - stderr=subprocess.DEVNULL, + crd = json.loads( + spawn.capture( + [ + "kubectl", + "get", + "crd", + "materializes.materialize.cloud", + "-n", + "materialize", + "-o", + "json", + ], + stderr=subprocess.DEVNULL, + ) + ) + conditions = crd.get("status", {}).get("conditions", []) + established_condition = next( + ( + condition + for condition in conditions + if condition["type"] == "Established" + ), + {"status": "False"}, ) - break + if established_condition["status"] == "True": + break except subprocess.CalledProcessError: pass time.sleep(1) else: - raise ValueError("Never completed") - - -def upgrade_operator_helm_chart(definition: dict[str, Any], expect_fail: bool) -> None: - spawn.runv( - [ - "helm", - "upgrade", - "operator", - MZ_ROOT / "misc" / "helm-charts" / "operator", - "--namespace=materialize", - "--version", - "v26.0.0", - "-f", - "-", - ], - stdin=yaml.dump(definition["operator"]).encode(), - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) + raise ValueError("CRD never became 'Established'") def run(definition: dict[str, Any], expect_fail: bool) -> None: