Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 98 additions & 135 deletions test/orchestratord/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -1566,32 +1566,10 @@ def workflow_documentation_defaults(
with open(os.path.join(dir, file), "wb") as f:
f.write(response.content)

spawn.runv(
[
"helm",
"repo",
"add",
"materialize",
"https://materializeinc.github.io/materialize",
]
)
spawn.runv(["helm", "repo", "update", "materialize"])
spawn.runv(
[
"helm",
"install",
"my-materialize-operator",
MZ_ROOT / "misc" / "helm-charts" / "operator",
"--namespace=materialize",
"--create-namespace",
"--version",
"v26.0.0",
"--set",
"observability.podMetrics.enabled=true",
"-f",
os.path.join(dir, "sample-values.yaml"),
]
)
with open(os.path.join(dir, "sample-values.yaml")) as f:
sample_values = yaml.load(f, Loader=yaml.Loader)
helm_install_operator(sample_values, upgrade=False)

spawn.runv(
["kubectl", "apply", "-f", os.path.join(dir, "sample-postgres.yaml")]
)
Expand Down Expand Up @@ -1619,28 +1597,8 @@ def workflow_documentation_defaults(
"args={--kubelet-insecure-tls,--kubelet-preferred-address-types=InternalIP,Hostname,ExternalIP}",
]
)
for i in range(120):
try:
spawn.capture(
[
"kubectl",
"get",
"crd",
"materializes.materialize.cloud",
"-n",
"materialize",
"-o",
"name",
],
stderr=subprocess.DEVNULL,
)
break

except subprocess.CalledProcessError:
pass
time.sleep(1)
else:
raise ValueError("Never completed")
wait_for_crd_established()

with open(os.path.join(dir, "sample-materialize.yaml")) as f:
materialize_setup = list(yaml.load_all(f, Loader=yaml.Loader))
Expand Down Expand Up @@ -2113,24 +2071,7 @@ def check():
for version in versions[-2:]:
print(f"running orchestratord {version}")
definition["operator"]["operator"]["image"]["tag"] = str(version)
spawn.runv(
[
"helm",
"upgrade",
"operator",
MZ_ROOT / "misc" / "helm-charts" / "operator",
"--namespace=materialize",
"--create-namespace",
"--version",
"v26.0.0",
"--wait",
"-f",
"-",
],
stdin=yaml.dump(definition["operator"]).encode(),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
helm_install_operator(definition["operator"], upgrade=True)
check_orchestratord_version(version)

print(f"running environmentd {version}")
Expand Down Expand Up @@ -2165,24 +2106,7 @@ def check():

print(f"running orchestratord {versions[-1]}")
definition["operator"]["operator"]["image"]["tag"] = str(versions[-1])
spawn.runv(
[
"helm",
"upgrade",
"operator",
MZ_ROOT / "misc" / "helm-charts" / "operator",
"--namespace=materialize",
"--create-namespace",
"--version",
"v26.0.0",
"--wait",
"-f",
"-",
],
stdin=yaml.dump(definition["operator"]).encode(),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
helm_install_operator(definition["operator"], upgrade=True)
check_orchestratord_version(versions[-1])

print(f"running environmentd {versions[-1]}")
Expand Down Expand Up @@ -2583,7 +2507,10 @@ def run_scenario(
run(definition, expect_fail)
initialize = False # only initialize once
else:
upgrade_operator_helm_chart(definition, expect_fail)
helm_install_operator(
values=definition["operator"],
upgrade=True,
)
definition["materialize"]["spec"]["requestRollout"] = str(uuid.uuid4())
run(definition, expect_fail)
mod_dict = {mod.__class__: mod.value for mod in mods}
Expand All @@ -2604,6 +2531,58 @@ def run_scenario(
run_mz_debug()


def is_prerelease_tag(tag: str) -> bool:
version = Version.parse(tag.removeprefix("v"))
if version.prerelease is not None:
return True
if version.build is not None:
return True
return False


def helm_install_operator(
values: dict[str, Any],
upgrade: bool,
):
tag = values.get("operator", {}).get("image", {}).get("tag")
helm_release_version = tag or "dev"
chart_path = MZ_ROOT / "misc" / "helm-charts" / "operator"

# If installing existing released versions of orchestratord,
# we should use the corresponding helm chart.
if not is_prerelease_tag(tag or "v26.0.0"):
chart_path = "materialize/materialize-operator"
spawn.runv(
[
"helm",
"repo",
"add",
"materialize",
"https://materializeinc.github.io/materialize",
]
)
spawn.runv(["helm", "repo", "update", "materialize"])

operation = "upgrade" if upgrade else "install"

spawn.runv(
[
"helm",
operation,
"operator",
chart_path,
"--namespace=materialize",
"--create-namespace",
"--version",
helm_release_version,
"--wait",
"-f",
"-",
],
stdin=yaml.dump(values).encode(),
)


def init(definition: dict[str, Any]) -> None:
try:
spawn.capture(
Expand All @@ -2619,66 +2598,50 @@ def init(definition: dict[str, Any]) -> None:
)
except subprocess.CalledProcessError:
pass
spawn.runv(
[
"helm",
"install",
"operator",
MZ_ROOT / "misc" / "helm-charts" / "operator",
"--namespace=materialize",
"--create-namespace",
"--version",
"v26.0.0",
"--wait",
"-f",
"-",
],
stdin=yaml.dump(definition["operator"]).encode(),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,

helm_install_operator(
values=definition["operator"],
upgrade=False,
)

for i in range(240):
wait_for_crd_established()


def wait_for_crd_established():
for _ in range(240):
try:
spawn.capture(
[
"kubectl",
"get",
"crd",
"materializes.materialize.cloud",
"-n",
"materialize",
"-o",
"name",
],
stderr=subprocess.DEVNULL,
crd = json.loads(
spawn.capture(
[
"kubectl",
"get",
"crd",
"materializes.materialize.cloud",
"-n",
"materialize",
"-o",
"json",
],
stderr=subprocess.DEVNULL,
)
)
conditions = crd.get("status", {}).get("conditions", [])
established_condition = next(
(
condition
for condition in conditions
if condition["type"] == "Established"
),
{"status": "False"},
)
break
if established_condition["status"] == "True":
break

except subprocess.CalledProcessError:
pass
time.sleep(1)
else:
raise ValueError("Never completed")


def upgrade_operator_helm_chart(definition: dict[str, Any], expect_fail: bool) -> None:
spawn.runv(
[
"helm",
"upgrade",
"operator",
MZ_ROOT / "misc" / "helm-charts" / "operator",
"--namespace=materialize",
"--version",
"v26.0.0",
"-f",
"-",
],
stdin=yaml.dump(definition["operator"]).encode(),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
raise ValueError("CRD never became 'Established'")


def run(definition: dict[str, Any], expect_fail: bool) -> None:
Expand Down