diff --git a/deflect/ImageJpegCompressor.cpp b/deflect/ImageJpegCompressor.cpp index a4bf8d0..c3f27a8 100644 --- a/deflect/ImageJpegCompressor.cpp +++ b/deflect/ImageJpegCompressor.cpp @@ -42,6 +42,7 @@ #include "ImageWrapper.h" #include +#include namespace deflect { @@ -72,7 +73,8 @@ int _getTurboJpegFormat(const PixelFormat pixelFormat) case ABGR: return TJPF_XBGR; default: - std::cerr << "unknown pixel format" << std::endl; + throw std::invalid_argument("unknown pixel format " + + std::to_string((int)pixelFormat)); return TJPF_RGB; } } @@ -88,7 +90,8 @@ int _getTurboJpegSubsamp(const ChromaSubsampling subsampling) case ChromaSubsampling::YUV420: return TJSAMP_420; default: - std::cerr << "unknown subsampling format" << std::endl; + throw std::invalid_argument("unknown subsampling format " + + std::to_string((int)subsampling)); return TJSAMP_444; } } @@ -100,6 +103,10 @@ QByteArray ImageJpegCompressor::computeJpeg(const ImageWrapper& sourceImage, // though it does not modify it. It can "safely" be cast to non-const // pointer to comply with the incorrect API. unsigned char* tjSrcBuffer = (unsigned char*)sourceImage.data; + if (!tjSrcBuffer) + throw std::invalid_argument( + "libjpeg-turbo image conversion failure: source image is NULL"); + tjSrcBuffer += imageRegion.y() * sourceImage.width * sourceImage.getBytesPerPixel(); tjSrcBuffer += imageRegion.x() * sourceImage.getBytesPerPixel(); @@ -124,8 +131,9 @@ QByteArray ImageJpegCompressor::computeJpeg(const ImageWrapper& sourceImage, tjJpegQual, tjFlags); if (err != 0) { - std::cerr << "libjpeg-turbo image conversion failure" << std::endl; - return QByteArray(); + std::stringstream msg; + msg << "libjpeg-turbo image conversion failure: " << tjGetErrorStr(); + throw std::runtime_error(msg.str()); } return QByteArray((const char*)ptr, tjJpegSize); diff --git a/deflect/ImageJpegCompressor.h b/deflect/ImageJpegCompressor.h index 6cf24e7..9c07e63 100644 --- a/deflect/ImageJpegCompressor.h +++ b/deflect/ImageJpegCompressor.h @@ -65,6 +65,9 @@ class ImageJpegCompressor * @param sourceImage The source image containing uncompressed image data. * @param imageRegion The region of the image to be compressed. Must not * exceed image dimensions. + * @return compressed image + * @throw std::invalid_argument if sourceImage.data is nullptr + * @throw std::runtime_error if JPEG compression failed */ DEFLECT_API QByteArray computeJpeg(const ImageWrapper& sourceImage, const QRect& imageRegion); diff --git a/deflect/ImageSegmenter.cpp b/deflect/ImageSegmenter.cpp index 38dbae8..603183e 100644 --- a/deflect/ImageSegmenter.cpp +++ b/deflect/ImageSegmenter.cpp @@ -69,19 +69,37 @@ bool ImageSegmenter::generate(const ImageWrapper& image, const Handler& handler) return _generateRaw(image, handler); } -Segment ImageSegmenter::compressSingleSegment(const ImageWrapper& image) +Segment ImageSegmenter::createSingleSegment(const ImageWrapper& image) { -#ifdef DEFLECT_USE_LIBJPEGTURBO auto segments = _generateSegments(image); if (segments.size() > 1) throw std::runtime_error( - "compressSingleSegment only works for small images"); - _computeJpeg(segments[0], false); - return segments[0]; + "createSingleSegment only works for small images"); + + auto& segment = segments[0]; + + if (image.compressionPolicy == COMPRESSION_OFF) + { + segment.imageData.reserve(segment.parameters.width * + segment.parameters.height * + image.getBytesPerPixel()); + segment.parameters.dataType = DataType::rgba; + segment.imageData.append((const char*)image.data, + int(image.getBufferSize())); + } + else + { +#ifdef DEFLECT_USE_LIBJPEGTURBO + _computeJpeg(segment, false); + if (segment.exception) + std::rethrow_exception(segment.exception); #else - throw std::runtime_error( - "LibJpegTurbo not available, needed for compressSingleSegment"); + throw std::runtime_error( + "LibJpegTurbo not available, needed for createSingleSegment"); #endif + } + + return segment; } void ImageSegmenter::setNominalSegmentDimensions(const uint width, @@ -136,8 +154,17 @@ void ImageSegmenter::_computeJpeg(Segment& segment, const bool sendSegment) // turbojpeg handles need to be per thread, and this function is called from // multiple threads by QtConcurrent::map static QThreadStorage compressor; - segment.imageData = - compressor.localData().computeJpeg(*segment.sourceImage, imageRegion); + try + { + segment.imageData = + compressor.localData().computeJpeg(*segment.sourceImage, + imageRegion); + } + catch (...) + { + segment.exception = std::current_exception(); + } + segment.parameters.dataType = DataType::jpeg; if (sendSegment) _sendQueue.enqueue(segment); @@ -205,7 +232,8 @@ Segments ImageSegmenter::_generateSegments(const ImageWrapper& image) const if (image.view == View::side_by_side) { if (image.width % 2 != 0) - throw std::runtime_error("side_by_side image width must be even!"); + throw std::invalid_argument( + "side_by_side image width must be even!"); // create copy of segments for right view auto segmentsRight = segments; diff --git a/deflect/ImageSegmenter.h b/deflect/ImageSegmenter.h index ed100f7..c9b6a2d 100644 --- a/deflect/ImageSegmenter.h +++ b/deflect/ImageSegmenter.h @@ -98,9 +98,12 @@ class ImageSegmenter * * @param image The image to be compressed * @return the compressed segment + * @throw std::invalid_argument if image is too big or invalid JPEG + * compression arguments + * @throw std::runtime_error if JPEG compression failed * @threadsafe */ - DEFLECT_API Segment compressSingleSegment(const ImageWrapper& image); + DEFLECT_API Segment createSingleSegment(const ImageWrapper& image); private: struct SegmentationInfo diff --git a/deflect/ImageWrapper.h b/deflect/ImageWrapper.h index 6f82cbc..470de9e 100644 --- a/deflect/ImageWrapper.h +++ b/deflect/ImageWrapper.h @@ -124,7 +124,7 @@ struct ImageWrapper //@{ CompressionPolicy compressionPolicy; /**< Is the image to be compressed (default: auto). @version 1.0 */ - unsigned int compressionQuality; /**< Compression quality (0 worst, + unsigned int compressionQuality; /**< Compression quality (1 worst, 100 best, default: 75). @version 1.0 */ ChromaSubsampling subsampling; /**< Chrominance sub-sampling. diff --git a/deflect/Observer.cpp b/deflect/Observer.cpp index f96e118..aa4e85c 100644 --- a/deflect/Observer.cpp +++ b/deflect/Observer.cpp @@ -40,6 +40,7 @@ /*********************************************************************/ #include "Observer.h" +#include "NetworkProtocol.h" #include "StreamPrivate.h" #include @@ -49,8 +50,10 @@ namespace deflect { -Observer::Observer() - : _impl(new StreamPrivate("", "", Socket::defaultPortNumber, true)) +const unsigned short Observer::defaultPortNumber = DEFAULT_PORT_NUMBER; + +Observer::Observer(const unsigned short port) + : _impl(new StreamPrivate("", "", port, true)) { } @@ -169,4 +172,16 @@ void Observer::setDisconnectedCallback(const std::function callback) { _impl->disconnectedCallback = callback; } + +void Observer::sendSizeHints(const SizeHints& hints) +{ + _impl->sendWorker.enqueueSizeHints(hints); +} + +bool Observer::sendData(const char* data, const size_t count) +{ + return _impl->sendWorker + .enqueueData(QByteArray::fromRawData(data, int(count))) + .get(); +} } diff --git a/deflect/Observer.h b/deflect/Observer.h index 4b26676..48facff 100644 --- a/deflect/Observer.h +++ b/deflect/Observer.h @@ -69,16 +69,21 @@ class StreamPrivate; class Observer { public: + /** The default communication port */ + static const unsigned short defaultPortNumber; + /** * Open a new connection to the Server using environment variables. * * DEFLECT_HOST The address of the target Server instance (required). * DEFLECT_ID The identifier for the stream. If not provided, a random * unique identifier will be used. - * @throw std::runtime_error if DEFLECT_HOST was not provided. + * @param port Port of the Server instance, default 1701. + * @throw std::runtime_error if DEFLECT_HOST was not provided or no + * connection to server could be established * @version 1.3 */ - DEFLECT_API Observer(); + DEFLECT_API explicit Observer(unsigned short port = defaultPortNumber); /** * Open a new connection to the Server. @@ -97,11 +102,12 @@ class Observer * "192.168.1.83". If left empty, the environment variable * DEFLECT_HOST will be used instead. * @param port Port of the Server instance, default 1701. - * @throw std::runtime_error if no host was provided. + * @throw std::runtime_error if no host was provided or no + * connection to server could be established * @version 1.0 */ DEFLECT_API Observer(const std::string& id, const std::string& host, - unsigned short port = 1701); + unsigned short port = defaultPortNumber); /** Destruct the Observer, closing the connection. @version 1.0 */ DEFLECT_API virtual ~Observer(); @@ -199,6 +205,27 @@ class Observer */ DEFLECT_API void setDisconnectedCallback(std::function callback); + /** + * Send size hints to the stream server to indicate sizes that should be + * respected by resize operations on the server side. + * + * @note blocks until all pending asynchonous send operations are finished. + * @param hints the new size hints for the server + * @version 1.2 + */ + DEFLECT_API void sendSizeHints(const SizeHints& hints); + + /** + * Send data to the Server. + * + * @note blocks until all pending asynchonous send operations are finished. + * @param data the pointer to the data buffer. + * @param count the number of bytes to send. + * @return true if the data could be sent, false otherwise + * @version 1.3 + */ + DEFLECT_API bool sendData(const char* data, size_t count); + protected: Observer(const Observer&) = delete; const Observer& operator=(const Observer&) = delete; diff --git a/deflect/Segment.h b/deflect/Segment.h index 95f955b..6ff1791 100644 --- a/deflect/Segment.h +++ b/deflect/Segment.h @@ -63,6 +63,9 @@ struct Segment /** @internal raw, uncompressed source image, used for compression */ const ImageWrapper* sourceImage = nullptr; + + /** @internal holds potential exception from compression thread */ + std::exception_ptr exception; }; } diff --git a/deflect/Socket.cpp b/deflect/Socket.cpp index b224c01..ad139a0 100644 --- a/deflect/Socket.cpp +++ b/deflect/Socket.cpp @@ -57,8 +57,6 @@ const int RECEIVE_TIMEOUT_MS = 1000; namespace deflect { -const unsigned short Socket::defaultPortNumber = DEFAULT_PORT_NUMBER; - Socket::Socket(const std::string& host, const unsigned short port) : _host(host) , _socket(new QTcpSocket(this)) // Ensure that _socket parent is diff --git a/deflect/Socket.h b/deflect/Socket.h index 2e9094c..e0b124b 100644 --- a/deflect/Socket.h +++ b/deflect/Socket.h @@ -66,16 +66,12 @@ class Socket : public QObject Q_OBJECT public: - /** The default communication port */ - static const unsigned short defaultPortNumber; - /** * Construct a Socket and connect to host. * @param host The target host (IP address or hostname) * @param port The target port */ - DEFLECT_API Socket(const std::string& host, - unsigned short port = defaultPortNumber); + DEFLECT_API Socket(const std::string& host, unsigned short port); /** Destruct a Socket, disconnecting from host. */ DEFLECT_API ~Socket() = default; diff --git a/deflect/Stream.cpp b/deflect/Stream.cpp index 1f5b767..b533b56 100644 --- a/deflect/Stream.cpp +++ b/deflect/Stream.cpp @@ -44,8 +44,8 @@ namespace deflect { -Stream::Stream() - : Observer(new StreamPrivate("", "", Socket::defaultPortNumber, false)) +Stream::Stream(const unsigned short port) + : Observer(new StreamPrivate("", "", port, false)) { } @@ -73,16 +73,4 @@ Stream::Future Stream::sendAndFinish(const ImageWrapper& image) { return _impl->sendWorker.enqueueImage(image, true); } - -void Stream::sendSizeHints(const SizeHints& hints) -{ - _impl->sendWorker.enqueueSizeHints(hints); -} - -bool Stream::sendData(const char* data, const size_t count) -{ - return _impl->sendWorker - .enqueueData(QByteArray::fromRawData(data, int(count))) - .get(); -} } diff --git a/deflect/Stream.h b/deflect/Stream.h index aa9e336..bd23fcb 100644 --- a/deflect/Stream.h +++ b/deflect/Stream.h @@ -68,10 +68,12 @@ class Stream : public Observer * DEFLECT_HOST The address of the target Server instance (required). * DEFLECT_ID The identifier for the stream. If not provided, a random * unique identifier will be used. - * @throw std::runtime_error if DEFLECT_HOST was not provided. + * @param port Port of the Server instance, default 1701. + * @throw std::runtime_error if DEFLECT_HOST was not provided or no + * connection to server could be established * @version 1.3 */ - DEFLECT_API Stream(); + DEFLECT_API explicit Stream(unsigned short port = defaultPortNumber); /** * Open a new connection to the Server. @@ -91,11 +93,12 @@ class Stream : public Observer * "192.168.1.83". If left empty, the environment variable * DEFLECT_HOST will be used instead. * @param port Port of the Server instance, default 1701. - * @throw std::runtime_error if no host was provided. + * @throw std::runtime_error if no host was provided or no + * connection to server could be established * @version 1.0 */ DEFLECT_API Stream(const std::string& id, const std::string& host, - unsigned short port = 1701); + unsigned short port = defaultPortNumber); /** Destruct the Stream, closing the connection. @version 1.0 */ DEFLECT_API virtual ~Stream(); @@ -111,6 +114,10 @@ class Stream : public Observer * @param image The image to send. Note that the image is not copied, so the * referenced must remain valid until the send is finished. * @return true if the image data could be sent, false otherwise + * @throw std::invalid_argument if RGBA and uncompressed + * @throw std::invalid_argument if invalid JPEG compression arguments + * @throw std::runtime_error if pending finishFrame() has not been completed + * @throw std::runtime_error if JPEG compression failed * @version 1.6 * @sa finishFrame() */ @@ -140,6 +147,10 @@ class Stream : public Observer * @param image The image to send. Note that the image is not copied, so the * referenced must remain valid until the send is finished * @return true if the image data could be sent, false otherwise. + * @throw std::invalid_argument if RGBA and uncompressed + * @throw std::invalid_argument if invalid JPEG compression arguments + * @throw std::runtime_error if pending finishFrame() has not been completed + * @throw std::runtime_error if JPEG compression failed * @see send() * @version 1.6 */ @@ -149,27 +160,6 @@ class Stream : public Observer Future asyncSend(const ImageWrapper& image) { return sendAndFinish(image); } //@} - /** - * Send size hints to the stream server to indicate sizes that should be - * respected by resize operations on the server side. - * - * @note blocks until all pending asynchonous send operations are finished. - * @param hints the new size hints for the server - * @version 1.2 - */ - DEFLECT_API void sendSizeHints(const SizeHints& hints); - - /** - * Send data to the Server. - * - * @note blocks until all pending asynchonous send operations are finished. - * @param data the pointer to the data buffer. - * @param count the number of bytes to send. - * @return true if the data could be sent, false otherwise - * @version 1.3 - */ - DEFLECT_API bool sendData(const char* data, size_t count); - private: Stream(const Stream&) = delete; const Stream& operator=(const Stream&) = delete; diff --git a/deflect/StreamPrivate.cpp b/deflect/StreamPrivate.cpp index f2a09a5..819b754 100644 --- a/deflect/StreamPrivate.cpp +++ b/deflect/StreamPrivate.cpp @@ -86,7 +86,8 @@ StreamPrivate::StreamPrivate(const std::string& id_, const std::string& host, , sendWorker{socket, id} { if (!socket.isConnected()) - return; + throw std::runtime_error( + "Connection to deflect server could not be established"); socket.connect(&socket, &Socket::disconnected, [this]() { if (disconnectedCallback) diff --git a/deflect/StreamSendWorker.cpp b/deflect/StreamSendWorker.cpp index 67b709e..7fbd95a 100644 --- a/deflect/StreamSendWorker.cpp +++ b/deflect/StreamSendWorker.cpp @@ -45,6 +45,7 @@ #include "SizeHints.h" #include +#include namespace { @@ -108,23 +109,37 @@ void StreamSendWorker::run() if (request.isFinish) { if (_pendingFinish) - throw std::runtime_error("Already have pending finish"); + { + if (request.promise) + request.promise->set_exception(std::make_exception_ptr( + std::runtime_error("Already have pending finish"))); + continue; + } _finishRequest = request; _pendingFinish = true; continue; } - for (auto& task : request.tasks) + try { - if (!task()) + for (auto& task : request.tasks) { - success = false; - break; + if (!task()) + { + success = false; + break; + } } + + if (request.promise) + request.promise->set_value(success); + } + catch (...) + { + if (request.promise) + request.promise->set_exception(std::current_exception()); } - if (request.promise) - request.promise->set_value(success); } } } @@ -151,29 +166,53 @@ Stream::Future StreamSendWorker::enqueueImage(const ImageWrapper& image, const bool finish) { if (_pendingFinish) - throw(std::runtime_error{"Pending finish, no send allowed"}); + { + return make_exception_future( + std::runtime_error("Pending finish, no send allowed")); + } if (image.compressionPolicy != COMPRESSION_ON && image.pixelFormat != RGBA) { - std::cerr << "Currently, RAW images can only be sent in RGBA format. " - "Other formats support remain to be implemented." - << std::endl; - return make_ready_future(false); + return make_exception_future(std::invalid_argument( + "Currently, RAW images can only be sent in RGBA format. Other " + "formats support remain to be implemented.")); + } + + if (image.compressionPolicy == COMPRESSION_ON) + { + if (image.compressionQuality < 1 || image.compressionQuality > 100) + { + std::stringstream msg; + msg << "JPEG compression quality must be between 1 and 100, got " + << image.compressionQuality << std::endl; + return make_exception_future( + std::invalid_argument(msg.str())); + } } std::vector tasks; - if (image.width <= SMALL_IMAGE_SIZE && image.height <= SMALL_IMAGE_SIZE && - image.compressionPolicy == COMPRESSION_ON) + if (image.width <= SMALL_IMAGE_SIZE && image.height <= SMALL_IMAGE_SIZE) { - auto segment = _imageSegmenter.compressSingleSegment(image); - tasks.emplace_back([this, segment] { return _sendSegment(segment); }); - - // as we expect to encounter a lot of these small sends, be optimistic - // and fulfill the promise already to reduce load in the send thread - // (c.f. lock ops performance on KNL) - _requests.enqueue({nullptr, tasks, false}); - return make_ready_future(true); + try + { + auto segment = _imageSegmenter.createSingleSegment(image); + + tasks.emplace_back( + [this, segment] { return _sendSegment(segment); }); + + // as we expect to encounter a lot of these small sends, be + // optimistic and fulfill the promise already to reduce load in the + // send thread (c.f. lock ops performance on KNL) + _requests.enqueue({nullptr, tasks, false}); + if (finish) + return enqueueFinish(); + return make_ready_future(true); + } + catch (...) + { + return make_exception_future(std::current_exception()); + } } else tasks.emplace_back([this, image] { return _sendImage(image); }); @@ -256,6 +295,9 @@ bool StreamSendWorker::_sendImageView(const View view) bool StreamSendWorker::_sendSegment(const Segment& segment) { + if (segment.exception) + std::rethrow_exception(segment.exception); + if (segment.view != _currentView) { if (!_sendImageView(segment.view)) diff --git a/deflect/types.h b/deflect/types.h index dcb8cbc..ee316b9 100644 --- a/deflect/types.h +++ b/deflect/types.h @@ -81,6 +81,19 @@ std::future make_ready_future(T&& value) return promise.get_future(); } +template +std::future make_exception_future(std::exception_ptr&& e) +{ + std::promise promise; + promise.set_exception(std::move(e)); + return promise.get_future(); +} +template +std::future make_exception_future(Exception&& e) +{ + return make_exception_future(std::make_exception_ptr(std::move(e))); +} + class EventReceiver; class Frame; class FrameDispatcher; diff --git a/doc/Changelog.md b/doc/Changelog.md index 4677130..d313154 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -5,6 +5,10 @@ Changelog {#Changelog} ### 0.14.0 (git master) +* [177](https://github.com/BlueBrain/Deflect/pull/177): + Improve stream error handling + * Catch errors regarding invalid JPEG quality values + * Set exceptions in returned futures on errors * [176](https://github.com/BlueBrain/Deflect/pull/176): OPT: Lock-free request queueing for multi-threaded stream clients (e.g. KNL) * [175](https://github.com/BlueBrain/Deflect/pull/175): diff --git a/tests/cpp/ServerTests.cpp b/tests/cpp/ServerTests.cpp index e0e2066..a0fe917 100644 --- a/tests/cpp/ServerTests.cpp +++ b/tests/cpp/ServerTests.cpp @@ -41,19 +41,13 @@ #include namespace ut = boost::unit_test; +#include "DeflectServer.h" #include "MinimalGlobalQtApp.h" -#include #include -#include #include #include -#include - -#include -#include -#include namespace { @@ -62,279 +56,105 @@ const QString testStreamId("teststream"); BOOST_GLOBAL_FIXTURE(MinimalGlobalQtApp); +BOOST_FIXTURE_TEST_SUITE(server, DeflectServer) + BOOST_AUTO_TEST_CASE(testSizeHintsReceivedByServer) { - QThread serverThread; - deflect::Server* server = new deflect::Server(0 /* OS-chosen port */); - server->moveToThread(&serverThread); - serverThread.connect(&serverThread, &QThread::finished, server, - &deflect::Server::deleteLater); - serverThread.start(); - - QWaitCondition received; - QMutex mutex; - deflect::SizeHints testHints; testHints.maxWidth = 500; testHints.preferredHeight = 200; - QString streamId; - deflect::SizeHints sizeHints; - - bool receivedState = false; - server->connect(server, &deflect::Server::receivedSizeHints, - [&](const QString id, const deflect::SizeHints hints) { - streamId = id; - sizeHints = hints; - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - }); + bool received = false; + setSizeHintsCallback([&](const QString id, const deflect::SizeHints hints) { + BOOST_CHECK_EQUAL(id.toStdString(), testStreamId.toStdString()); + BOOST_CHECK(hints == testHints); + received = true; + }); { deflect::Stream stream(testStreamId.toStdString(), "localhost", - server->serverPort()); + serverPort()); BOOST_CHECK(stream.isConnected()); stream.sendSizeHints(testHints); - - for (size_t i = 0; i < 20; ++i) - { - mutex.lock(); - received.wait(&mutex, 100 /*ms*/); - if (receivedState) - { - BOOST_CHECK_EQUAL(streamId.toStdString(), - testStreamId.toStdString()); - BOOST_CHECK(sizeHints == testHints); - - serverThread.quit(); - serverThread.wait(); - mutex.unlock(); - return; - } - mutex.unlock(); - } - BOOST_CHECK(!"reachable"); + waitForMessage(); } + + waitForMessage(); + BOOST_CHECK_EQUAL(getOpenedStreams(), 0); + BOOST_CHECK(received); } BOOST_AUTO_TEST_CASE(testRegisterForEventReceivedByServer) { - QThread serverThread; - deflect::Server* server = new deflect::Server(0 /* OS-chosen port */); - server->moveToThread(&serverThread); - serverThread.connect(&serverThread, &QThread::finished, server, - &deflect::Server::deleteLater); - serverThread.start(); - - QWaitCondition received; - QMutex mutex; - - QString streamId; - bool exclusiveBind = false; - deflect::EventReceiver* eventReceiver = nullptr; - - bool receivedState = false; - server->connect(server, &deflect::Server::registerToEvents, - [&](const QString id, const bool exclusive, - deflect::EventReceiver* receiver, - deflect::BoolPromisePtr success) { - streamId = id; - exclusiveBind = exclusive; - eventReceiver = receiver; - success->set_value(true); - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - }); + bool received = false; + setRegisterToEventsCallback([&](const QString id, const bool exclusive, + deflect::EventReceiver* eventReceiver) { + BOOST_CHECK_EQUAL(id.toStdString(), testStreamId.toStdString()); + BOOST_CHECK(exclusive); + BOOST_CHECK(eventReceiver); + received = true; + }); { deflect::Stream stream(testStreamId.toStdString(), "localhost", - server->serverPort()); + serverPort()); BOOST_REQUIRE(stream.isConnected()); BOOST_CHECK(stream.registerForEvents(true)); + waitForMessage(); } - for (size_t i = 0; i < 20; ++i) - { - mutex.lock(); - received.wait(&mutex, 100 /*ms*/); - if (receivedState) - { - BOOST_CHECK_EQUAL(streamId.toStdString(), - testStreamId.toStdString()); - BOOST_CHECK(exclusiveBind); - BOOST_CHECK(eventReceiver); - - serverThread.quit(); - serverThread.wait(); - mutex.unlock(); - return; - } - mutex.unlock(); - } - BOOST_CHECK(!"reachable"); + waitForMessage(); + BOOST_CHECK_EQUAL(getOpenedStreams(), 0); + BOOST_CHECK(received); } BOOST_AUTO_TEST_CASE(testDataReceivedByServer) { - if (getenv("TRAVIS")) - { - std::cout << "ignore testDataReceivedByServer on Jenkins" << std::endl; - return; - } - - QThread serverThread; - deflect::Server* server = new deflect::Server(0 /* OS-chosen port */); - server->moveToThread(&serverThread); - serverThread.connect(&serverThread, &QThread::finished, server, - &deflect::Server::deleteLater); - serverThread.start(); - - QWaitCondition received; - QMutex mutex; - - QString streamId; - std::string receivedData; const auto sentData = std::string{"Hello World!"}; - bool receivedState = false; - server->connect(server, &deflect::Server::receivedData, - [&](const QString id, QByteArray data) { - streamId = id; - receivedData = QString(data).toStdString(); - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - }); + bool received = false; + setDataReceivedCallback([&](const QString id, QByteArray data) { + BOOST_CHECK_EQUAL(id.toStdString(), testStreamId.toStdString()); + BOOST_CHECK_EQUAL(data.toStdString(), sentData); + received = true; + }); { deflect::Stream stream(testStreamId.toStdString(), "localhost", - server->serverPort()); + serverPort()); BOOST_REQUIRE(stream.isConnected()); stream.sendData(sentData.data(), sentData.size()); + waitForMessage(); } - for (size_t i = 0; i < 20; ++i) - { - mutex.lock(); - received.wait(&mutex, 100 /*ms*/); - if (receivedState) - { - BOOST_CHECK_EQUAL(streamId.toStdString(), - testStreamId.toStdString()); - BOOST_CHECK_EQUAL(receivedData, sentData); - - serverThread.quit(); - serverThread.wait(); - mutex.unlock(); - return; - } - mutex.unlock(); - } - BOOST_CHECK(!"reachable"); + waitForMessage(); + BOOST_CHECK_EQUAL(getOpenedStreams(), 0); + BOOST_CHECK(received); } BOOST_AUTO_TEST_CASE(testOneObserverAndOneStream) { - QThread serverThread; - deflect::Server* server = new deflect::Server(0 /* OS-chosen port */); - server->moveToThread(&serverThread); - serverThread.connect(&serverThread, &QThread::finished, server, - &deflect::Server::deleteLater); - serverThread.start(); - - // to wait in this thread until server thread is done with certain - // operations - QWaitCondition received; - QMutex mutex; - bool receivedState = false; - - auto processServerMessages = [&] { - for (size_t j = 0; j < 20; ++j) - { - mutex.lock(); - received.wait(&mutex, 100 /*ms*/); - if (receivedState) - { - mutex.unlock(); - break; - } - mutex.unlock(); - } - BOOST_REQUIRE(receivedState); - receivedState = false; - }; - - size_t openedStreams = 0; - - // only continue once we have the stream, whichever comes first - server->connect(server, &deflect::Server::pixelStreamOpened, - [&](const QString) { - ++openedStreams; - if (openedStreams == 1) - { - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - } - }); - - // make sure that we get the close of the stream and the observer - server->connect(server, &deflect::Server::pixelStreamClosed, - [&](const QString) { - --openedStreams; - if (openedStreams == 0) - { - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - } - }); - - // register to events to test the observer's purpose - deflect::EventReceiver* eventReceiver = nullptr; - server->connect(server, &deflect::Server::registerToEvents, - [&](const QString, const bool, - deflect::EventReceiver* evtReceiver, - deflect::BoolPromisePtr success) { - eventReceiver = evtReceiver; - success->set_value(true); - }); + setFrameReceivedCallback([&](deflect::FramePtr frame) { + BOOST_CHECK_EQUAL(frame->segments.size(), 1); + BOOST_CHECK_EQUAL(frame->uri.toStdString(), testStreamId.toStdString()); + }); // handle received frames to test the stream's purpose const size_t expectedFrames = 2; - size_t receivedFrames = 0; - server->connect(server, &deflect::Server::receivedFrame, - [&](deflect::FramePtr frame) { - BOOST_CHECK_EQUAL(frame->segments.size(), 1); - BOOST_CHECK_EQUAL(frame->uri.toStdString(), - testStreamId.toStdString()); - ++receivedFrames; - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - }); { deflect::Stream stream(testStreamId.toStdString(), "localhost", - server->serverPort()); + serverPort()); BOOST_REQUIRE(stream.isConnected()); deflect::Observer observer(testStreamId.toStdString(), "localhost", - server->serverPort()); + serverPort()); BOOST_REQUIRE(observer.isConnected()); BOOST_CHECK(observer.registerForEvents(true)); // handle connects first before sending and receiving frames - processServerMessages(); + waitForMessage(); const unsigned int width = 4; const unsigned int height = 4; @@ -350,14 +170,13 @@ BOOST_AUTO_TEST_CASE(testOneObserverAndOneStream) // are received accordingly for (size_t i = 0; i < expectedFrames; ++i) { - receivedState = false; stream.sendAndFinish(image).wait(); - server->requestFrame(testStreamId); + requestFrame(testStreamId); event.key = i; - eventReceiver->processEvent(event); + processEvent(event); // process event sending first before receiving in observer - processServerMessages(); + waitForMessage(); while (!observer.hasEvent()) ; @@ -369,46 +188,14 @@ BOOST_AUTO_TEST_CASE(testOneObserverAndOneStream) } // handle close of streamer and observer - processServerMessages(); - - serverThread.quit(); - serverThread.wait(); + waitForMessage(); - BOOST_CHECK_EQUAL(openedStreams, 0); - BOOST_CHECK_EQUAL(receivedFrames, expectedFrames); + BOOST_CHECK_EQUAL(getOpenedStreams(), 0); + BOOST_CHECK_EQUAL(getReceivedFrames(), expectedFrames); } BOOST_AUTO_TEST_CASE(testThreadedSmallSegmentStream) { - QThread serverThread; - deflect::Server* server = new deflect::Server(0 /* OS-chosen port */); - server->moveToThread(&serverThread); - serverThread.connect(&serverThread, &QThread::finished, server, - &deflect::Server::deleteLater); - serverThread.start(); - - // to wait in this thread until server thread is done with certain - // operations - QWaitCondition received; - QMutex mutex; - bool receivedState = false; - - auto processServerMessages = [&] { - for (size_t j = 0; j < 20; ++j) - { - mutex.lock(); - received.wait(&mutex, 100 /*ms*/); - if (receivedState) - { - mutex.unlock(); - break; - } - mutex.unlock(); - } - BOOST_REQUIRE(receivedState); - receivedState = false; - }; - const unsigned int segmentSize = 64; const unsigned int width = 1920; const unsigned int height = 1088; @@ -430,66 +217,30 @@ BOOST_AUTO_TEST_CASE(testThreadedSmallSegmentStream) std::ceil((float)width / segmentSize) * std::ceil((float)height / segmentSize)); - const std::vector pixels(segmentSize * segmentSize * 4, 42); + setFrameReceivedCallback([&](deflect::FramePtr frame) { + BOOST_CHECK_EQUAL(frame->segments.size(), numSegments); + BOOST_CHECK_EQUAL(frame->uri.toStdString(), testStreamId.toStdString()); + const auto dim = frame->computeDimensions(); + BOOST_CHECK_EQUAL(dim.width(), width); + BOOST_CHECK_EQUAL(dim.height(), height); + }); - size_t openedStreams = 0; - // only continue once we have the stream - server->connect(server, &deflect::Server::pixelStreamOpened, - [&](const QString) { - ++openedStreams; - if (openedStreams == 1) - { - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - } - }); - - // make sure that we get the close of the stream - server->connect(server, &deflect::Server::pixelStreamClosed, - [&](const QString) { - --openedStreams; - if (openedStreams == 0) - { - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - } - }); + const std::vector pixels(segmentSize * segmentSize * 4, 42); // handle received frames to test the stream's purpose const size_t expectedFrames = 10; - size_t receivedFrames = 0; - server->connect(server, &deflect::Server::receivedFrame, - [&](deflect::FramePtr frame) { - BOOST_CHECK_EQUAL(frame->segments.size(), numSegments); - BOOST_CHECK_EQUAL(frame->uri.toStdString(), - testStreamId.toStdString()); - const auto dim = frame->computeDimensions(); - BOOST_CHECK_EQUAL(dim.width(), width); - BOOST_CHECK_EQUAL(dim.height(), height); - ++receivedFrames; - mutex.lock(); - receivedState = true; - received.wakeAll(); - mutex.unlock(); - }); { deflect::Stream stream(testStreamId.toStdString(), "localhost", - server->serverPort()); + serverPort()); BOOST_REQUIRE(stream.isConnected()); // handle connects first before sending and receiving frames - processServerMessages(); + waitForMessage(); std::mutex testMutex; // BOOST_CHECK is not thread-safe for (size_t i = 0; i < expectedFrames; ++i) { - receivedState = false; - // to make coverage report work; otherwise fails for unknown reasons #ifdef NDEBUG #pragma omp parallel for @@ -510,19 +261,32 @@ BOOST_AUTO_TEST_CASE(testThreadedSmallSegmentStream) BOOST_CHECK(stream.finishFrame().get()); - server->requestFrame(testStreamId); + requestFrame(testStreamId); // process frame receive - processServerMessages(); + waitForMessage(); } } // handle close of streamer - processServerMessages(); + waitForMessage(); - serverThread.quit(); - serverThread.wait(); + BOOST_CHECK_EQUAL(getOpenedStreams(), 0); + BOOST_CHECK_EQUAL(getReceivedFrames(), expectedFrames); +} + +BOOST_AUTO_TEST_CASE(testCompressionErrorForBigNullImage) +{ + deflect::Stream stream(testStreamId.toStdString(), "localhost", + serverPort()); + BOOST_REQUIRE(stream.isConnected()); - BOOST_CHECK_EQUAL(openedStreams, 0); - BOOST_CHECK_EQUAL(receivedFrames, expectedFrames); + // handle connect of stream + waitForMessage(); + + deflect::ImageWrapper bigImage(nullptr, 1000, 1000, deflect::ARGB); + bigImage.compressionPolicy = deflect::COMPRESSION_ON; + BOOST_CHECK_THROW(stream.send(bigImage).get(), std::invalid_argument); } + +BOOST_AUTO_TEST_SUITE_END() diff --git a/tests/cpp/SocketTests.cpp b/tests/cpp/SocketTests.cpp index 40c91bb..5051b3f 100644 --- a/tests/cpp/SocketTests.cpp +++ b/tests/cpp/SocketTests.cpp @@ -41,30 +41,20 @@ #include namespace ut = boost::unit_test; +#include "MinimalDeflectServer.h" #include "MinimalGlobalQtApp.h" -#include "MockServer.h" -#include #include -#include - BOOST_GLOBAL_FIXTURE(MinimalGlobalQtApp); void testSocketConnect(const int32_t versionOffset) { - QThread thread; - auto server = new MockServer(NETWORK_PROTOCOL_VERSION + versionOffset); - server->moveToThread(&thread); - server->connect(&thread, &QThread::finished, server, &QObject::deleteLater); - thread.start(); + MinimalDeflectServer server(versionOffset); - deflect::Socket socket("localhost", server->serverPort()); + deflect::Socket socket("localhost", server.serverPort()); BOOST_CHECK(socket.isConnected() == (versionOffset >= 0)); - - thread.quit(); - thread.wait(); } BOOST_AUTO_TEST_CASE( diff --git a/tests/cpp/StreamTests.cpp b/tests/cpp/StreamTests.cpp index fca34f4..9f390ed 100644 --- a/tests/cpp/StreamTests.cpp +++ b/tests/cpp/StreamTests.cpp @@ -41,6 +41,9 @@ #include namespace ut = boost::unit_test; +#include "MinimalDeflectServer.h" +#include "MinimalGlobalQtApp.h" + #include #include @@ -53,66 +56,153 @@ const char* STREAM_ID_ENV_VAR = "DEFLECT_ID"; const char* STREAM_HOST_ENV_VAR = "DEFLECT_HOST"; } -BOOST_AUTO_TEST_CASE(testParameterizedConstructorWithValues) +BOOST_AUTO_TEST_CASE(testConstructionWithNoServer) { - const deflect::Stream stream("mystream", "somehost"); - BOOST_CHECK_EQUAL(stream.getId(), "mystream"); - BOOST_CHECK_EQUAL(stream.getHost(), "somehost"); + BOOST_CHECK_THROW(deflect::Stream("id", "localhost"), std::runtime_error); } +BOOST_GLOBAL_FIXTURE(MinimalGlobalQtApp); +BOOST_FIXTURE_TEST_SUITE(server, MinimalDeflectServer) + BOOST_AUTO_TEST_CASE(testDefaultConstructorReadsEnvironmentVariables) { qputenv(STREAM_ID_ENV_VAR, "mystream"); - qputenv(STREAM_HOST_ENV_VAR, "somehost"); - deflect::Stream stream; + qputenv(STREAM_HOST_ENV_VAR, "localhost"); + deflect::Stream stream(serverPort()); BOOST_CHECK_EQUAL(stream.getId(), "mystream"); - BOOST_CHECK_EQUAL(stream.getHost(), "somehost"); + BOOST_CHECK_EQUAL(stream.getHost(), "localhost"); qunsetenv(STREAM_ID_ENV_VAR); qunsetenv(STREAM_HOST_ENV_VAR); } +BOOST_AUTO_TEST_CASE(testParameterizedConstructorWithValues) +{ + const deflect::Stream stream("mystream", "localhost", serverPort()); + BOOST_CHECK_EQUAL(stream.getId(), "mystream"); + BOOST_CHECK_EQUAL(stream.getHost(), "localhost"); +} + BOOST_AUTO_TEST_CASE(testParameterizedConstructorReadsEnvironmentVariables) { qputenv(STREAM_ID_ENV_VAR, "mystream"); - qputenv(STREAM_HOST_ENV_VAR, "somehost"); - const deflect::Stream stream("", ""); + qputenv(STREAM_HOST_ENV_VAR, "localhost"); + const deflect::Stream stream("", "", serverPort()); BOOST_CHECK_EQUAL(stream.getId(), "mystream"); - BOOST_CHECK_EQUAL(stream.getHost(), "somehost"); + BOOST_CHECK_EQUAL(stream.getHost(), "localhost"); qunsetenv(STREAM_ID_ENV_VAR); qunsetenv(STREAM_HOST_ENV_VAR); } -BOOST_AUTO_TEST_CASE(testWhenSteamHostNotProvidedThenThrow) +BOOST_AUTO_TEST_CASE(testWhenStreamHostNotProvidedThenThrow) { BOOST_REQUIRE(QString(qgetenv(STREAM_HOST_ENV_VAR)).isEmpty()); BOOST_CHECK_THROW(std::make_shared(), std::runtime_error); - BOOST_CHECK_THROW(deflect::Stream stream("mystream", ""), + BOOST_CHECK_THROW(deflect::Stream stream("mystream", "", serverPort()), std::runtime_error); } -BOOST_AUTO_TEST_CASE(testWhenSteamHostProvidedThenNoThrow) +BOOST_AUTO_TEST_CASE(testWhenStreamHostProvidedThenNoThrow) { - BOOST_CHECK_NO_THROW(deflect::Stream stream("mystream", "somehost")); - qputenv(STREAM_HOST_ENV_VAR, "somehost"); - BOOST_CHECK_NO_THROW(std::make_shared()); - BOOST_CHECK_NO_THROW(deflect::Stream stream("mystream", "")); + BOOST_CHECK_NO_THROW( + deflect::Stream stream("mystream", "localhost", serverPort())); + qputenv(STREAM_HOST_ENV_VAR, "localhost"); + BOOST_CHECK_NO_THROW( + std::make_shared("", "", serverPort())); + BOOST_CHECK_NO_THROW(deflect::Stream stream("mystream", "", serverPort())); qunsetenv(STREAM_HOST_ENV_VAR); } -BOOST_AUTO_TEST_CASE(testWhenNoSteamIdProvidedThenARandomOneIsGenerated) +BOOST_AUTO_TEST_CASE(testWhenNoStreamIdProvidedThenARandomOneIsGenerated) { BOOST_REQUIRE(QString(qgetenv(STREAM_ID_ENV_VAR)).isEmpty()); { - deflect::Stream stream("", "somehost"); + deflect::Stream stream("", "localhost", serverPort()); BOOST_CHECK(!stream.getId().empty()); - BOOST_CHECK_NE(deflect::Stream("", "host").getId(), - deflect::Stream("", "host").getId()); + BOOST_CHECK_NE(deflect::Stream("", "localhost", serverPort()).getId(), + deflect::Stream("", "localhost", serverPort()).getId()); } { - qputenv(STREAM_HOST_ENV_VAR, "somehost"); - deflect::Stream stream; + qputenv(STREAM_HOST_ENV_VAR, "localhost"); + deflect::Stream stream("", "", serverPort()); BOOST_CHECK(!stream.getId().empty()); - BOOST_CHECK_NE(deflect::Stream().getId(), deflect::Stream().getId()); + BOOST_CHECK_NE(deflect::Stream("", "", serverPort()).getId(), + deflect::Stream("", "", serverPort()).getId()); qunsetenv(STREAM_HOST_ENV_VAR); } } + +// Note: the following send tests only work as the small segment send does not +// need the sendWorker thread to be present. So fortunate for us we can skip +// setting up a stream server. +BOOST_AUTO_TEST_CASE(testSendUncompressedRGBA) +{ + deflect::Stream stream("id", "localhost", serverPort()); + std::vector pixels(4 * 4 * 4); + + deflect::ImageWrapper image(pixels.data(), 4, 4, deflect::RGBA); + image.compressionPolicy = deflect::COMPRESSION_OFF; + BOOST_CHECK(stream.send(image).get()); +} + +BOOST_AUTO_TEST_CASE(testErrorOnUnsupportedUncompressedFormats) +{ + deflect::Stream stream("id", "localhost", serverPort()); + std::vector pixels(4 * 4 * 4); + + const auto allFormats = {deflect::RGB, deflect::ARGB, deflect::BGR, + deflect::BGRA, deflect::ABGR}; + for (const auto format : allFormats) + { + deflect::ImageWrapper image(pixels.data(), 4, 4, format); + image.compressionPolicy = deflect::COMPRESSION_OFF; + BOOST_CHECK_THROW(stream.send(image).get(), std::invalid_argument); + } +} + +BOOST_AUTO_TEST_CASE(testSuccessOnCompressedFormats) +{ + deflect::Stream stream("id", "localhost", serverPort()); + std::vector pixels(4 * 4 * 4); + + std::vector unsupportedUncompressedFormats = { + deflect::RGB, deflect::ARGB, deflect::BGR, deflect::BGRA, + deflect::ABGR}; + for (const auto format : unsupportedUncompressedFormats) + { + deflect::ImageWrapper image(pixels.data(), 4, 4, format); + image.compressionPolicy = deflect::COMPRESSION_ON; + BOOST_CHECK(stream.send(image).get()); + } +} + +BOOST_AUTO_TEST_CASE(testErrorOnNullUncompressedImage) +{ + deflect::Stream stream("id", "localhost", serverPort()); + deflect::ImageWrapper nullImage(nullptr, 4, 4, deflect::ARGB); + nullImage.compressionPolicy = deflect::COMPRESSION_ON; + BOOST_CHECK_THROW(stream.send(nullImage).get(), std::invalid_argument); +} + +BOOST_AUTO_TEST_CASE(testErrorOnInvalidJpegCompressionValues) +{ + deflect::Stream stream("id", "localhost", serverPort()); + std::vector pixels(4 * 4 * 4); + deflect::ImageWrapper imageWrapper(pixels.data(), 4, 4, deflect::ARGB); + imageWrapper.compressionPolicy = deflect::COMPRESSION_ON; + imageWrapper.compressionQuality = 0; + BOOST_CHECK_THROW(stream.send(imageWrapper).get(), std::invalid_argument); + + imageWrapper.compressionQuality = 101; + BOOST_CHECK_THROW(stream.send(imageWrapper).get(), std::invalid_argument); + + imageWrapper.compressionQuality = 1; + BOOST_CHECK(stream.send(imageWrapper).get()); + + imageWrapper.compressionQuality = 75; + BOOST_CHECK(stream.send(imageWrapper).get()); + + imageWrapper.compressionQuality = 100; + BOOST_CHECK(stream.send(imageWrapper).get()); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/tests/mock/CMakeLists.txt b/tests/mock/CMakeLists.txt index c4d773e..2807e8f 100644 --- a/tests/mock/CMakeLists.txt +++ b/tests/mock/CMakeLists.txt @@ -6,16 +6,21 @@ # Generates a mock library used by the cpp unit tests. set(DEFLECTMOCK_HEADERS + DeflectServer.h MinimalGlobalQtApp.h + MinimalDeflectServer.h MockServer.h Timer.h ) set(DEFLECTMOCK_SOURCES + DeflectServer.cpp + MinimalDeflectServer.cpp MockServer.cpp ) -set(DEFLECTMOCK_LINK_LIBRARIES Qt5::Core Qt5::Network) +set(DEFLECTMOCK_LINK_LIBRARIES Deflect ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} + Qt5::Core Qt5::Network) set(DEFLECTMOCK_INCLUDE_NAME deflect/mock) set(DEFLECTMOCK_OMIT_LIBRARY_HEADER ON) diff --git a/tests/mock/DeflectServer.cpp b/tests/mock/DeflectServer.cpp new file mode 100644 index 0000000..3b7ce76 --- /dev/null +++ b/tests/mock/DeflectServer.cpp @@ -0,0 +1,142 @@ +/*********************************************************************/ +/* Copyright (c) 2017, EPFL/Blue Brain Project */ +/* Daniel.Nachbaur@epfl.ch */ +/* All rights reserved. */ +/* */ +/* Redistribution and use in source and binary forms, with or */ +/* without modification, are permitted provided that the following */ +/* conditions are met: */ +/* */ +/* 1. Redistributions of source code must retain the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer. */ +/* */ +/* 2. Redistributions in binary form must reproduce the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer in the documentation and/or other materials */ +/* provided with the distribution. */ +/* */ +/* THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, */ +/* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF */ +/* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE */ +/* DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, */ +/* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES */ +/* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE */ +/* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR */ +/* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF */ +/* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */ +/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT */ +/* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE */ +/* POSSIBILITY OF SUCH DAMAGE. */ +/* */ +/* The views and conclusions contained in the software and */ +/* documentation are those of the authors and should not be */ +/* interpreted as representing official policies, either expressed */ +/* or implied, of The University of Texas at Austin. */ +/*********************************************************************/ + +#include "DeflectServer.h" + +#include + +DeflectServer::DeflectServer() +{ + _server = new deflect::Server(0 /* OS-chosen port */); + _server->moveToThread(&_thread); + _thread.connect(&_thread, &QThread::finished, _server, + &deflect::Server::deleteLater); + _thread.start(); + + _server->connect(_server, &deflect::Server::pixelStreamOpened, + [&](const QString) { + ++_openedStreams; + _mutex.lock(); + _receivedState = true; + _received.wakeAll(); + _mutex.unlock(); + }); + + _server->connect(_server, &deflect::Server::pixelStreamClosed, + [&](const QString) { + --_openedStreams; + _mutex.lock(); + _receivedState = true; + _received.wakeAll(); + _mutex.unlock(); + }); + + _server->connect(_server, &deflect::Server::receivedSizeHints, + [&](const QString id, const deflect::SizeHints hints) { + if (_sizeHintsCallback) + _sizeHintsCallback(id, hints); + _mutex.lock(); + _receivedState = true; + _received.wakeAll(); + _mutex.unlock(); + }); + + _server->connect(_server, &deflect::Server::receivedData, + [&](const QString id, QByteArray data) { + if (_dataReceivedCallback) + _dataReceivedCallback(id, data); + _mutex.lock(); + _receivedState = true; + _received.wakeAll(); + _mutex.unlock(); + }); + + _server->connect(_server, &deflect::Server::receivedFrame, + [&](deflect::FramePtr frame) { + if (_frameReceivedCallback) + _frameReceivedCallback(frame); + ++_receivedFrames; + _mutex.lock(); + _receivedState = true; + _received.wakeAll(); + _mutex.unlock(); + }); + + _server->connect(_server, &deflect::Server::registerToEvents, + [&](const QString id, const bool exclusive, + deflect::EventReceiver* evtReceiver, + deflect::BoolPromisePtr success) { + + if (_registerToEventsCallback) + _registerToEventsCallback(id, exclusive, + evtReceiver); + + _eventReceiver = evtReceiver; + success->set_value(true); + }); +} + +DeflectServer::~DeflectServer() +{ + _thread.quit(); + _thread.wait(); +} + +void DeflectServer::waitForMessage() +{ + for (size_t j = 0; j < 20; ++j) + { + _mutex.lock(); + _received.wait(&_mutex, 100 /*ms*/); + if (_receivedState) + { + _mutex.unlock(); + break; + } + _mutex.unlock(); + } + BOOST_CHECK(_receivedState); + _receivedState = false; +} + +void DeflectServer::processEvent(const deflect::Event& event) +{ + BOOST_REQUIRE(_eventReceiver); + _eventReceiver->processEvent(event); +} diff --git a/tests/mock/DeflectServer.h b/tests/mock/DeflectServer.h new file mode 100644 index 0000000..62ef3ab --- /dev/null +++ b/tests/mock/DeflectServer.h @@ -0,0 +1,113 @@ +/*********************************************************************/ +/* Copyright (c) 2017, EPFL/Blue Brain Project */ +/* Daniel.Nachbaur@epfl.ch */ +/* All rights reserved. */ +/* */ +/* Redistribution and use in source and binary forms, with or */ +/* without modification, are permitted provided that the following */ +/* conditions are met: */ +/* */ +/* 1. Redistributions of source code must retain the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer. */ +/* */ +/* 2. Redistributions in binary form must reproduce the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer in the documentation and/or other materials */ +/* provided with the distribution. */ +/* */ +/* THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, */ +/* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF */ +/* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE */ +/* DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, */ +/* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES */ +/* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE */ +/* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR */ +/* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF */ +/* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */ +/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT */ +/* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE */ +/* POSSIBILITY OF SUCH DAMAGE. */ +/* */ +/* The views and conclusions contained in the software and */ +/* documentation are those of the authors and should not be */ +/* interpreted as representing official policies, either expressed */ +/* or implied, of The University of Texas at Austin. */ +/*********************************************************************/ + +#ifndef DEFLECT_DEFLECTSERVER_H +#define DEFLECT_DEFLECTSERVER_H + +#ifdef _WIN32 +typedef __int32 int32_t; +#endif + +#include +#include +#include + +#include +#include + +class DeflectServer +{ +public: + explicit DeflectServer(); + ~DeflectServer(); + + quint16 serverPort() const { return _server->serverPort(); } + void requestFrame(QString uri) { _server->requestFrame(uri); } + void waitForMessage(); + + size_t getReceivedFrames() const { return _receivedFrames; } + size_t getOpenedStreams() const { return _openedStreams; } + using SizeHintsCallback = + std::function; + void setSizeHintsCallback(const SizeHintsCallback& callback) + { + _sizeHintsCallback = callback; + } + + using RegisterToEventsCallback = + std::function; + void setRegisterToEventsCallback(const RegisterToEventsCallback& callback) + { + _registerToEventsCallback = callback; + } + + using DataReceivedCallback = std::function; + void setDataReceivedCallback(const DataReceivedCallback& callback) + { + _dataReceivedCallback = callback; + } + + using FrameReceivedCallback = std::function; + void setFrameReceivedCallback(const FrameReceivedCallback& callback) + { + _frameReceivedCallback = callback; + } + + void processEvent(const deflect::Event& event); + +private: + QThread _thread; + deflect::Server* _server; + + bool _receivedState{false}; + QWaitCondition _received; + QMutex _mutex; + + size_t _openedStreams{0}; + size_t _receivedFrames{0}; + + SizeHintsCallback _sizeHintsCallback; + RegisterToEventsCallback _registerToEventsCallback; + DataReceivedCallback _dataReceivedCallback; + FrameReceivedCallback _frameReceivedCallback; + + deflect::EventReceiver* _eventReceiver{nullptr}; +}; + +#endif diff --git a/tests/mock/MinimalDeflectServer.cpp b/tests/mock/MinimalDeflectServer.cpp new file mode 100644 index 0000000..6cf1b3d --- /dev/null +++ b/tests/mock/MinimalDeflectServer.cpp @@ -0,0 +1,57 @@ +/*********************************************************************/ +/* Copyright (c) 2017, EPFL/Blue Brain Project */ +/* Daniel.Nachbaur@epfl.ch */ +/* All rights reserved. */ +/* */ +/* Redistribution and use in source and binary forms, with or */ +/* without modification, are permitted provided that the following */ +/* conditions are met: */ +/* */ +/* 1. Redistributions of source code must retain the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer. */ +/* */ +/* 2. Redistributions in binary form must reproduce the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer in the documentation and/or other materials */ +/* provided with the distribution. */ +/* */ +/* THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, */ +/* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF */ +/* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE */ +/* DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, */ +/* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES */ +/* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE */ +/* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR */ +/* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF */ +/* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */ +/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT */ +/* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE */ +/* POSSIBILITY OF SUCH DAMAGE. */ +/* */ +/* The views and conclusions contained in the software and */ +/* documentation are those of the authors and should not be */ +/* interpreted as representing official policies, either expressed */ +/* or implied, of The University of Texas at Austin. */ +/*********************************************************************/ + +#include "MinimalDeflectServer.h" + +#include + +MinimalDeflectServer::MinimalDeflectServer(const int32_t versionOffset) +{ + _server = new MockServer(NETWORK_PROTOCOL_VERSION + versionOffset); + _server->moveToThread(&_thread); + _server->connect(&_thread, &QThread::finished, _server, + &QObject::deleteLater); + _thread.start(); +} + +MinimalDeflectServer::~MinimalDeflectServer() +{ + _thread.quit(); + _thread.wait(); +} diff --git a/tests/mock/MinimalDeflectServer.h b/tests/mock/MinimalDeflectServer.h new file mode 100644 index 0000000..21470d0 --- /dev/null +++ b/tests/mock/MinimalDeflectServer.h @@ -0,0 +1,59 @@ +/*********************************************************************/ +/* Copyright (c) 2017, EPFL/Blue Brain Project */ +/* Daniel.Nachbaur@epfl.ch */ +/* All rights reserved. */ +/* */ +/* Redistribution and use in source and binary forms, with or */ +/* without modification, are permitted provided that the following */ +/* conditions are met: */ +/* */ +/* 1. Redistributions of source code must retain the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer. */ +/* */ +/* 2. Redistributions in binary form must reproduce the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer in the documentation and/or other materials */ +/* provided with the distribution. */ +/* */ +/* THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, */ +/* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF */ +/* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE */ +/* DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, */ +/* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES */ +/* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE */ +/* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR */ +/* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF */ +/* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */ +/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT */ +/* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE */ +/* POSSIBILITY OF SUCH DAMAGE. */ +/* */ +/* The views and conclusions contained in the software and */ +/* documentation are those of the authors and should not be */ +/* interpreted as representing official policies, either expressed */ +/* or implied, of The University of Texas at Austin. */ +/*********************************************************************/ + +#ifndef DEFLECT_MINIMAL_DEFLECTSERVER_H +#define DEFLECT_MINIMAL_DEFLECTSERVER_H + +#include "MockServer.h" + +#include + +class MinimalDeflectServer +{ +public: + explicit MinimalDeflectServer(int32_t versionOffset = 0); + ~MinimalDeflectServer(); + + quint16 serverPort() const { return _server->serverPort(); } +private: + QThread _thread; + MockServer* _server; +}; + +#endif