Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ COPY ./src ./src
# cargo build --release

RUN apt-get update && apt-get install --no-install-recommends -y libssl-dev; \
cargo build --release
cargo build --bin=console && cargo build --release

#RUN ls -la /app/target/release/ >&2

Expand All @@ -51,6 +51,7 @@ RUN mkdir ./files && chmod 0777 ./files

# copy binary and configuration files
COPY --from=builder /app/target/release/server .
COPY --from=builder /app/target/release/console .
COPY --from=builder /app/.env .
COPY --from=builder /app/configuration.yaml .
COPY --from=builder /usr/local/cargo/bin/sqlx sqlx
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ services:
# stackerdb:
# condition: service_healthy

# stacker_queue:
# image: trydirect/stacker:0.0.7
# container_name: stacker_queue
# restart: always
# volumes:
# - ./configuration.yaml:/app/configuration.yaml
# - ./.env:/app/.env
# environment:
# - RUST_LOG=debug
# - RUST_BACKTRACE=1
# env_file:
# - ./.env
# depends_on:
# stackerdb:
# condition: service_healthy
# entrypoint: /app/console mq listen

# stackerdb:
# container_name: stackerdb
Expand Down
20 changes: 20 additions & 0 deletions docker/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,26 @@ services:
- backend


stacker_queue:
image: trydirect/stacker:0.0.7
container_name: stacker_queue
restart: always
volumes:
- ./configuration.yaml:/app/configuration.yaml
- ./.env:/app/.env
environment:
- RUST_LOG=debug
- RUST_BACKTRACE=1
env_file:
- ./.env
depends_on:
stackerdb:
condition: service_healthy
entrypoint: /app/console mq listen
networks:
- backend


stackerdb:
container_name: stackerdb
healthcheck:
Expand Down
101 changes: 68 additions & 33 deletions src/console/commands/mq/listener.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
use crate::configuration::get_configuration;
use actix_web::rt;
use actix_web::web;
use lapin::{Channel, Queue};
use chrono::Utc;
use lapin::options::{BasicAckOptions, BasicConsumeOptions};
use lapin::types::FieldTable;
use sqlx::PgPool;
use db::deployment;
use crate::{db, helpers};
use crate::helpers::mq_manager;
use crate::db;
use crate::helpers::mq_manager::MqManager;
use futures_lite::stream::StreamExt;
use serde_derive::{Deserialize, Serialize};

pub struct ListenCommand {
}

#[derive(Serialize, Deserialize, Debug)]
struct ProgressMessage {
id: String,
deploy_id: Option<String>,
alert: i32,
message: String,
status: String,
progress: String
}

