From c77eb9256e87b97f76e8a57a5b586c9933b42613 Mon Sep 17 00:00:00 2001 From: Raphael Dumusc Date: Tue, 22 May 2018 18:22:57 +0200 Subject: [PATCH] Fix unreliable server-side stream close, cleanup FrameDispatcher --- CMakeLists.txt | 2 +- deflect/server/FrameDispatcher.cpp | 76 ++++++++++++++++-------------- deflect/server/ServerWorker.cpp | 12 +++-- deflect/server/ServerWorker.h | 1 + doc/Changelog.md | 4 ++ 5 files changed, 55 insertions(+), 40 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4d8cd37..5a8e16a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ # Daniel Nachbaur cmake_minimum_required(VERSION 3.1 FATAL_ERROR) -project(Deflect VERSION 1.0.0) +project(Deflect VERSION 1.0.1) set(Deflect_VERSION_ABI 7) list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/CMake/common) diff --git a/deflect/server/FrameDispatcher.cpp b/deflect/server/FrameDispatcher.cpp index e839266..2e3e298 100644 --- a/deflect/server/FrameDispatcher.cpp +++ b/deflect/server/FrameDispatcher.cpp @@ -43,7 +43,6 @@ #include "ReceiveBuffer.h" #include -#include namespace deflect { @@ -55,10 +54,10 @@ class FrameDispatcher::Impl Impl() {} FramePtr consumeLatestFrame(const QString& uri) { - FramePtr frame(new Frame); + auto frame = std::make_shared(); frame->uri = uri; - ReceiveBuffer& buffer = streamBuffers[uri]; + auto& buffer = streams[uri].buffer; while (buffer.hasCompleteFrame()) frame->tiles = buffer.popFrame(); @@ -81,9 +80,18 @@ class FrameDispatcher::Impl tile.y = height - tile.y - tile.height; } - typedef std::map StreamBuffers; - StreamBuffers streamBuffers; - std::map observers; + bool allConnectionsClosed(const QString& uri) const + { + const auto& stream = streams.at(uri); + return stream.buffer.getSourceCount() == 0 && stream.observers == 0; + } + + struct Stream + { + ReceiveBuffer buffer; + size_t observers = 0; + }; + std::map streams; }; FrameDispatcher::FrameDispatcher(QObject* parent_) @@ -98,60 +106,60 @@ FrameDispatcher::~FrameDispatcher() void FrameDispatcher::addSource(const QString uri, const size_t sourceIndex) { - _impl->streamBuffers[uri].addSource(sourceIndex); + auto& stream = _impl->streams[uri]; - if (_impl->streamBuffers[uri].getSourceCount() == 1 && - _impl->observers[uri] == 0) - { + stream.buffer.addSource(sourceIndex); + if (stream.observers == 0 && stream.buffer.getSourceCount() == 1) emit pixelStreamOpened(uri); - } } void FrameDispatcher::removeSource(const QString uri, const size_t sourceIndex) { - if (!_impl->streamBuffers.count(uri)) + if (!_impl->streams.count(uri)) return; - _impl->streamBuffers[uri].removeSource(sourceIndex); + _impl->streams[uri].buffer.removeSource(sourceIndex); - deleteStream(uri); + if (_impl->allConnectionsClosed(uri)) + deleteStream(uri); } void FrameDispatcher::addObserver(const QString uri) { - ++_impl->observers[uri]; + auto& stream = _impl->streams[uri]; - if (_impl->observers[uri] == 1 && - (!_impl->streamBuffers.count(uri) || - _impl->streamBuffers[uri].getSourceCount() == 0)) - { - _impl->streamBuffers.emplace(uri, ReceiveBuffer()); + ++stream.observers; + if (stream.observers == 1 && stream.buffer.getSourceCount() == 0) emit pixelStreamOpened(uri); - } } void FrameDispatcher::removeObserver(QString uri) { - if (_impl->observers[uri] > 0) - --_impl->observers[uri]; + if (!_impl->streams.count(uri)) + return; - deleteStream(uri); + auto& stream = _impl->streams[uri]; + if (stream.observers > 0) + --stream.observers; + + if (_impl->allConnectionsClosed(uri)) + deleteStream(uri); } void FrameDispatcher::processTile(const QString uri, const size_t sourceIndex, deflect::server::Tile tile) { - if (_impl->streamBuffers.count(uri)) - _impl->streamBuffers[uri].insert(tile, sourceIndex); + if (_impl->streams.count(uri)) + _impl->streams[uri].buffer.insert(tile, sourceIndex); } void FrameDispatcher::processFrameFinished(const QString uri, const size_t sourceIndex) { - if (!_impl->streamBuffers.count(uri)) + if (!_impl->streams.count(uri)) return; - ReceiveBuffer& buffer = _impl->streamBuffers[uri]; + auto& buffer = _impl->streams[uri].buffer; try { buffer.finishFrameForSource(sourceIndex); @@ -166,10 +174,10 @@ void FrameDispatcher::processFrameFinished(const QString uri, void FrameDispatcher::requestFrame(const QString uri) { - if (!_impl->streamBuffers.count(uri)) + if (!_impl->streams.count(uri)) return; - ReceiveBuffer& buffer = _impl->streamBuffers[uri]; + auto& buffer = _impl->streams[uri].buffer; buffer.setAllowedToSend(true); try { @@ -184,12 +192,8 @@ void FrameDispatcher::requestFrame(const QString uri) void FrameDispatcher::deleteStream(const QString uri) { - if (_impl->streamBuffers[uri].getSourceCount() == 0 && - _impl->streamBuffers.count(uri) && _impl->observers[uri] == 0) - { - _impl->streamBuffers.erase(uri); - emit pixelStreamClosed(uri); - } + _impl->streams.erase(uri); + emit pixelStreamClosed(uri); } } } diff --git a/deflect/server/ServerWorker.cpp b/deflect/server/ServerWorker.cpp index 9977c8d..f762129 100644 --- a/deflect/server/ServerWorker.cpp +++ b/deflect/server/ServerWorker.cpp @@ -116,9 +116,8 @@ void ServerWorker::closeConnection(const QString uri) if (uri != _streamId) return; - Event closeEvent; - closeEvent.type = Event::EVT_CLOSE; - _send(closeEvent); + if (_registeredToEvents) + _sendCloseEvent(); emit(connectionClosed()); } @@ -364,6 +363,13 @@ void ServerWorker::_send(const Event& evt) _flushSocket(); } +void ServerWorker::_sendCloseEvent() +{ + Event closeEvent; + closeEvent.type = Event::EVT_CLOSE; + _send(closeEvent); +} + void ServerWorker::_sendQuit() { MessageHeader mh(MESSAGE_TYPE_QUIT, 0); diff --git a/deflect/server/ServerWorker.h b/deflect/server/ServerWorker.h index db67cdf..ee009d3 100644 --- a/deflect/server/ServerWorker.h +++ b/deflect/server/ServerWorker.h @@ -122,6 +122,7 @@ private slots: void _sendProtocolVersion(); void _sendBindReply(bool successful); void _send(const Event& evt); + void _sendCloseEvent(); void _sendQuit(); bool _send(const MessageHeader& messageHeader); void _flushSocket(); diff --git a/doc/Changelog.md b/doc/Changelog.md index da74efa..19ac80b 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -3,6 +3,10 @@ Changelog {#Changelog} ## Deflect 1.0 +### 1.0.1 (master) +* [198](https://github.com/BlueBrain/Deflect/pull/198): + Fix unreliable server-side stream close. + ### 1.0.0 (17-05-2018) * [196](https://github.com/BlueBrain/Deflect/pull/196): Multi-channel support for streaming multiple synchronized views of a scene.