diff --git a/ext/include/opentelemetry/ext/http/server/file_http_server.h b/ext/include/opentelemetry/ext/http/server/file_http_server.h new file mode 100644 index 0000000000..b93e545968 --- /dev/null +++ b/ext/include/opentelemetry/ext/http/server/file_http_server.h @@ -0,0 +1,145 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "opentelemetry/ext/http/server/http_server.h" + +namespace HTTP_SERVER_NS +{ + +class FileHttpServer : public HTTP_SERVER_NS::HttpServer +{ +protected: + /** + * Construct the server by initializing the endpoint for serving static files, + * which show up on the web if the user is on the given host:port. Static + * files can be seen relative to the folder where the executable was ran. + */ + FileHttpServer(const std::string &host = "127.0.0.1", int port = 3333) : HttpServer() + { + std::ostringstream os; + os << host << ":" << port; + setServerName(os.str()); + addListeningPort(port); + }; + + /** + * Set the HTTP server to serve static files from the root of host:port. + * Derived HTTP servers should initialize the file endpoint AFTER they + * initialize their own, otherwise everything will be served like a file + * @param server should be an instance of this object + */ + void InitializeFileEndpoint(FileHttpServer &server) { server[root_endpt_] = ServeFile; } + +private: + /** + * Return whether a file is found whose location is searched for relative to + * where the executable was triggered. If the file is valid, fill result with + * the file data/information required to display it on a webpage + * @param name of the file to look for, + * @param resulting file information, necessary for displaying them on a + * webpage + * @returns whether a file was found and result filled with display + * information + */ + bool FileGetSuccess(const std::string &filename, std::vector &result) + { +#ifdef _WIN32 + std::replace(filename.begin(), filename.end(), '/', '\\'); +#endif + std::streampos size; + std::ifstream file(filename, std::ios::in | std::ios::binary | std::ios::ate); + if (file.is_open()) + { + size = file.tellg(); + if (size) + { + result.resize(size); + file.seekg(0, std::ios::beg); + file.read(result.data(), size); + } + file.close(); + return true; + } + return false; + }; + + /** + * Returns the extension of a file + * @param name of the file + * @returns file extension type under HTTP protocol + */ + std::string GetMimeContentType(const std::string &filename) + { + std::string file_ext = filename.substr(filename.find_last_of(".") + 1); + auto file_type = mime_types_.find(file_ext); + return (file_type != mime_types_.end()) ? file_type->second : HTTP_SERVER_NS::CONTENT_TYPE_TEXT; + }; + + /** + * Returns the standardized name of a file by removing backslashes, and + * assuming index.html is the wanted file if a directory is given + * @param name of the file + */ + std::string GetFileName(std::string name) + { + if (name.back() == '/') + { + auto temp = name.substr(0, name.size() - 1); + name = temp; + } + // If filename appears to be a directory, serve the hypothetical index.html + // file there + if (name.find(".") == std::string::npos) + name += "/index.html"; + + return name; + } + + /** + * Sets the response object with the correct file data based on the requested + * file address, or return 404 error if a file isn't found + * @param req is the HTTP request, which we use to figure out the response to + * send + * @param resp is the HTTP response we want to send to the frontend, including + * file data + */ + HTTP_SERVER_NS::HttpRequestCallback ServeFile{ + [&](HTTP_SERVER_NS::HttpRequest const &req, HTTP_SERVER_NS::HttpResponse &resp) { + LOG_INFO("File: %s\n", req.uri.c_str()); + auto f = GetFileName(req.uri); + auto filename = f.c_str() + 1; + + std::vector content; + if (FileGetSuccess(filename, content)) + { + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = GetMimeContentType(filename); + resp.body = std::string(content.data(), content.size()); + resp.code = 200; + resp.message = HTTP_SERVER_NS::HttpServer::getDefaultResponseMessage(resp.code); + return resp.code; + } + // Two additional 'special' return codes possible here: + // 0 - proceed to next handler + // -1 - immediately terminate and close connection + resp.headers[HTTP_SERVER_NS::CONTENT_TYPE] = HTTP_SERVER_NS::CONTENT_TYPE_TEXT; + resp.code = 404; + resp.message = HTTP_SERVER_NS::HttpServer::getDefaultResponseMessage(resp.code); + resp.body = resp.message; + return 404; + }}; + + // Maps file extensions to their HTTP-compatible mime file type + const std::unordered_map mime_types_ = { + {"css", "text/css"}, {"png", "image/png"}, {"js", "text/javascript"}, + {"htm", "text/html"}, {"html", "text/html"}, {"json", "application/json"}, + {"txt", "text/plain"}, {"jpg", "image/jpeg"}, {"jpeg", "image/jpeg"}, + }; + const std::string root_endpt_ = "/"; +}; + +} // namespace HTTP_SERVER_NS diff --git a/ext/include/opentelemetry/ext/http/server/http_server.h b/ext/include/opentelemetry/ext/http/server/http_server.h new file mode 100644 index 0000000000..672bf95c6a --- /dev/null +++ b/ext/include/opentelemetry/ext/http/server/http_server.h @@ -0,0 +1,890 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include "socket_tools.h" + +#ifdef HAVE_HTTP_DEBUG +# ifdef LOG_TRACE +# undef LOG_TRACE +# define LOG_TRACE(x, ...) printf(x "\n", __VA_ARGS__) +# endif +#endif + +#ifndef HTTP_SERVER_NS +# define HTTP_SERVER_NS testing +#endif + +namespace HTTP_SERVER_NS +{ + +constexpr const char *CONTENT_TYPE = "Content-Type"; +constexpr const char *CONTENT_TYPE_TEXT = "text/plain"; +constexpr const char *CONTENT_TYPE_BIN = "application/octet-stream"; + +struct HttpRequest +{ + std::string client; + std::string method; + std::string uri; + std::string protocol; + std::map headers; + std::string content; +}; + +struct HttpResponse +{ + int code; + std::string message; + std::map headers; + std::string body; +}; + +using CallbackFunction = std::function; + +class HttpRequestCallback +{ +protected: + CallbackFunction callback = nullptr; + +public: + HttpRequestCallback(){}; + + HttpRequestCallback &operator=(HttpRequestCallback other) + { + callback = other.callback; + return *this; + }; + + HttpRequestCallback(CallbackFunction func) : callback(func){}; + + HttpRequestCallback &operator=(CallbackFunction func) + { + callback = func; + return (*this); + } + + virtual int onHttpRequest(HttpRequest const &request, HttpResponse &response) + { + if (callback != nullptr) + { + return callback(request, response); + } + return 0; + }; +}; + +// Simple HTTP server +// Goals: +// - Support enough of HTTP to be used as a mock +// - Be flexible to allow creating various test scenarios +// Out of scope: +// - Performance +// - Full support of RFC 7230-7237 +class HttpServer : private SocketTools::Reactor::SocketCallback +{ +protected: + struct Connection + { + SocketTools::Socket socket; + std::string receiveBuffer; + std::string sendBuffer; + enum + { + Idle, + ReceivingHeaders, + Sending100Continue, + ReceivingBody, + Processing, + SendingHeaders, + SendingBody, + Closing + } state; + size_t contentLength; + bool keepalive; + HttpRequest request; + HttpResponse response; + }; + + std::string m_serverHost; + bool allowKeepalive{true}; + SocketTools::Reactor m_reactor; + std::list m_listeningSockets; + + class HttpRequestHandler : public std::pair + { + public: + HttpRequestHandler(std::string key, HttpRequestCallback *value) + { + first = key; + second = value; + }; + + HttpRequestHandler() : std::pair() + { + first = ""; + second = nullptr; + }; + + HttpRequestHandler &operator=(std::pair other) + { + first = other.first; + second = other.second; + return (*this); + }; + + HttpRequestHandler &operator=(HttpRequestCallback &cb) + { + second = &cb; + return (*this); + }; + + HttpRequestHandler &operator=(HttpRequestCallback *cb) + { + second = cb; + return (*this); + }; + }; + + std::list m_handlers; + + std::map m_connections; + size_t m_maxRequestHeadersSize, m_maxRequestContentSize; + +public: + void setKeepalive(bool keepAlive) { allowKeepalive = keepAlive; } + + HttpServer() + : m_serverHost("unnamed"), + allowKeepalive(true), + m_reactor(*this), + m_maxRequestHeadersSize(8192), + m_maxRequestContentSize(2 * 1024 * 1024){}; + + HttpServer(std::string serverHost, int port = 30000) : HttpServer() + { + std::ostringstream os; + os << serverHost << ":" << port; + setServerName(os.str()); + addListeningPort(port); + }; + + ~HttpServer() + { + for (auto &sock : m_listeningSockets) + { + sock.close(); + } + } + + void setRequestLimits(size_t maxRequestHeadersSize, size_t maxRequestContentSize) + { + m_maxRequestHeadersSize = maxRequestHeadersSize; + m_maxRequestContentSize = maxRequestContentSize; + } + + void setServerName(std::string const &name) { m_serverHost = name; } + + int addListeningPort(int port) + { + SocketTools::Socket socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + socket.setNonBlocking(); + socket.setReuseAddr(); + + SocketTools::SocketAddr addr(0, port); + socket.bind(addr); + socket.getsockname(addr); + + socket.listen(10); + m_listeningSockets.push_back(socket); + m_reactor.addSocket(socket, SocketTools::Reactor::Acceptable); + LOG_INFO("HttpServer: Listening on %s", addr.toString().c_str()); + + return addr.port(); + } + + HttpRequestHandler &addHandler(const std::string &root, HttpRequestCallback &handler) + { + // No thread-safety here! + m_handlers.push_back({root, &handler}); + LOG_INFO("HttpServer: Added handler for %s", root.c_str()); + return m_handlers.back(); + } + + HttpRequestHandler &operator[](const std::string &root) + { + // No thread-safety here! + m_handlers.push_back({root, nullptr}); + LOG_INFO("HttpServer: Added handler for %s", root.c_str()); + return m_handlers.back(); + } + + HttpServer &operator+=(std::pair other) + { + LOG_INFO("HttpServer: Added handler for %s", other.first.c_str()); + m_handlers.push_back(HttpRequestHandler(other.first, &other.second)); + return (*this); + }; + + void start() { m_reactor.start(); } + + void stop() { m_reactor.stop(); } + +protected: + virtual void onSocketAcceptable(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: accepting socket fd=0x%llx", socket.m_sock); + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) != + m_listeningSockets.end()); + + SocketTools::Socket csocket; + SocketTools::SocketAddr caddr; + if (socket.accept(csocket, caddr)) + { + csocket.setNonBlocking(); + Connection &conn = m_connections[csocket]; + conn.socket = csocket; + conn.state = Connection::Idle; + conn.request.client = caddr.toString(); + m_reactor.addSocket(csocket, SocketTools::Reactor::Readable | SocketTools::Reactor::Closed); + LOG_TRACE("HttpServer: [%s] accepted", conn.request.client.c_str()); + } + } + + virtual void onSocketReadable(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: reading socket fd=0x%llx", socket.m_sock); + // No thread-safety here! + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) == + m_listeningSockets.end()); + + // No thread-safety here! + auto connIt = m_connections.find(socket); + if (connIt == m_connections.end()) + { + return; + } + Connection &conn = connIt->second; + + char buffer[2048] = {0}; + int received = socket.recv(buffer, sizeof(buffer)); + LOG_TRACE("HttpServer: [%s] received %d", conn.request.client.c_str(), received); + if (received <= 0) + { + handleConnectionClosed(conn); + return; + } + conn.receiveBuffer.append(buffer, buffer + received); + + handleConnection(conn); + } + + virtual void onSocketWritable(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: writing socket fd=0x%llx", socket.m_sock); + + // No thread-safety here! + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) == + m_listeningSockets.end()); + + // No thread-safety here! + auto connIt = m_connections.find(socket); + if (connIt == m_connections.end()) + { + return; + } + Connection &conn = connIt->second; + + if (!sendMore(conn)) + { + handleConnection(conn); + } + } + + virtual void onSocketClosed(SocketTools::Socket socket) override + { + LOG_TRACE("HttpServer: closing socket fd=0x%llx", socket.m_sock); + assert(std::find(m_listeningSockets.begin(), m_listeningSockets.end(), socket) == + m_listeningSockets.end()); + + auto connIt = m_connections.find(socket); + if (connIt == m_connections.end()) + { + return; + } + Connection &conn = connIt->second; + + handleConnectionClosed(conn); + } + + bool sendMore(Connection &conn) + { + if (conn.sendBuffer.empty()) + { + return false; + } + + int sent = conn.socket.send(conn.sendBuffer.data(), static_cast(conn.sendBuffer.size())); + LOG_TRACE("HttpServer: [%s] sent %d", conn.request.client.c_str(), sent); + if (sent < 0 && conn.socket.error() != SocketTools::Socket::ErrorWouldBlock) + { + return true; + } + conn.sendBuffer.erase(0, sent); + + if (!conn.sendBuffer.empty()) + { + m_reactor.addSocket(conn.socket, + SocketTools::Reactor::Writable | SocketTools::Reactor::Closed); + return true; + } + + return false; + } + +protected: + void handleConnectionClosed(Connection &conn) + { + LOG_TRACE("HttpServer: [%s] closed", conn.request.client.c_str()); + if (conn.state != Connection::Idle && conn.state != Connection::Closing) + { + LOG_WARN("HttpServer: [%s] connection closed unexpectedly", conn.request.client.c_str()); + } + m_reactor.removeSocket(conn.socket); + auto connIt = m_connections.find(conn.socket); + conn.socket.close(); + m_connections.erase(connIt); + } + + void handleConnection(Connection &conn) + { + for (;;) + { + if (conn.state == Connection::Idle) + { + conn.response.code = 0; + conn.state = Connection::ReceivingHeaders; + LOG_TRACE("HttpServer: [%s] receiving headers", conn.request.client.c_str()); + } + + if (conn.state == Connection::ReceivingHeaders) + { + bool lfOnly = false; + size_t ofs = conn.receiveBuffer.find("\r\n\r\n"); + if (ofs == std::string::npos) + { + lfOnly = true; + ofs = conn.receiveBuffer.find("\n\n"); + } + size_t headersLen = (ofs != std::string::npos) ? ofs : conn.receiveBuffer.length(); + if (headersLen > m_maxRequestHeadersSize) + { + LOG_WARN("HttpServer: [%s] headers too long - %u", conn.request.client.c_str(), + static_cast(headersLen)); + conn.response.code = 431; // Request Header Fields Too Large + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + if (ofs == std::string::npos) + { + return; + } + + if (!parseHeaders(conn)) + { + LOG_WARN("HttpServer: [%s] invalid headers", conn.request.client.c_str()); + conn.response.code = 400; // Bad Request + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + LOG_INFO("HttpServer: [%s] %s %s %s", conn.request.client.c_str(), + conn.request.method.c_str(), conn.request.uri.c_str(), + conn.request.protocol.c_str()); + conn.receiveBuffer.erase(0, ofs + (lfOnly ? 2 : 4)); + + conn.keepalive = (conn.request.protocol == "HTTP/1.1"); + auto const connection = conn.request.headers.find("Connection"); + if (connection != conn.request.headers.end()) + { + if (equalsLowercased(connection->second, "keep-alive")) + { + conn.keepalive = true; + } + else if (equalsLowercased(connection->second, "close")) + { + conn.keepalive = false; + } + } + + auto const contentLength = conn.request.headers.find("Content-Length"); + if (contentLength != conn.request.headers.end()) + { + conn.contentLength = atoi(contentLength->second.c_str()); + } + else + { + conn.contentLength = 0; + } + if (conn.contentLength > m_maxRequestContentSize) + { + LOG_WARN("HttpServer: [%s] content too long - %u", conn.request.client.c_str(), + static_cast(conn.contentLength)); + conn.response.code = 413; // Payload Too Large + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + + auto const expect = conn.request.headers.find("Expect"); + if (expect != conn.request.headers.end() && conn.request.protocol == "HTTP/1.1") + { + if (!equalsLowercased(expect->second, "100-continue")) + { + LOG_WARN("HttpServer: [%s] unknown expectation - %s", conn.request.client.c_str(), + expect->second.c_str()); + conn.response.code = 417; // Expectation Failed + conn.keepalive = false; + conn.state = Connection::Processing; + continue; + } + conn.sendBuffer = "HTTP/1.1 100 Continue\r\n\r\n"; + conn.state = Connection::Sending100Continue; + LOG_TRACE("HttpServer: [%s] sending \"100 Continue\"", conn.request.client.c_str()); + continue; + } + + conn.state = Connection::ReceivingBody; + LOG_TRACE("HttpServer: [%s] receiving body", conn.request.client.c_str()); + } + + if (conn.state == Connection::Sending100Continue) + { + if (sendMore(conn)) + { + return; + } + + conn.state = Connection::ReceivingBody; + LOG_TRACE("HttpServer: [%s] receiving body", conn.request.client.c_str()); + } + + if (conn.state == Connection::ReceivingBody) + { + if (conn.receiveBuffer.length() < conn.contentLength) + { + return; + } + + if (conn.receiveBuffer.length() == conn.contentLength) + { + conn.request.content = std::move(conn.receiveBuffer); + conn.receiveBuffer.clear(); + } + else + { + conn.request.content.assign(conn.receiveBuffer, 0, conn.contentLength); + conn.receiveBuffer.erase(0, conn.contentLength); + } + + conn.state = Connection::Processing; + LOG_TRACE("HttpServer: [%s] processing request", conn.request.client.c_str()); + } + + if (conn.state == Connection::Processing) + { + processRequest(conn); + + std::ostringstream os; + os << conn.request.protocol << ' ' << conn.response.code << ' ' << conn.response.message + << "\r\n"; + for (auto const &header : conn.response.headers) + { + os << header.first << ": " << header.second << "\r\n"; + } + os << "\r\n"; + + conn.sendBuffer = os.str(); + conn.state = Connection::SendingHeaders; + LOG_TRACE("HttpServer: [%s] sending headers", conn.request.client.c_str()); + } + + if (conn.state == Connection::SendingHeaders) + { + if (sendMore(conn)) + { + return; + } + + conn.sendBuffer = std::move(conn.response.body); + conn.state = Connection::SendingBody; + LOG_TRACE("HttpServer: [%s] sending body", conn.request.client.c_str()); + } + + if (conn.state == Connection::SendingBody) + { + if (sendMore(conn)) + { + return; + } + + conn.keepalive &= allowKeepalive; + + if (conn.keepalive) + { + m_reactor.addSocket(conn.socket, + SocketTools::Reactor::Readable | SocketTools::Reactor::Closed); + conn.state = Connection::Idle; + LOG_TRACE("HttpServer: [%s] idle (keep-alive)", conn.request.client.c_str()); + if (conn.receiveBuffer.empty()) + { + return; + } + } + else + { + conn.socket.shutdown(SocketTools::Socket::ShutdownSend); + m_reactor.addSocket(conn.socket, SocketTools::Reactor::Closed); + conn.state = Connection::Closing; + LOG_TRACE("HttpServer: [%s] closing", conn.request.client.c_str()); + } + } + + if (conn.state == Connection::Closing) + { + return; + } + } + } + + bool parseHeaders(Connection &conn) + { + // Method + char const *begin = conn.receiveBuffer.c_str(); + char const *ptr = begin; + while (*ptr && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != ' ') + { + return false; + } + conn.request.method.assign(begin, ptr); + while (*ptr == ' ') + { + ptr++; + } + + // URI + begin = ptr; + while (*ptr && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != ' ') + { + return false; + } + conn.request.uri.assign(begin, ptr); + while (*ptr == ' ') + { + ptr++; + } + + // Protocol + begin = ptr; + while (*ptr && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != '\r' && *ptr != '\n') + { + return false; + } + conn.request.protocol.assign(begin, ptr); + if (*ptr == '\r') + { + ptr++; + } + if (*ptr != '\n') + { + return false; + } + ptr++; + + // Headers + conn.request.headers.clear(); + while (*ptr != '\r' && *ptr != '\n') + { + // Name + begin = ptr; + while (*ptr && *ptr != ':' && *ptr != ' ' && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + if (*ptr != ':') + { + return false; + } + std::string name = normalizeHeaderName(begin, ptr); + ptr++; + while (*ptr == ' ') + { + ptr++; + } + + // Value + begin = ptr; + while (*ptr && *ptr != '\r' && *ptr != '\n') + { + ptr++; + } + conn.request.headers[name] = std::string(begin, ptr); + if (*ptr == '\r') + { + ptr++; + } + if (*ptr != '\n') + { + return false; + } + ptr++; + } + + if (*ptr == '\r') + { + ptr++; + } + if (*ptr != '\n') + { + return false; + } + ptr++; + + return true; + } + + static bool equalsLowercased(std::string const &str, char const *mask) + { + char const *ptr = str.c_str(); + while (*ptr && *mask && ::tolower(*ptr) == *mask) + { + ptr++; + mask++; + } + return !*ptr && !*mask; + } + + static std::string normalizeHeaderName(char const *begin, char const *end) + { + std::string result(begin, end); + bool first = true; + for (char &ch : result) + { + if (first) + { + ch = static_cast(::toupper(ch)); + first = false; + } + else if (ch == '-') + { + first = true; + } + else + { + ch = static_cast(::tolower(ch)); + } + } + return result; + } + + void processRequest(Connection &conn) + { + conn.response.message.clear(); + conn.response.headers.clear(); + conn.response.body.clear(); + + if (conn.response.code == 0) + { + conn.response.code = 404; // Not Found + for (auto &handler : m_handlers) + { + if (conn.request.uri.length() >= handler.first.length() && + strncmp(conn.request.uri.c_str(), handler.first.c_str(), handler.first.length()) == 0) + { + LOG_TRACE("HttpServer: [%s] using handler for %s", conn.request.client.c_str(), + handler.first.c_str()); + // auto callback = handler.second; // Bazel gets mad at this unused + // var, uncomment when using + int result = handler.second->onHttpRequest(conn.request, conn.response); + if (result != 0) + { + conn.response.code = result; + break; + } + } + } + + if (conn.response.code == -1) + { + LOG_TRACE("HttpServer: [%s] closing by request", conn.request.client.c_str()); + handleConnectionClosed(conn); + } + } + + if (conn.response.message.empty()) + { + conn.response.message = getDefaultResponseMessage(conn.response.code); + } + + conn.response.headers["Host"] = m_serverHost; + conn.response.headers["Connection"] = (conn.keepalive ? "keep-alive" : "close"); + conn.response.headers["Date"] = formatTimestamp(time(nullptr)); + conn.response.headers["Content-Length"] = std::to_string(conn.response.body.size()); + } + + static std::string formatTimestamp(time_t time) + { + tm tm; +#ifdef _WIN32 + gmtime_s(&tm, &time); +#else + gmtime_r(&time, &tm); +#endif + char buf[32]; + strftime(buf, sizeof(buf), "%a, %d %b %Y %H:%M:%S GMT", &tm); + return buf; + } + +public: + static char const *getDefaultResponseMessage(int code) + { + switch (code) + { + // *INDENT-OFF* + case 100: + return "Continue"; + case 101: + return "Switching Protocols"; + case 200: + return "OK"; + case 201: + return "Created"; + case 202: + return "Accepted"; + case 203: + return "Non-Authoritative Information"; + case 204: + return "No Content"; + case 205: + return "Reset Content"; + case 206: + return "Partial Content"; + case 300: + return "Multiple Choices"; + case 301: + return "Moved Permanently"; + case 302: + return "Found"; + case 303: + return "See Other"; + case 304: + return "Not Modified"; + case 305: + return "Use Proxy"; + case 306: + return "Switch Proxy"; + case 307: + return "Temporary Redirect"; + case 308: + return "Permanent Redirect"; + case 400: + return "Bad Request"; + case 401: + return "Unauthorized"; + case 402: + return "Payment Required"; + case 403: + return "Forbidden"; + case 404: + return "Not Found"; + case 405: + return "Method Not Allowed"; + case 406: + return "Not Acceptable"; + case 407: + return "Proxy Authentication Required"; + case 408: + return "Request Timeout"; + case 409: + return "Conflict"; + case 410: + return "Gone"; + case 411: + return "Length Required"; + case 412: + return "Precondition Failed"; + case 413: + return "Payload Too Large"; + case 414: + return "URI Too Long"; + case 415: + return "Unsupported Media Type"; + case 416: + return "Range Not Satisfiable"; + case 417: + return "Expectation Failed"; + case 421: + return "Misdirected Request"; + case 426: + return "Upgrade Required"; + case 428: + return "Precondition Required"; + case 429: + return "Too Many Requests"; + case 431: + return "Request Header Fields Too Large"; + case 500: + return "Internal Server Error"; + case 501: + return "Not Implemented"; + case 502: + return "Bad Gateway"; + case 503: + return "Service Unavailable"; + case 504: + return "Gateway Timeout"; + case 505: + return "HTTP Version Not Supported"; + case 506: + return "Variant Also Negotiates"; + case 510: + return "Not Extended"; + case 511: + return "Network Authentication Required"; + default: + return "???"; + // *INDENT-ON* + } + } +}; + +} // namespace HTTP_SERVER_NS diff --git a/ext/include/opentelemetry/ext/http/server/socket_tools.h b/ext/include/opentelemetry/ext/http/server/socket_tools.h new file mode 100644 index 0000000000..e980b78940 --- /dev/null +++ b/ext/include/opentelemetry/ext/http/server/socket_tools.h @@ -0,0 +1,861 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef _WIN32 + +//# include + +# include + +// TODO: consider NOMINMAX +# undef min +# undef max +# pragma comment(lib, "ws2_32.lib") + +#else + +# include + +# ifdef __linux__ +# include +# endif + +# if __APPLE__ +# include "TargetConditionals.h" +// Use kqueue on mac +# include +# include +# include +# endif + +// Common POSIX headers for Linux and Mac OS X +# include +# include +# include +# include +# include +# include + +#endif + +#ifndef _Out_cap_ +# define _Out_cap_(size) +#endif + +#if defined(HAVE_CONSOLE_LOG) && !defined(LOG_DEBUG) +// Log to console if there's no standard log facility defined +# include +# ifndef LOG_DEBUG +# define LOG_DEBUG(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_TRACE(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_INFO(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_WARN(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# define LOG_ERROR(fmt_, ...) printf(" " fmt_ "\n", ##__VA_ARGS__) +# endif +#endif + +#ifndef LOG_DEBUG +// Don't log anything if there's no standard log facility defined +# define LOG_DEBUG(fmt_, ...) +# define LOG_TRACE(fmt_, ...) +# define LOG_INFO(fmt_, ...) +# define LOG_WARN(fmt_, ...) +# define LOG_ERROR(fmt_, ...) +#endif + +namespace common +{ + +/// +/// A simple thread, derived class overloads onThread() method. +/// +struct Thread +{ + std::thread m_thread; + + volatile bool m_terminate{false}; + + /// + /// Thread Constructor + /// + /// Thread + Thread() {} + + /// + /// Start Thread + /// + void startThread() + { + m_terminate = false; + m_thread = std::thread([&]() { this->onThread(); }); + } + + /// + /// Join Thread + /// + void joinThread() + { + m_terminate = true; + if (m_thread.joinable()) + { + m_thread.join(); + } + } + + /// + /// Indicates if this thread should terminate + /// + /// + bool shouldTerminate() const { return m_terminate; } + + /// + /// Must be implemented by children + /// + virtual void onThread() = 0; + + /// + /// Thread destructor + /// + /// + virtual ~Thread() noexcept {} +}; + +}; // namespace common +namespace SocketTools +{ + +#ifdef _WIN32 +// WinSocks need extra (de)initialization, solved by a global object here, +// whose constructor/destructor will be called before and after main(). +struct WsaInitializer +{ + WsaInitializer() + { + WSADATA wsaData; + WSAStartup(MAKEWORD(2, 2), &wsaData); + } + + ~WsaInitializer() { WSACleanup(); } +}; + +static WsaInitializer g_wsaInitializer; + +#endif + +/// +/// Encapsulation of sockaddr(_in) +/// +struct SocketAddr +{ + static u_long const Loopback = 0x7F000001; + + sockaddr m_data; + + /// + /// SocketAddr constructor + /// + /// SocketAddr + SocketAddr() { memset(&m_data, 0, sizeof(m_data)); } + + SocketAddr(u_long addr, int port) + { + sockaddr_in &inet4 = reinterpret_cast(m_data); + inet4.sin_family = AF_INET; + inet4.sin_port = htons(static_cast(port)); + inet4.sin_addr.s_addr = htonl(addr); + } + + SocketAddr(char const *addr) + { +#ifdef _WIN32 + INT addrlen = sizeof(m_data); + WCHAR buf[200]; + for (int i = 0; i < sizeof(buf) && addr[i]; i++) + { + buf[i] = addr[i]; + } + buf[199] = L'\0'; + ::WSAStringToAddressW(buf, AF_INET, nullptr, &m_data, &addrlen); +#else + sockaddr_in &inet4 = reinterpret_cast(m_data); + inet4.sin_family = AF_INET; + char const *colon = strchr(addr, ':'); + if (colon) + { + inet4.sin_port = htons(atoi(colon + 1)); + char buf[16]; + memcpy(buf, addr, std::min(15, colon - addr)); + buf[15] = '\0'; + ::inet_pton(AF_INET, buf, &inet4.sin_addr); + } + else + { + inet4.sin_port = 0; + ::inet_pton(AF_INET, addr, &inet4.sin_addr); + } +#endif + } + + SocketAddr(SocketAddr const &other) = default; + + SocketAddr &operator=(SocketAddr const &other) = default; + + operator sockaddr *() { return &m_data; } + + operator const sockaddr *() const { return &m_data; } + + int port() const + { + switch (m_data.sa_family) + { + case AF_INET: + { + sockaddr_in const &inet4 = reinterpret_cast(m_data); + return ntohs(inet4.sin_port); + } + + default: + return -1; + } + } + + std::string toString() const + { + std::ostringstream os; + + switch (m_data.sa_family) + { + case AF_INET: + { + sockaddr_in const &inet4 = reinterpret_cast(m_data); + u_long addr = ntohl(inet4.sin_addr.s_addr); + os << (addr >> 24) << '.' << ((addr >> 16) & 255) << '.' << ((addr >> 8) & 255) << '.' + << (addr & 255); + os << ':' << ntohs(inet4.sin_port); + break; + } + + default: + os << "[?AF?" << m_data.sa_family << ']'; + } + return os.str(); + } +}; + +/// +/// Encapsulation of a socket (non-exclusive ownership) +/// +struct Socket +{ +#ifdef _WIN32 + typedef SOCKET Type; + static Type const Invalid = INVALID_SOCKET; +#else + typedef int Type; + static Type const Invalid = -1; +#endif + + Type m_sock; + + Socket(Type sock = Invalid) : m_sock(sock) {} + + Socket(int af, int type, int proto) { m_sock = ::socket(af, type, proto); } + + ~Socket() {} + + operator Socket::Type() const { return m_sock; } + + bool operator==(Socket const &other) const { return (m_sock == other.m_sock); } + + bool operator!=(Socket const &other) const { return (m_sock != other.m_sock); } + + bool operator<(Socket const &other) const { return (m_sock < other.m_sock); } + + bool invalid() const { return (m_sock == Invalid); } + + void setNonBlocking() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + u_long value = 1; + ::ioctlsocket(m_sock, FIONBIO, &value); +#else + int flags = ::fcntl(m_sock, F_GETFL, 0); + ::fcntl(m_sock, F_SETFL, flags | O_NONBLOCK); +#endif + } + + bool setReuseAddr() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + BOOL value = TRUE; +#else + int value = 1; +#endif + return (::setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast(&value), + sizeof(value)) == 0); + } + + bool setNoDelay() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + BOOL value = TRUE; +#else + int value = 1; +#endif + return (::setsockopt(m_sock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast(&value), + sizeof(value)) == 0); + } + + bool connect(SocketAddr const &addr) + { + assert(m_sock != Invalid); + return (::connect(m_sock, addr, sizeof(addr)) == 0); + } + + void close() + { + assert(m_sock != Invalid); +#ifdef _WIN32 + ::closesocket(m_sock); +#else + ::close(m_sock); +#endif + m_sock = Invalid; + } + + int recv(_Out_cap_(size) void *buffer, unsigned size) + { + assert(m_sock != Invalid); + int flags = 0; + return static_cast(::recv(m_sock, reinterpret_cast(buffer), size, flags)); + } + + int send(void const *buffer, unsigned size) + { + assert(m_sock != Invalid); + return static_cast(::send(m_sock, reinterpret_cast(buffer), size, 0)); + } + + bool bind(SocketAddr const &addr) + { + assert(m_sock != Invalid); + return (::bind(m_sock, addr, sizeof(addr)) == 0); + } + + bool getsockname(SocketAddr &addr) const + { + assert(m_sock != Invalid); +#ifdef _WIN32 + int addrlen = sizeof(addr); +#else + socklen_t addrlen = sizeof(addr); +#endif + return (::getsockname(m_sock, addr, &addrlen) == 0); + } + + bool listen(int backlog) + { + assert(m_sock != Invalid); + return (::listen(m_sock, backlog) == 0); + } + + bool accept(Socket &csock, SocketAddr &caddr) + { + assert(m_sock != Invalid); +#ifdef _WIN32 + int addrlen = sizeof(caddr); +#else + socklen_t addrlen = sizeof(caddr); +#endif + csock = ::accept(m_sock, caddr, &addrlen); + return !csock.invalid(); + } + + bool shutdown(int how) + { + assert(m_sock != Invalid); + return (::shutdown(m_sock, how) == 0); + } + + int error() const + { +#ifdef _WIN32 + return ::WSAGetLastError(); +#else + return errno; +#endif + } + + enum + { +#ifdef _WIN32 + ErrorWouldBlock = WSAEWOULDBLOCK +#else + ErrorWouldBlock = EWOULDBLOCK +#endif + }; + + enum + { +#ifdef _WIN32 + ShutdownReceive = SD_RECEIVE, + ShutdownSend = SD_SEND, + ShutdownBoth = SD_BOTH +#else + ShutdownReceive = SHUT_RD, + ShutdownSend = SHUT_WR, + ShutdownBoth = SHUT_RDWR +#endif + }; +}; + +/// +/// Socket Data +/// +struct SocketData +{ + Socket socket; + int flags; + + SocketData() : socket(), flags(0) {} + + bool operator==(Socket s) { return (socket == s); } +}; + +/// +/// Socket Reactor +/// +struct Reactor : protected common::Thread +{ + /// + /// Socket State callback + /// + class SocketCallback + { + public: + virtual void onSocketReadable(Socket sock) = 0; + virtual void onSocketWritable(Socket sock) = 0; + virtual void onSocketAcceptable(Socket sock) = 0; + virtual void onSocketClosed(Socket sock) = 0; + }; + + /// + /// Socket State + /// + enum State + { + Readable = 1, + Writable = 2, + Acceptable = 4, + Closed = 8 + }; + + SocketCallback &m_callback; + + std::vector m_sockets; + +#ifdef _WIN32 + /* use WinSock events on Windows */ + std::vector m_events{}; +#endif + +#ifdef __linux__ + /* use epoll on Linux */ + int m_epollFd; +#endif + +#ifdef TARGET_OS_MAC + /* use kqueue on Mac */ +# define KQUEUE_SIZE 32 + int kq{0}; + struct kevent m_events[KQUEUE_SIZE]; +#endif + +public: + Reactor(SocketCallback &callback) : m_callback(callback) + { +#ifdef __linux__ +# ifdef ANDROID + m_epollFd = ::epoll_create(0); +# else + m_epollFd = ::epoll_create1(0); +# endif +#endif + +#ifdef TARGET_OS_MAC + bzero(&m_events[0], sizeof(m_events)); + kq = kqueue(); +#endif + } + + ~Reactor() + { +#ifdef __linux__ + ::close(m_epollFd); +#endif +#ifdef TARGET_OS_MAC + ::close(kq); +#endif + } + + /// + /// Add Socket + /// + /// + /// + void addSocket(const Socket &socket, int flags) + { + if (flags == 0) + { + removeSocket(socket); + } + else + { + auto it = std::find(m_sockets.begin(), m_sockets.end(), socket); + if (it == m_sockets.end()) + { + LOG_TRACE("Reactor: Adding socket 0x%x with flags 0x%x", static_cast(socket), flags); +#ifdef _WIN32 + m_events.push_back(::WSACreateEvent()); +#endif +#ifdef __linux__ + epoll_event event = {}; + event.data.fd = socket; + event.events = 0; + ::epoll_ctl(m_epollFd, EPOLL_CTL_ADD, socket, &event); +#endif +#ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = socket.m_sock; + EV_SET(&event, event.ident, EVFILT_READ, EV_ADD, 0, 0, NULL); + kevent(kq, &event, 1, NULL, 0, NULL); + EV_SET(&event, event.ident, EVFILT_WRITE, EV_ADD, 0, 0, NULL); + kevent(kq, &event, 1, NULL, 0, NULL); +#endif + m_sockets.push_back(SocketData()); + m_sockets.back().socket = socket; + m_sockets.back().flags = 0; + it = m_sockets.end() - 1; + } + else + { + LOG_TRACE("Reactor: Updating socket 0x%x with flags 0x%x", static_cast(socket), flags); + } + + if (it->flags != flags) + { + it->flags = flags; +#ifdef _WIN32 + long lNetworkEvents = 0; + if (it->flags & Readable) + { + lNetworkEvents |= FD_READ; + } + if (it->flags & Writable) + { + lNetworkEvents |= FD_WRITE; + } + if (it->flags & Acceptable) + { + lNetworkEvents |= FD_ACCEPT; + } + if (it->flags & Closed) + { + lNetworkEvents |= FD_CLOSE; + } + auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it); + ::WSAEventSelect(socket, *eventIt, lNetworkEvents); +#endif +#ifdef __linux__ + int events = 0; + if (it->flags & Readable) + { + events |= EPOLLIN; + } + if (it->flags & Writable) + { + events |= EPOLLOUT; + } + if (it->flags & Acceptable) + { + events |= EPOLLIN; + } + // if (it->flags & Closed) - always handled (EPOLLERR | EPOLLHUP) + epoll_event event = {}; + event.data.fd = socket; + event.events = events; + ::epoll_ctl(m_epollFd, EPOLL_CTL_MOD, socket, &event); +#endif +#ifdef TARGET_OS_MAC + // TODO: [MG] - Mac OS X socket doesn't currently support updating flags +#endif + } + } + } + + /// + /// Remove Socket + /// + /// + void removeSocket(const Socket &socket) + { + LOG_TRACE("Reactor: Removing socket 0x%x", static_cast(socket)); + auto it = std::find(m_sockets.begin(), m_sockets.end(), socket); + if (it != m_sockets.end()) + { +#ifdef _WIN32 + auto eventIt = m_events.begin() + std::distance(m_sockets.begin(), it); + ::WSAEventSelect(it->socket, *eventIt, 0); + ::WSACloseEvent(*eventIt); + m_events.erase(eventIt); +#endif +#ifdef __linux__ + ::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, socket, nullptr); +#endif +#ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = socket; + EV_SET(&event, socket, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + //// Already removed? + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&event, socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + //// Already removed? + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } +#endif + m_sockets.erase(it); + } + } + + /// + /// Start server + /// + void start() + { + LOG_INFO("Reactor: Starting..."); + startThread(); + } + + /// + /// Stop server + /// + void stop() + { + LOG_INFO("Reactor: Stopping..."); + joinThread(); +#ifdef _WIN32 + for (auto &hEvent : m_events) + { + ::WSACloseEvent(hEvent); + } +#else /* Linux and Mac */ + for (auto &sd : m_sockets) + { +# ifdef __linux__ + ::epoll_ctl(m_epollFd, EPOLL_CTL_DEL, sd.socket, nullptr); +# endif +# ifdef TARGET_OS_MAC + struct kevent event; + bzero(&event, sizeof(event)); + event.ident = sd.socket; + EV_SET(&event, sd.socket, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&event, sd.socket, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &event, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } +# endif + } +#endif + m_sockets.clear(); + } + + /// + /// Thread Loop for async events processing + /// + virtual void onThread() override + { + LOG_INFO("Reactor: Thread started"); + while (!shouldTerminate()) + { +#ifdef _WIN32 + DWORD dwResult = ::WSAWaitForMultipleEvents(static_cast(m_events.size()), + m_events.data(), FALSE, 500, FALSE); + if (dwResult == WSA_WAIT_TIMEOUT) + { + continue; + } + + assert(dwResult <= WSA_WAIT_EVENT_0 + m_events.size()); + int index = dwResult - WSA_WAIT_EVENT_0; + Socket socket = m_sockets[index].socket; + int flags = m_sockets[index].flags; + + WSANETWORKEVENTS ne; + ::WSAEnumNetworkEvents(socket, m_events[index], &ne); + LOG_TRACE( + "Reactor: Handling socket 0x%x (index %d) with active flags 0x%x " + "(armed 0x%x)", + static_cast(socket), index, ne.lNetworkEvents, flags); + + if ((flags & Readable) && (ne.lNetworkEvents & FD_READ)) + { + m_callback.onSocketReadable(socket); + } + if ((flags & Writable) && (ne.lNetworkEvents & FD_WRITE)) + { + m_callback.onSocketWritable(socket); + } + if ((flags & Acceptable) && (ne.lNetworkEvents & FD_ACCEPT)) + { + m_callback.onSocketAcceptable(socket); + } + if ((flags & Closed) && (ne.lNetworkEvents & FD_CLOSE)) + { + m_callback.onSocketClosed(socket); + } +#endif + +#ifdef __linux__ + epoll_event events[4]; + int result = ::epoll_wait(m_epollFd, events, sizeof(events) / sizeof(events[0]), 500); + if (result == 0 || (result == -1 && errno == EINTR)) + { + continue; + } + + assert(result >= 1 && static_cast(result) <= sizeof(events) / sizeof(events[0])); + for (int i = 0; i < result; i++) + { + auto it = std::find(m_sockets.begin(), m_sockets.end(), events[i].data.fd); + assert(it != m_sockets.end()); + Socket socket = it->socket; + int flags = it->flags; + + LOG_TRACE("Reactor: Handling socket 0x%x active flags 0x%x (armed 0x%x)", + static_cast(socket), events[i].events, flags); + + if ((flags & Readable) && (events[i].events & EPOLLIN)) + { + m_callback.onSocketReadable(socket); + } + if ((flags & Writable) && (events[i].events & EPOLLOUT)) + { + m_callback.onSocketWritable(socket); + } + if ((flags & Acceptable) && (events[i].events & EPOLLIN)) + { + m_callback.onSocketAcceptable(socket); + } + if ((flags & Closed) && (events[i].events & (EPOLLHUP | EPOLLERR))) + { + m_callback.onSocketClosed(socket); + } + } +#endif + +#if defined(TARGET_OS_MAC) + unsigned waitms = 500; // never block for more than 500ms + struct timespec timeout; + timeout.tv_sec = waitms / 1000; + timeout.tv_nsec = (waitms % 1000) * 1000 * 1000; + + int nev = kevent(kq, NULL, 0, m_events, KQUEUE_SIZE, &timeout); + for (int i = 0; i < nev; i++) + { + struct kevent &event = m_events[i]; + int fd = (int)event.ident; + auto it = std::find(m_sockets.begin(), m_sockets.end(), fd); + assert(it != m_sockets.end()); + Socket socket = it->socket; + int flags = it->flags; + + LOG_TRACE("Handling socket 0x%x active flags 0x%x (armed 0x%x)", static_cast(socket), + event.flags, event.fflags); + + if (event.filter == EVFILT_READ) + { + if (flags & Acceptable) + { + m_callback.onSocketAcceptable(socket); + } + if (flags & Readable) + { + m_callback.onSocketReadable(socket); + } + continue; + } + + if (event.filter == EVFILT_WRITE) + { + if (flags & Writable) + { + m_callback.onSocketWritable(socket); + } + continue; + } + + if ((event.flags & EV_EOF) || (event.flags & EV_ERROR)) + { + LOG_TRACE("event.filter=%s", "EVFILT_WRITE"); + m_callback.onSocketClosed(socket); + it->flags = Closed; + struct kevent kevt; + EV_SET(&kevt, event.ident, EVFILT_READ, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + EV_SET(&kevt, event.ident, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + if (-1 == kevent(kq, &kevt, 1, NULL, 0, NULL)) + { + LOG_ERROR("cannot delete fd=0x%x from kqueue!", event.ident); + } + continue; + } + LOG_ERROR("Reactor: unhandled kevent!"); + } +#endif + } + LOG_TRACE("Reactor: Thread done"); + } +}; + +} // namespace SocketTools