Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 172 additions & 84 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions crd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ stackable-opa-crd = { git = "https://github.com/stackabletech/opa-operator.git",
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", branch = "main" }
stackable-zookeeper-crd = { git = "https://github.com/stackabletech/zookeeper-operator.git", branch = "main"}

k8s-openapi = { version = "0.12", default-features = false, features = ["v1_20"] }
kube = { version = "0.57", default-features = false, features = ["jsonpatch", "derive"] }
kube-runtime = "0.57"
k8s-openapi = { version = "0.12", default-features = false }
kube = { version = "0.58", default-features = false, features = ["jsonpatch", "derive"] }
kube-runtime = "0.58"
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" # Needed by the CustomResource annotation
Expand All @@ -22,7 +22,7 @@ strum = "0.21"
strum_macros = "0.21"

[dev-dependencies]
k8s-openapi = { version = "0.12", default-features = false, features = ["v1_20"] }
k8s-openapi = { version = "0.12", default-features = false, features = ["v1_21"] }
serde_yaml = "0.8"
rstest = "0.11"

Expand Down
10 changes: 0 additions & 10 deletions crd/src/bin/generate_crds.rs

This file was deleted.

2 changes: 1 addition & 1 deletion deploy/crd/kafkacluster.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ spec:
replicas:
format: uint16
minimum: 0.0
nullable: true
type: integer
selector:
description: A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects.
Expand Down Expand Up @@ -122,7 +123,6 @@ spec:
type: object
type: object
required:
- replicas
- selector
type: object
type: object
Expand Down
6 changes: 3 additions & 3 deletions operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ stackable-zookeeper-crd = { git = "https://github.com/stackabletech/zookeeper-op

async-trait = "0.1"
futures = "0.3"
k8s-openapi = { version = "0.12", default-features = false, features = ["v1_20"] }
kube = { version = "0.57", default-features = false, features = ["jsonpatch"] }
kube-runtime = "0.57"
k8s-openapi = { version = "0.12", default-features = false }
kube = { version = "0.58", default-features = false, features = ["jsonpatch"] }
kube-runtime = "0.58"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
strum = "0.21"
Expand Down
14 changes: 3 additions & 11 deletions operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use stackable_operator::builder::{
use stackable_operator::client::Client;
use stackable_operator::controller::{Controller, ControllerStrategy, ReconciliationState};
use stackable_operator::error::OperatorResult;
use stackable_operator::k8s_utils;
use stackable_operator::labels::{
build_common_labels_for_all_managed_resources, get_recommended_labels, APP_COMPONENT_LABEL,
APP_INSTANCE_LABEL, APP_MANAGED_BY_LABEL, APP_NAME_LABEL, APP_VERSION_LABEL,
Expand All @@ -36,7 +37,6 @@ use stackable_operator::role_utils::{
find_nodes_that_fit_selectors, get_role_and_group_labels,
list_eligible_nodes_for_role_and_group, EligibleNodesForRoleAndGroup,
};
use stackable_operator::{cli, k8s_utils};
use stackable_zookeeper_crd::util::ZookeeperConnectionInformation;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
Expand Down Expand Up @@ -552,7 +552,7 @@ pub fn validated_product_config(
)
}

pub async fn create_controller(client: Client) -> OperatorResult<()> {
pub async fn create_controller(client: Client, product_config_path: &str) -> OperatorResult<()> {
let kafka_api: Api<KafkaCluster> = client.get_all_api();
let pods_api: Api<Pod> = client.get_all_api();
let config_maps_api: Api<ConfigMap> = client.get_all_api();
Expand All @@ -561,15 +561,7 @@ pub async fn create_controller(client: Client) -> OperatorResult<()> {
.owns(pods_api, ListParams::default())
.owns(config_maps_api, ListParams::default());

let product_config_path = cli::product_config_path(
"kafka-operator",
vec![
"deploy/config-spec/properties.yaml",
"/etc/stackable/kafka-operator/config-spec/properties.yaml",
],
)?;

let product_config = ProductConfigManager::from_yaml_file(&product_config_path).unwrap();
let product_config = ProductConfigManager::from_yaml_file(product_config_path).unwrap();

let strategy = KafkaStrategy::new(product_config);

Expand Down
10 changes: 9 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
[package]
authors = ["Lars Francke <lars.francke@stackable.de>"]
description = "Stackable Operator for Apache Kafka - The main binary"
description = "Stackable Operator for Apache Kafka"
edition = "2018"
license = "OSL-3.0"
name = "stackable-kafka-operator-server"
version = "0.1.0-nightly"
build = "build.rs"

[dependencies]
stackable-kafka-crd = { path = "../crd" }
stackable-kafka-operator = { path = "../operator" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", branch = "main" }

clap = "2.33"
k8s-openapi = { version = "0.12", default-features = false, features = ["v1_21"] }
tokio = { version = "1.10", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"

[build-dependencies]
built = { version = "0.5", features = ["chrono", "git2"] }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", branch = "main" }
stackable-kafka-crd = { path = "../crd" }

[package.metadata.deb]
maintainer-scripts = "packaging/debian/"
systemd-units = { enable = false }
Expand Down
11 changes: 11 additions & 0 deletions server/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use stackable_kafka_crd::KafkaCluster;
use stackable_operator::crd::CustomResourceExt;
use stackable_operator::error::OperatorResult;

fn main() -> OperatorResult<()> {
built::write_built_file().expect("Failed to acquire build-time information");

KafkaCluster::write_yaml_schema("../deploy/crd/kafkacluster.crd.yaml")?;

Ok(())
}
47 changes: 43 additions & 4 deletions server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,54 @@
use clap::{crate_version, App, AppSettings, SubCommand};
use stackable_kafka_crd::KafkaCluster;
use stackable_operator::crd::CustomResourceExt;
use stackable_operator::{client, error};
use tracing::{error, info};
use stackable_operator::{cli, client, error};
use tracing::error;

mod built_info {
// The file has been placed there by the build script.
include!(concat!(env!("OUT_DIR"), "/built.rs"));
}

const FIELD_MANAGER: &str = "kafka.stackable.tech";

#[tokio::main]
async fn main() -> Result<(), error::Error> {
stackable_operator::logging::initialize_logging("KAFKA_OPERATOR_LOG");

info!("Starting Stackable Operator for Apache Kafka");
stackable_operator::utils::print_startup_string(
built_info::PKG_DESCRIPTION,
built_info::PKG_VERSION,
built_info::GIT_VERSION,
built_info::TARGET,
built_info::BUILT_TIME_UTC,
built_info::RUSTC_VERSION,
);

// Handle CLI arguments
let matches = App::new(built_info::PKG_DESCRIPTION)
.author("Stackable GmbH - info@stackable.de")
.about(built_info::PKG_DESCRIPTION)
.version(crate_version!())
.arg(cli::generate_productconfig_arg())
.subcommand(
SubCommand::with_name("crd")
.setting(AppSettings::ArgRequiredElseHelp)
.subcommand(cli::generate_crd_subcommand::<KafkaCluster>()),
)
.get_matches();

if let ("crd", Some(subcommand)) = matches.subcommand() {
if cli::handle_crd_subcommand::<KafkaCluster>(subcommand)? {
return Ok(());
};
}

let paths = vec![
"deploy/config-spec/properties.yaml",
"/etc/stackable/kafka-operator/config-spec/properties.yaml",
];
let product_config_path = cli::handle_productconfig_arg(&matches, paths)?;

let client = client::create_client(Some(FIELD_MANAGER.to_string())).await?;

if let Err(error) = stackable_operator::crd::wait_until_crds_present(
Expand All @@ -23,6 +62,6 @@ async fn main() -> Result<(), error::Error> {
return Err(error);
};

stackable_kafka_operator::create_controller(client).await?;
stackable_kafka_operator::create_controller(client, &product_config_path).await?;
Ok(())
}