Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Daniel Nachbaur <daniel.nachbaur@epfl.ch>

cmake_minimum_required(VERSION 3.1 FATAL_ERROR)
project(Deflect VERSION 1.0.0)
project(Deflect VERSION 1.0.1)
set(Deflect_VERSION_ABI 7)

list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/CMake/common)
Expand Down
76 changes: 40 additions & 36 deletions deflect/server/FrameDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "ReceiveBuffer.h"

#include <cassert>
#include <iostream>

namespace deflect
{
Expand All @@ -55,10 +54,10 @@ class FrameDispatcher::Impl
Impl() {}
FramePtr consumeLatestFrame(const QString& uri)
{
FramePtr frame(new Frame);
auto frame = std::make_shared<Frame>();
frame->uri = uri;

ReceiveBuffer& buffer = streamBuffers[uri];
auto& buffer = streams[uri].buffer;

while (buffer.hasCompleteFrame())
frame->tiles = buffer.popFrame();
Expand All @@ -81,9 +80,18 @@ class FrameDispatcher::Impl
tile.y = height - tile.y - tile.height;
}

typedef std::map<QString, ReceiveBuffer> StreamBuffers;
StreamBuffers streamBuffers;
std::map<QString, size_t> observers;
bool allConnectionsClosed(const QString& uri) const
{
const auto& stream = streams.at(uri);
return stream.buffer.getSourceCount() == 0 && stream.observers == 0;
}

struct Stream
{
ReceiveBuffer buffer;
size_t observers = 0;
};
std::map<QString, Stream> streams;
};

FrameDispatcher::FrameDispatcher(QObject* parent_)
Expand All @@ -98,60 +106,60 @@ FrameDispatcher::~FrameDispatcher()

void FrameDispatcher::addSource(const QString uri, const size_t sourceIndex)
{
_impl->streamBuffers[uri].addSource(sourceIndex);
auto& stream = _impl->streams[uri];

if (_impl->streamBuffers[uri].getSourceCount() == 1 &&
_impl->observers[uri] == 0)
{
stream.buffer.addSource(sourceIndex);
if (stream.observers == 0 && stream.buffer.getSourceCount() == 1)
emit pixelStreamOpened(uri);
}
}

void FrameDispatcher::removeSource(const QString uri, const size_t sourceIndex)
{
if (!_impl->streamBuffers.count(uri))
if (!_impl->streams.count(uri))
return;

_impl->streamBuffers[uri].removeSource(sourceIndex);
_impl->streams[uri].buffer.removeSource(sourceIndex);

deleteStream(uri);
if (_impl->allConnectionsClosed(uri))
deleteStream(uri);
}

void FrameDispatcher::addObserver(const QString uri)
{
++_impl->observers[uri];
auto& stream = _impl->streams[uri];

if (_impl->observers[uri] == 1 &&
(!_impl->streamBuffers.count(uri) ||
_impl->streamBuffers[uri].getSourceCount() == 0))
{
_impl->streamBuffers.emplace(uri, ReceiveBuffer());
++stream.observers;
if (stream.observers == 1 && stream.buffer.getSourceCount() == 0)
emit pixelStreamOpened(uri);
}
}

void FrameDispatcher::removeObserver(QString uri)
{
if (_impl->observers[uri] > 0)
--_impl->observers[uri];
if (!_impl->streams.count(uri))
return;

deleteStream(uri);
auto& stream = _impl->streams[uri];
if (stream.observers > 0)
--stream.observers;

if (_impl->allConnectionsClosed(uri))
deleteStream(uri);
}

void FrameDispatcher::processTile(const QString uri, const size_t sourceIndex,
deflect::server::Tile tile)
{
if (_impl->streamBuffers.count(uri))
_impl->streamBuffers[uri].insert(tile, sourceIndex);
if (_impl->streams.count(uri))
_impl->streams[uri].buffer.insert(tile, sourceIndex);
}

void FrameDispatcher::processFrameFinished(const QString uri,
const size_t sourceIndex)
{
if (!_impl->streamBuffers.count(uri))
if (!_impl->streams.count(uri))
return;

ReceiveBuffer& buffer = _impl->streamBuffers[uri];
auto& buffer = _impl->streams[uri].buffer;
try
{
buffer.finishFrameForSource(sourceIndex);
Expand All @@ -166,10 +174,10 @@ void FrameDispatcher::processFrameFinished(const QString uri,

void FrameDispatcher::requestFrame(const QString uri)
{
if (!_impl->streamBuffers.count(uri))
if (!_impl->streams.count(uri))
return;

ReceiveBuffer& buffer = _impl->streamBuffers[uri];
auto& buffer = _impl->streams[uri].buffer;
buffer.setAllowedToSend(true);
try
{
Expand All @@ -184,12 +192,8 @@ void FrameDispatcher::requestFrame(const QString uri)

void FrameDispatcher::deleteStream(const QString uri)
{
if (_impl->streamBuffers[uri].getSourceCount() == 0 &&
_impl->streamBuffers.count(uri) && _impl->observers[uri] == 0)
{
_impl->streamBuffers.erase(uri);
emit pixelStreamClosed(uri);
}
_impl->streams.erase(uri);
emit pixelStreamClosed(uri);
}
}
}
12 changes: 9 additions & 3 deletions deflect/server/ServerWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,8 @@ void ServerWorker::closeConnection(const QString uri)
if (uri != _streamId)
return;

Event closeEvent;
closeEvent.type = Event::EVT_CLOSE;
_send(closeEvent);
if (_registeredToEvents)
_sendCloseEvent();

emit(connectionClosed());
}
Expand Down Expand Up @@ -364,6 +363,13 @@ void ServerWorker::_send(const Event& evt)
_flushSocket();
}

void ServerWorker::_sendCloseEvent()
{
Event closeEvent;
closeEvent.type = Event::EVT_CLOSE;
_send(closeEvent);
}

void ServerWorker::_sendQuit()
{
MessageHeader mh(MESSAGE_TYPE_QUIT, 0);
Expand Down
1 change: 1 addition & 0 deletions deflect/server/ServerWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private slots:
void _sendProtocolVersion();
void _sendBindReply(bool successful);
void _send(const Event& evt);
void _sendCloseEvent();
void _sendQuit();
bool _send(const MessageHeader& messageHeader);
void _flushSocket();
Expand Down
4 changes: 4 additions & 0 deletions doc/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ Changelog {#Changelog}

## Deflect 1.0

### 1.0.1 (master)
* [198](https://github.com/BlueBrain/Deflect/pull/198):
Fix unreliable server-side stream close.

### 1.0.0 (17-05-2018)
* [196](https://github.com/BlueBrain/Deflect/pull/196):
Multi-channel support for streaming multiple synchronized views of a scene.
Expand Down