From 3a325b3454ff2eced7ba65bd838f404b477ffffc Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Boric Date: Wed, 20 Feb 2019 11:30:12 +0100 Subject: [PATCH 1/2] Implement tracking commands Signed-off-by: Jean-Baptiste Boric --- src/fty_nut_command.cc | 28 ++-- src/fty_nut_command_server.cc | 252 ++++++++++++++++++++++------------ 2 files changed, 182 insertions(+), 98 deletions(-) diff --git a/src/fty_nut_command.cc b/src/fty_nut_command.cc index 365c425a..cde8a42b 100644 --- a/src/fty_nut_command.cc +++ b/src/fty_nut_command.cc @@ -28,6 +28,8 @@ #include "fty_nut_command_server.h" +#include "nut_mlm.h" + const char *NUT_USER_ENV = "NUT_USER"; const char *NUT_PASS_ENV = "NUT_PASSWD"; @@ -91,24 +93,29 @@ int main (int argc, char *argv []) DBConn::dbpath(); log_info ("fty_nut_command "); - const char *endpoint = "ipc://@/malamute"; - zactor_t *server = zactor_new (fty_nut_command_server, (void*)endpoint); + zactor_t *server = zactor_new (fty_nut_command_server, MLM_ENDPOINT_VOID); - zstr_sendm (server, "NUT_SERVER"); + zstr_sendm (server, "CONFIGURATION"); zstr_sendm (server, nutHost.c_str()); zstr_sendm (server, nutUsername.c_str()); - zstr_send (server, nutPassword.c_str()); - - zstr_sendm (server, "DB_URL"); + zstr_sendm (server, nutPassword.c_str()); zstr_send (server, DBConn::url.c_str()); + int r = EXIT_SUCCESS; + // code from src/malamute.c, under MPL // Accept and print any message back from server while (true) { - char *message = zstr_recv (server); + ZstrGuard message (zstr_recv (server)); if (message) { - puts (message); - free (message); + if (streq (message, "NUT_CONNECTION_FAILURE")) { + log_fatal ("Agent not connected to NUT, aborting..."); + r = EXIT_FAILURE; + break; + } + else { + puts (message.get()); + } } else { puts ("interrupted"); @@ -117,6 +124,5 @@ int main (int argc, char *argv []) } zactor_destroy (&server); - - return 0; + return r; } diff --git a/src/fty_nut_command_server.cc b/src/fty_nut_command_server.cc index 97ad5df8..4bcde393 100644 --- a/src/fty_nut_command_server.cc +++ b/src/fty_nut_command_server.cc @@ -32,19 +32,32 @@ const char *COMMAND_SUBJECT = "power-actions"; const char *ACTOR_COMMAND_NAME = "fty-nut-command"; const int TIMEOUT = 5; -struct mapped_device +struct MappedDevice { - mapped_device(const std::string &n, int d) : name(n), daisy_chain(d) {} + MappedDevice(const std::string &n, int d) : name(n), daisy_chain(d) {} std::string name; int daisy_chain; }; +struct PendingCommand +{ + PendingCommand(const std::list& ids, const std::string& addr, const std::string& msgId) : trackingIDs(ids), address(addr), uuid(msgId), numCommands(ids.size()) {} + + std::list trackingIDs; + std::string address; + std::string uuid; + std::string errMsg; + int numCommands; +}; + +typedef std::list PendingCommands; + // // Helpers // -static mapped_device +static MappedDevice asset_to_mapped_device(tntdb::Connection &conn, const std::string &asset) { auto daisy_chain = DBAssets::select_daisy_chain(conn, asset); @@ -52,7 +65,7 @@ asset_to_mapped_device(tntdb::Connection &conn, const std::string &asset) throw std::runtime_error(daisy_chain.msg); } if (daisy_chain.item.size() == 0) { - return mapped_device(asset, 0); + return MappedDevice(asset, 0); } else { int daisy_number = 0; @@ -62,23 +75,12 @@ asset_to_mapped_device(tntdb::Connection &conn, const std::string &asset) break; } } - return mapped_device(daisy_chain.item.begin()->second, daisy_number); - } -} - -static void -send_reply(mlm_client_t *client, const std::string &address, const std::string &command, zmsg_t **msg) -{ - if (mlm_client_sendto(client, address.c_str(), COMMAND_SUBJECT, nullptr, TIMEOUT, msg) == 0) { - log_info("Processed request '%s' by '%s'", command.c_str(), address.c_str()); - } - else { - log_error("Failed to send reply of request '%s' to '%s'", command.c_str(), address.c_str()); + return MappedDevice(daisy_chain.item.begin()->second, daisy_number); } } static std::set -get_commands_nut(nut::Client &nut, const mapped_device &asset) +get_commands_nut(nut::Client &nut, const MappedDevice &asset) { const auto cmds = nut.getDeviceCommandNames(asset.name); @@ -101,34 +103,48 @@ get_commands_nut(nut::Client &nut, const mapped_device &asset) } static void -send_error(mlm_client_t *client, const std::string &address, const std::string &reason, const char *uuid) +send_reply(mlm_client_t *client, const std::string &address, const std::string &command, zmsg_t **msg) { - zmsg_t *error = zmsg_new(); - zmsg_addstr(error, "ERROR"); - zmsg_addstr(error, uuid); - zmsg_addstr(error, reason.c_str()); + std::string status = zmsg_popstr(*msg); + zmsg_pushstr(*msg, status.c_str()); - if (mlm_client_sendto(client, address.c_str(), COMMAND_SUBJECT, nullptr, TIMEOUT, &error) == 0) { - log_error("Sent error '%s' to '%s'", reason.c_str(), address.c_str()); + if (mlm_client_sendto(client, address.c_str(), COMMAND_SUBJECT, nullptr, TIMEOUT, msg) == 0) { + if (status == "OK") { + log_info("Processed request '%s' from '%s', status='%s'.", command.c_str(), address.c_str(), status.c_str()); + } + else { + log_error("Processed request '%s' from '%s', status='%s'.", command.c_str(), address.c_str(), status.c_str()); + } } else { - log_error("Failed to send error '%s' to '%s'", reason.c_str(), address.c_str()); + log_error("Failed to send reply of request '%s' to '%s', status='%s'.", command.c_str(), address.c_str(), status.c_str()); } } +static void +send_error(mlm_client_t *client, const std::string &address, const std::string &command, const std::string &reason, const std::string &uuid) +{ + zmsg_t *error = zmsg_new(); + zmsg_addstr(error, "ERROR"); + zmsg_addstr(error, uuid.c_str()); + zmsg_addstr(error, reason.c_str()); + + send_reply(client, address, command, &error); +} + // // Commands // static void -get_commands(nut::Client &nut, tntdb::Connection &conn, mlm_client_t *client, const std::string &address, zmsg_t *msg, const char *uuid) +get_commands(nut::Client &nut, tntdb::Connection &conn, mlm_client_t *client, const std::string &address, const std::string &uuid, zmsg_t *msg) { std::vector>> replyData; - std::map mappedDevices; + std::map mappedDevices; while (zmsg_size(msg)) { ZstrGuard asset(zmsg_popstr(msg)); - mapped_device device = asset_to_mapped_device(conn, std::string(asset)); + MappedDevice device = asset_to_mapped_device(conn, std::string(asset)); replyData.emplace_back(asset.get(), get_commands_nut(nut, device)); mappedDevices.emplace(asset.get(), device); } @@ -136,13 +152,18 @@ get_commands(nut::Client &nut, tntdb::Connection &conn, mlm_client_t *client, co // Build reply message zmsg_t *reply = zmsg_new(); zmsg_addstr(reply, "OK"); - zmsg_addstr(reply, uuid); + zmsg_addstr(reply, uuid.c_str()); + for (const auto &asset : replyData) { const auto& mappedDevice = mappedDevices.at(asset.first); const std::string pattern = mappedDevice.daisy_chain ? std::string("device.") + std::to_string(mappedDevice.daisy_chain) + "." : ""; + + // Push asset name. zmsg_addstr(reply, "ASSET"); zmsg_addstr(reply, asset.first.c_str()); + for (const auto &commands : asset.second) { + // Push commands of asset. zmsg_addstr(reply, commands.c_str()); zmsg_addstr(reply, nut.getDeviceCommandDescription(mappedDevice.name, pattern+commands).c_str()); } @@ -152,45 +173,80 @@ get_commands(nut::Client &nut, tntdb::Connection &conn, mlm_client_t *client, co } static void -do_commands(nut::Client &nut, tntdb::Connection &conn, mlm_client_t *client, const std::string &address, zmsg_t *msg, const char *uuid) +do_commands(nut::Client &nut, tntdb::Connection &conn, mlm_client_t *client, const std::string &address, const std::string &uuid, zmsg_t *msg, PendingCommands &pendingCommands) { ZstrGuard asset(zmsg_popstr(msg)); - if (!asset) { - throw std::runtime_error("INVALID_REQUEST"); - } - auto device = asset_to_mapped_device(conn, std::string(asset)); - auto valid_commands = get_commands_nut(nut, device); if (zmsg_size(msg) % 2) { throw std::runtime_error("INVALID_REQUEST"); } - std::vector> commands; + std::list trackingIDs; + while (zmsg_size(msg)) { ZstrGuard cmd(zmsg_popstr(msg)); ZstrGuard data(zmsg_popstr(msg)); - // Check if command is known in advance (try to prevent failures halfway) - if (!valid_commands.count(cmd.get())) { - throw std::runtime_error("CMD-NOT-SUPPORTED"); - } - // XXX: we don't handle arguments just yet - if (!streq(data, "")) { - throw std::runtime_error("CMD-ARGS-NOT-YET-SUPPORTED"); - } + auto device = asset_to_mapped_device(conn, std::string(asset)); + const auto prefix = device.daisy_chain ? std::string("device.") + std::to_string(device.daisy_chain) + "." : ""; + trackingIDs.emplace_back(nut.executeDeviceCommand(device.name, prefix + cmd.get(), data.get())); + } - commands.emplace_back(cmd.get(), data.get()); + // Store pending command data for further processing. + pendingCommands.emplace_back(trackingIDs, address, uuid); + log_debug("Sent %d NUT commands, correlation ID='%s'.", trackingIDs.size(), uuid.c_str()); +} + +static void +treat_pending_commands(nut::Client &nutClient, mlm_client_t *client, PendingCommands &pendingCommands) +{ + // Check if any pending commands are completed. + for (auto &command : pendingCommands) { + auto treated = std::remove_if(command.trackingIDs.begin(), command.trackingIDs.end(), + [&command, &nutClient](nut::TrackingID &id) -> bool { + try { + switch (nutClient.getTrackingResult(id)) { + case nut::FAILURE: + command.errMsg = "Failure to execute command"; + return true; + case nut::SUCCESS: + return true; + default: + return false; + } + } + catch (nut::NutException &e) { + command.errMsg = e.what(); + return true; + } + }); + + command.trackingIDs.erase(treated, command.trackingIDs.end()); + log_debug("Reclaimed %d/%d results from NUT command, correlation ID ='%s'.", command.numCommands - command.trackingIDs.size(), command.numCommands, command.uuid.c_str()); } - const std::string prefix = device.daisy_chain ? std::string("device.") + std::to_string(device.daisy_chain) + "." : ""; - for (const auto &i : commands) { - nut.executeDeviceCommand(device.name, prefix + i.first); + // Check if any pending requests are completed. + auto treated = std::remove_if(pendingCommands.begin(), pendingCommands.end(), + [](PendingCommand &c) -> bool { + return c.trackingIDs.empty(); + } + ); + + // Send results of completed requests. + for (auto it = treated; it != pendingCommands.end(); it++) { + if (it->errMsg.empty()) { + zmsg_t *reply = zmsg_new(); + zmsg_addstr(reply, "OK"); + zmsg_addstr(reply, it->uuid.c_str()); + send_reply(client, it->address, "DO_COMMANDS", &reply); + } + else { + send_error(client, it->address, "DO_COMMANDS", it->errMsg, it->uuid.c_str()); + } } - zmsg_t *reply = zmsg_new(); - zmsg_addstr(reply, "OK"); - zmsg_addstr(reply, uuid); - send_reply(client, address, "DO_COMMANDS", &reply); + // Purge completed commands from memory. + pendingCommands.erase(treated, pendingCommands.end()); } // @@ -200,6 +256,8 @@ do_commands(nut::Client &nut, tntdb::Connection &conn, mlm_client_t *client, con void fty_nut_command_server(zsock_t *pipe, void *args) { + PendingCommands pendingCommands; + std::string dbURL; std::string nutHost = "localhost"; std::string nutUsername; @@ -217,11 +275,16 @@ fty_nut_command_server(zsock_t *pipe, void *args) return; } + int64_t lastTrackingCheck = zclock_mono(); + int64_t lastPingPong = zclock_mono(); + nut::TcpClient nutClient; + // Enter mainloop ZpollerGuard poller(zpoller_new(pipe, mlm_client_msgpipe(client), nullptr)); zsock_signal(pipe, 0); + while (!zsys_interrupted) { - void *which = zpoller_wait(poller, -1); + void *which = zpoller_wait(poller, pendingCommands.empty() ? 15000 : 250); if (zsys_interrupted) { break; } @@ -229,25 +292,33 @@ fty_nut_command_server(zsock_t *pipe, void *args) ZmsgGuard msg(zmsg_recv(pipe)); ZstrGuard actor_command(zmsg_popstr(msg)); - // $TERM actor command implementation is required by zactor_t interface + // $TERM actor command implementation is required by zactor_t interface if (streq(actor_command, "$TERM")) { return; } - else if (streq(actor_command, "NUT_SERVER")) { - ZstrGuard host(zmsg_popstr(msg)); - ZstrGuard username(zmsg_popstr(msg)); - ZstrGuard password(zmsg_popstr(msg)); - nutHost = host.get(); - nutUsername = username.get(); - nutPassword = password.get(); - log_info("NUT server '%s' configured", host.get()); - continue; - } - else if (streq(actor_command, "DB_URL")) { - ZstrGuard databaseURL(zmsg_popstr(msg)); - dbURL = databaseURL.get(); - log_info("Database URL configured"); - continue; + else if (streq(actor_command, "CONFIGURATION")) { + // Configure agent + try { + ZstrGuard host(zmsg_popstr(msg)); + ZstrGuard username(zmsg_popstr(msg)); + ZstrGuard password(zmsg_popstr(msg)); + nutHost = host.get(); + nutUsername = username.get(); + nutPassword = password.get(); + + nutClient.connect(nutHost); + log_info("Connected to NUT server '%s'", host.get()); + nutClient.authenticate(nutUsername, nutPassword); + log_info("Authenticated to NUT server '%s' as '%s'", host.get(), username.get()); + nutClient.setFeature(nut::Client::TRACKING, true); + + ZstrGuard databaseURL(zmsg_popstr(msg)); + dbURL = databaseURL.get(); + log_info("Database URL configured"); + } + catch (nut::NutException &e) { + zstr_send(pipe, "NUT_CONNECTION_FAILURE"); + } } else { log_error("Unrecognized pipe request '%s'", actor_command.get()); @@ -263,7 +334,7 @@ fty_nut_command_server(zsock_t *pipe, void *args) continue; } if (zmsg_size(msg) < 2) { - log_error("Message doesn't have UUID and command fields, ignoring message", mlm_client_subject(client)); + log_error("Message doesn't have correlation id and command fields, ignoring message", mlm_client_subject(client)); continue; } @@ -272,25 +343,20 @@ fty_nut_command_server(zsock_t *pipe, void *args) ZstrGuard uuid(zmsg_popstr(msg)); try { - // Connect to NUT server - nut::TcpClient nutClient; - nutClient.connect(nutHost); - log_info("Connected to NUT server"); - nutClient.authenticate(nutUsername, nutPassword); - log_info("Authenticated to NUT server"); - // Connect to database tntdb::Connection conn = tntdb::connectCached(dbURL); - log_info("Received request '%s' from '%s', UUID='%s'", action.get(), sender, uuid.get()); + log_info("Received request '%s' from '%s', correlation id='%s'", action.get(), sender, uuid.get()); - if (streq(action.get(), "GET_COMMANDS")) { - get_commands(nutClient, conn, client, sender, msg, uuid.get()); + if (streq(action, "GET_COMMANDS")) { + get_commands(nutClient, conn, client, sender, uuid.get(), msg); + lastPingPong = zclock_mono(); } - else if (streq(action.get(), "DO_COMMANDS")) { - do_commands(nutClient, conn, client, sender, msg, uuid.get()); + else if (streq(action, "DO_COMMANDS")) { + do_commands(nutClient, conn, client, sender, uuid.get(), msg, pendingCommands); + lastPingPong = zclock_mono(); } - else if (streq(action.get(), "ERROR")) { + else if (streq(action, "ERROR")) { ZstrGuard desc(zmsg_popstr(msg)); log_error("Received error message with payload '%s' '%s', ignoring", uuid ? uuid.get() : "(null)", desc ? desc.get() : "(null)"); } @@ -299,12 +365,24 @@ fty_nut_command_server(zsock_t *pipe, void *args) throw std::runtime_error("INVALID_REQUEST"); } } catch (std::exception &e) { - send_error(client, sender, e.what(), uuid.get()); + send_error(client, sender, action.get(), e.what(), uuid.get()); } } - else { - // How did we get here? - log_error("Abnormal zpoller_wait() return value '%p'", which); + + if ((zclock_mono() > lastTrackingCheck + 1000) && !pendingCommands.empty()) { + treat_pending_commands(nutClient, client, pendingCommands); + + lastTrackingCheck = zclock_mono(); + lastPingPong = zclock_mono(); + } + if (zclock_mono() > lastPingPong + 30000) { + // Keep alive the NUT connection. + nutClient.getDeviceNames(); + lastPingPong = zclock_mono(); + } + + if (!nutClient.isConnected()) { + zstr_send(mlm_client_msgpipe(client), "NUT_CONNECTION_FAILURE"); } } } From ff9581c51578210b4bdb538987cde231617edf98 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Boric Date: Mon, 8 Apr 2019 09:42:44 +0200 Subject: [PATCH 2/2] Nudge CI Signed-off-by: Jean-Baptiste Boric