impl ListenCommand {
pub fn new() -> Self {
Self {}
Expand All @@ -31,57 +41,82 @@ impl crate::console::commands::CallableTrait for ListenCommand {

let db_pool = web::Data::new(db_pool);

println!("Declare exchange");
let mq_manager = MqManager::try_new(settings.amqp.connection_string())?;
let queue_name = "stacker_listener";
// let queue_name = "install_progress_m383emvfP9zQKs8lkgSU_Q";
// let queue_name = "install_progress_hy181TZa4DaabUZWklsrxw";
let consumer_channel= mq_manager
.consume(
"install_progress",
"install_progress_*******"
queue_name,
"install.progress.*.*.*"
)
.await?;


println!("Declare queue");
let mut consumer = consumer_channel
.basic_consume(
"install_progress",
queue_name,
"console_listener",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("Basic consume");

// .map_err(|err| format!("Error {:?}", err));
println!("Waiting for messages ..");
while let Some(delivery) = consumer.next().await {
// println!("checking messages delivery {:?}", delivery);
let delivery = delivery.expect("error in consumer");
let s:String = match String::from_utf8(delivery.data.to_owned()) {
//delivery.data is of type Vec<u8>
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};

tracing::info!("will consume");
// if let Ok(consumer) = consumer {
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
delivery.ack(BasicAckOptions::default()).await.expect("ack");
}
// }
let statuses = vec![
"completed",
"paused",
"failed",
"in_progress",
"error",
"wait_resume",
"wait_start",
"confirmed"
];
match serde_json::from_str::<ProgressMessage>(&s) {
Ok(msg) => {
println!("message {:?}", s);

// while let Some(delivery) = consumer.next().await {
// tracing::debug!(message=?delivery, "received message");
// if let Ok(delivery) = delivery {
// delivery
// .ack(BasicAckOptions::default())
// .await
// .expect("basic_ack");
// }
// }
if statuses.contains(&(msg.status.as_ref())) && msg.deploy_id.is_some() {
println!("Update DB on status change ..");
let id = msg.deploy_id.unwrap()
.parse::<i32>()
.map_err(|_err| "Could not parse deployment id".to_string())?;

match deployment::fetch(
db_pool.get_ref(), id
)
.await? {
Some(mut row) => {
row.status = msg.status;
row.updated_at = Utc::now();
println!("Deployment {} updated with status {}",
&id, &row.status
);
deployment::update(db_pool.get_ref(), row).await?;
}
None => println!("Deployment record was not found in db")
}
}
}
Err(_err) => { tracing::debug!("Invalid message format {:?}", _err)}
}

// on_complete()
// let deployment = crate::models::deployment::Deployment {
// id: 0,
// project_id: 0,
// deleted: false,
// status: "".to_string(),
// body: Default::default(),
// created_at: Default::default(),
// updated_at: Default::default(),
// };
// deployment::update(db_pool.get_ref(), deployment).await?;
delivery.ack(BasicAckOptions::default()).await.expect("ack");
}

Ok(())
})
Expand Down
3 changes: 1 addition & 2 deletions src/console/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ fn get_command(cli: Cli) -> Result<Box<dyn stacker::console::commands::CallableT
AppMqCommands::Listen {} => Ok(Box::new(
stacker::console::commands::mq::ListenCommand::new(),
)),
},
_ => Err("command does not match".to_string()),
}
}
}
2 changes: 1 addition & 1 deletion src/db/cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result<bool, String> {
});
Ok(true)
}
Err(err) => {
Err(_err) => {
let _ = tx.rollback().await.map_err(|err| println!("{:?}", err));
Ok(false)
}
Expand Down
26 changes: 26 additions & 0 deletions src/db/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@ use crate::models;
use sqlx::PgPool;
use tracing::Instrument;


pub async fn fetch(pool: &PgPool, id: i32) -> Result<Option<models::Deployment>, String> {
tracing::info!("Fetch deployment {}", id);
sqlx::query_as!(
models::Deployment,
r#"
SELECT
*
FROM deployment
WHERE id=$1
LIMIT 1
"#,
id
)
.fetch_one(pool)
.await
.map(|deployment| Some(deployment))
.or_else(|err| match err {
sqlx::Error::RowNotFound => Ok(None),
e => {
tracing::error!("Failed to fetch deployment, error: {:?}", e);
Err("Could not fetch data".to_string())
}
})
}

pub async fn insert(pool: &PgPool, mut deployment: models::Deployment) -> Result<models::Deployment, String> {
let query_span = tracing::info_span!("Saving new deployment into the database");
sqlx::query!(
Expand Down
6 changes: 3 additions & 3 deletions src/db/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result<bool, String> {
})
{
Ok(_) => {
tx.commit().await.map_err(|err| {
let _ = tx.commit().await.map_err(|err| {
tracing::error!("Failed to commit transaction: {:?}", err);
false
});
Ok(true)
}
Err(err) => {
tx.rollback().await.map_err(|err| println!("{:?}", err));
Err(_err) => {
let _ = tx.rollback().await.map_err(|err| println!("{:?}", err));
Ok(false)
}
// todo, when empty commit()
Expand Down
2 changes: 1 addition & 1 deletion src/db/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result<bool, String> {
});
Ok(true)
}
Err(err) => {
Err(_err) => {
let _ = tx.rollback().await.map_err(|err| println!("{:?}", err));
Ok(false)
}
Expand Down
1 change: 0 additions & 1 deletion src/forms/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::models;
use serde::{Deserialize, Serialize};
use serde_valid::Validate;
use chrono::Utc;

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize, Validate)]
pub struct Cloud {
Expand Down
2 changes: 1 addition & 1 deletion src/forms/project/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl App {


service.ports = dctypes::Ports::Long(ports);
service.restart = Some("always".to_owned());
service.restart = Some(self.restart.clone());
service.volumes = volumes;
service.environment = dctypes::Environment::KvPair(envs);

Expand Down
1 change: 0 additions & 1 deletion src/forms/project/compose_networks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use serde::{Deserialize, Serialize};
use docker_compose_types as dctypes;
use indexmap::IndexMap;
use crate::forms::project;
use crate::forms::project::network::Network;

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand Down
18 changes: 5 additions & 13 deletions src/forms/project/form.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_valid::Validate;
use actix_web::Error;
use actix_web::web::Bytes;
use crate::models;
use crate::forms;
use crate::helpers::JsonResponse;
use std::str;


Expand All @@ -24,31 +20,27 @@ impl TryFrom<&models::Project> for ProjectForm {

impl ProjectForm {
pub async fn is_readable_docker_image(&self) -> Result<bool, String> {
let mut is_active = true;
for app in &self.custom.web {
if !app.app.docker_image.is_active().await? {
is_active = false;
break;
return Ok(false);
}
}

if let Some(service) = &self.custom.service {
for app in service {
if !app.app.docker_image.is_active().await? {
is_active = false;
break;
return Ok(false);
}
}
}

if let Some(features) = &self.custom.feature {
for app in features {
if !app.app.docker_image.is_active().await? {
is_active = false;
break;
return Ok(false);
}
}
}
Ok(is_active)
Ok(true)
}
}
}
2 changes: 0 additions & 2 deletions src/forms/project/network.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use serde::{Deserialize, Serialize};
use serde_valid::Validate;
use crate::forms::project;
use docker_compose_types as dctypes;
use indexmap::IndexMap;
use crate::forms::project::NetworkDriver;


Expand Down
4 changes: 2 additions & 2 deletions src/forms/project/payload.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::convert::TryFrom;
use crate::models;
use crate::forms;
use serde_json::Value;
use serde::{Deserialize, Serialize};
use serde_valid::Validate;

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize, Validate)]
#[serde(rename_all = "snake_case")]
pub struct Payload {
pub(crate) id: Option<i32>,
pub(crate) project_id: Option<i32>,
pub(crate) user_token: Option<String>,
pub(crate) user_email: Option<String>,
#[serde(flatten)]
Expand All @@ -31,7 +31,7 @@ impl TryFrom<&models::Project> for Payload {
format!("{:?}", err)
})?;

project_data.id = Some(project.id.clone());
project_data.project_id = Some(project.id);

Ok(project_data)
}
Expand Down
Loading