Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/faabric/endpoint/Endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ class Endpoint

Endpoint(int port, int threadCount);

void start();
void start(bool awaitSignal = true);

void stop();

virtual std::shared_ptr<Pistache::Http::Handler> getHandler() = 0;

private:
int port = faabric::util::getSystemConfig().endpointPort;
int threadCount = faabric::util::getSystemConfig().endpointNumThreads;

Pistache::Http::Endpoint httpEndpoint;
};
}
4 changes: 2 additions & 2 deletions include/faabric/endpoint/FaabricEndpointHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class FaabricEndpointHandler : public Pistache::Http::Handler
void onRequest(const Pistache::Http::Request& request,
Pistache::Http::ResponseWriter response) override;

std::string handleFunction(const std::string& requestStr);
std::pair<int, std::string> handleFunction(const std::string& requestStr);

private:
std::string executeFunction(faabric::Message& msg);
std::pair<int, std::string> executeFunction(faabric::Message& msg);
};
}
2 changes: 0 additions & 2 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ class Scheduler

void flushLocally();

std::string getMessageStatus(unsigned int messageId);

void setFunctionResult(faabric::Message& msg);

faabric::Message getFunctionResult(unsigned int messageId, int timeout);
Expand Down
46 changes: 28 additions & 18 deletions src/endpoint/Endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,57 @@ namespace faabric::endpoint {
Endpoint::Endpoint(int portIn, int threadCountIn)
: port(portIn)
, threadCount(threadCountIn)
, httpEndpoint(Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(port)))
{}

void Endpoint::start()
void Endpoint::start(bool awaitSignal)
{
SPDLOG_INFO("Starting HTTP endpoint");
SPDLOG_INFO("Starting HTTP endpoint on {}", port);

// Set up signal handler
sigset_t signals;
if (sigemptyset(&signals) != 0 || sigaddset(&signals, SIGTERM) != 0 ||
sigaddset(&signals, SIGKILL) != 0 || sigaddset(&signals, SIGINT) != 0 ||
sigaddset(&signals, SIGHUP) != 0 || sigaddset(&signals, SIGQUIT) != 0 ||
pthread_sigmask(SIG_BLOCK, &signals, nullptr) != 0) {
if (awaitSignal) {
if (sigemptyset(&signals) != 0 || sigaddset(&signals, SIGTERM) != 0 ||
sigaddset(&signals, SIGKILL) != 0 ||
sigaddset(&signals, SIGINT) != 0 ||
sigaddset(&signals, SIGHUP) != 0 ||
sigaddset(&signals, SIGQUIT) != 0 ||
pthread_sigmask(SIG_BLOCK, &signals, nullptr) != 0) {

throw std::runtime_error("Install signal handler failed");
throw std::runtime_error("Install signal handler failed");
}
}

Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(this->port));

// Configure endpoint
auto opts = Pistache::Http::Endpoint::options()
.threads(threadCount)
.backlog(256)
.flags(Pistache::Tcp::Options::ReuseAddr);

Pistache::Http::Endpoint httpEndpoint(addr);
httpEndpoint.init(opts);

// Configure and start endpoint
httpEndpoint.setHandler(this->getHandler());
httpEndpoint.serveThreaded();

// Wait for a signal
SPDLOG_INFO("Awaiting signal");
int signal = 0;
int status = sigwait(&signals, &signal);
if (status == 0) {
SPDLOG_INFO("Received signal: {}", signal);
} else {
SPDLOG_INFO("Sigwait return value: {}", signal);
if (awaitSignal) {
// Wait for a signal
SPDLOG_INFO("Awaiting signal");
int signal = 0;
int status = sigwait(&signals, &signal);
if (status == 0) {
SPDLOG_INFO("Received signal: {}", signal);
} else {
SPDLOG_INFO("Sigwait return value: {}", signal);
}

httpEndpoint.shutdown();
}
}

void Endpoint::stop()
{
SPDLOG_INFO("Shutting down endpoint on {}", port);
httpEndpoint.shutdown();
}
}
76 changes: 49 additions & 27 deletions src/endpoint/FaabricEndpointHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,50 +40,70 @@ void FaabricEndpointHandler::onRequest(const Pistache::Http::Request& request,

// Parse message from JSON in request
const std::string requestStr = request.body();
std::string responseStr = handleFunction(requestStr);
std::pair<int, std::string> result = handleFunction(requestStr);

PROF_END(endpointRoundTrip)
response.send(Pistache::Http::Code::Ok, responseStr);
Pistache::Http::Code responseCode = Pistache::Http::Code::Ok;
if (result.first > 0) {
responseCode = Pistache::Http::Code::Internal_Server_Error;
}
response.send(responseCode, result.second);
}

std::string FaabricEndpointHandler::handleFunction(
std::pair<int, std::string> FaabricEndpointHandler::handleFunction(
const std::string& requestStr)
{
std::string responseStr;
std::pair<int, std::string> response;
if (requestStr.empty()) {
responseStr = "Empty request";
SPDLOG_ERROR("Faabric handler received empty request");
response = std::make_pair(1, "Empty request");
} else {
faabric::Message msg = faabric::util::jsonToMessage(requestStr);
faabric::scheduler::Scheduler& sched =
faabric::scheduler::getScheduler();

if (msg.isstatusrequest()) {
responseStr = sched.getMessageStatus(msg.id());
SPDLOG_DEBUG("Processing status request");
const faabric::Message result =
sched.getFunctionResult(msg.id(), 0);

if (result.type() == faabric::Message_MessageType_EMPTY) {
response = std::make_pair(0, "RUNNING");
} else if (result.returnvalue() == 0) {
response = std::make_pair(0, "SUCCESS: " + result.outputdata());
} else {
response = std::make_pair(1, "FAILED: " + result.outputdata());
}
} else if (msg.isexecgraphrequest()) {
SPDLOG_DEBUG("Processing execution graph request");
faabric::scheduler::ExecGraph execGraph =
sched.getFunctionExecGraph(msg.id());
responseStr = faabric::scheduler::execGraphToJson(execGraph);
response =
std::make_pair(0, faabric::scheduler::execGraphToJson(execGraph));

} else if (msg.type() == faabric::Message_MessageType_FLUSH) {
SPDLOG_DEBUG("Broadcasting flush request");
sched.broadcastFlush();
response = std::make_pair(0, "Flush sent");
} else {
responseStr = executeFunction(msg);
response = executeFunction(msg);
}
}

return responseStr;
return response;
}

std::string FaabricEndpointHandler::executeFunction(faabric::Message& msg)
std::pair<int, std::string> FaabricEndpointHandler::executeFunction(
faabric::Message& msg)
{
faabric::util::SystemConfig& conf = faabric::util::getSystemConfig();

if (msg.user().empty()) {
return "Empty user";
} else if (msg.function().empty()) {
return "Empty function";
return std::make_pair(1, "Empty user");
}

if (msg.function().empty()) {
return std::make_pair(1, "Empty function");
}

// Set message ID and master host
Expand All @@ -101,23 +121,25 @@ std::string FaabricEndpointHandler::executeFunction(faabric::Message& msg)

// Await result on global bus (may have been executed on a different worker)
if (msg.isasync()) {
return faabric::util::buildAsyncResponse(msg);
} else {
SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr);
return std::make_pair(0, faabric::util::buildAsyncResponse(msg));
}

try {
const faabric::Message result =
sch.getFunctionResult(msg.id(), conf.globalMessageTimeout);
SPDLOG_DEBUG("Worker thread {} result {}", tid, funcStr);
SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr);

if (result.sgxresult().empty()) {
return result.outputdata() + "\n";
} else {
return faabric::util::getJsonOutput(result);
}
} catch (faabric::redis::RedisNoResponseException& ex) {
return "No response from function\n";
try {
const faabric::Message result =
sch.getFunctionResult(msg.id(), conf.globalMessageTimeout);
SPDLOG_DEBUG("Worker thread {} result {}", tid, funcStr);

if (result.sgxresult().empty()) {
return std::make_pair(result.returnvalue(),
result.outputdata() + "\n");
}

return std::make_pair(result.returnvalue(),
faabric::util::getJsonOutput(result));
} catch (faabric::redis::RedisNoResponseException& ex) {
return std::make_pair(1, "No response from function\n");
}
}
}
13 changes: 0 additions & 13 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -784,19 +784,6 @@ faabric::Message Scheduler::getFunctionResult(unsigned int messageId,
return msgResult;
}

std::string Scheduler::getMessageStatus(unsigned int messageId)
{
const faabric::Message result = getFunctionResult(messageId, 0);

if (result.type() == faabric::Message_MessageType_EMPTY) {
return "RUNNING";
} else if (result.returnvalue() == 0) {
return "SUCCESS: " + result.outputdata();
} else {
return "FAILED: " + result.outputdata();
}
}

faabric::HostResources Scheduler::getThisHostResources()
{
return thisHostResources;
Expand Down
Loading