Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions ACKNOWLEDGEMENTS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
===========================================================================
moodycamel::ConcurrentQueue (https://github.com/cameron314/concurrentqueue)
===========================================================================

Simplified BSD License:

Copyright (c) 2013-2016, Cameron Desrochers.
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

- Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.



3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ if(NOT LibJpegTurbo_FOUND)
list(APPEND COMMON_FIND_PACKAGE_DEFINES DEFLECT_USE_LEGACY_LIBJPEGTURBO)
endif()
common_find_package(OpenGL)
common_find_package(OpenMP)
common_find_package(Qt5Concurrent REQUIRED SYSTEM)
common_find_package(Qt5Core REQUIRED)
if(APPLE)
Expand All @@ -48,6 +49,8 @@ if(NOT Qt5Quick_VERSION VERSION_LESS 5.5)
option(DEFLECT_QMLSTREAMER_MULTITHREADED "Use multithreaded-rendering in QMLStreamer" ON)
endif()

set(LCOV_EXCLUDE "deflect/moodycamel/*")

add_subdirectory(deflect)
add_subdirectory(apps)
if(Boost_FOUND)
Expand Down
4 changes: 2 additions & 2 deletions deflect/ImageSegmenter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Segment ImageSegmenter::compressSingleSegment(const ImageWrapper& image)
if (segments.size() > 1)
throw std::runtime_error(
"compressSingleSegment only works for small images");
ImageSegmenter::_computeJpeg(segments[0], false);
_computeJpeg(segments[0], false);
return segments[0];
#else
throw std::runtime_error(
Expand Down Expand Up @@ -234,7 +234,7 @@ SegmentParametersList ImageSegmenter::_makeSegmentParameters(
p.y = image.y + j * info.height;
p.width = (i < info.countX - 1) ? info.width : info.lastWidth;
p.height = (j < info.countY - 1) ? info.height : info.lastHeight;
parameters.push_back(p);
parameters.emplace_back(p);
}
}
return parameters;
Expand Down
1 change: 1 addition & 0 deletions deflect/ImageSegmenter.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class ImageSegmenter
*
* @param image The image to be compressed
* @return the compressed segment
* @threadsafe
*/
DEFLECT_API Segment compressSingleSegment(const ImageWrapper& image);

Expand Down
4 changes: 1 addition & 3 deletions deflect/MessageHeader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ MessageHeader::MessageHeader(const MessageType type_, const uint32_t size_,
QDataStream& operator<<(QDataStream& out, const deflect::MessageHeader& header)
{
out << (qint32)header.type << (quint32)header.size;

for (size_t i = 0; i < MESSAGE_HEADER_URI_LENGTH; ++i)
out << (quint8)header.uri[i];
out.writeRawData(header.uri, MESSAGE_HEADER_URI_LENGTH);

return out;
}
Expand Down
20 changes: 13 additions & 7 deletions deflect/Socket.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/*********************************************************************/
/* Copyright (c) 2011 - 2012, The University of Texas at Austin. */
/* Copyright (c) 2015-2017, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* Daniel Nachbaur <daniel.nachbaur@epfl.ch>*/
/* All rights reserved. */
/* */
/* Redistribution and use in source and binary forms, with or */
Expand Down Expand Up @@ -33,7 +35,7 @@
/* The views and conclusions contained in the software and */
/* documentation are those of the authors and should not be */
/* interpreted as representing official policies, either expressed */
/* or implied, of The University of Texas at Austin. */
/* or implied, of Ecole polytechnique federale de Lausanne. */
/*********************************************************************/

#include "Socket.h"
Expand Down Expand Up @@ -108,7 +110,8 @@ bool Socket::hasMessage(const size_t messageSize) const
(int)(MessageHeader::serializedSize + messageSize);
}

bool Socket::send(const MessageHeader& messageHeader, const QByteArray& message)
bool Socket::send(const MessageHeader& messageHeader, const QByteArray& message,
const bool waitForBytesWritten)
{
QMutexLocker locker(&_socketMutex);
if (!isConnected())
Expand All @@ -123,10 +126,13 @@ bool Socket::send(const MessageHeader& messageHeader, const QByteArray& message)
// send message
const bool allSent = _write(message);

// Needed in the absence of event loop, otherwise the reception is frozen.
while (_socket->bytesToWrite() > 0 && isConnected())
_socket->waitForBytesWritten();

if (waitForBytesWritten)
{
// Needed in the absence of event loop, otherwise the reception is
// frozen.
while (_socket->bytesToWrite() > 0 && isConnected())
_socket->waitForBytesWritten();
}
return allSent;
}

Expand Down
12 changes: 9 additions & 3 deletions deflect/Socket.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/*********************************************************************/
/* Copyright (c) 2011 - 2012, The University of Texas at Austin. */
/* Copyright (c) 2015-2017, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* Daniel Nachbaur <daniel.nachbaur@epfl.ch>*/
/* All rights reserved. */
/* */
/* Redistribution and use in source and binary forms, with or */
Expand Down Expand Up @@ -33,7 +35,7 @@
/* The views and conclusions contained in the software and */
/* documentation are those of the authors and should not be */
/* interpreted as representing official policies, either expressed */
/* or implied, of The University of Texas at Austin. */
/* or implied, of Ecole polytechnique federale de Lausanne. */
/*********************************************************************/

#ifndef DEFLECT_SOCKET_H
Expand Down Expand Up @@ -103,9 +105,13 @@ class Socket : public QObject
* Send a message.
* @param messageHeader The message header
* @param message The message data
* @param waitForBytesWritten wait until the message is completely send; in
* case of multiple sends per frame it is advised to do this only
* once per frame
* @return true if the message could be sent, false otherwise
*/
bool send(const MessageHeader& messageHeader, const QByteArray& message);
bool send(const MessageHeader& messageHeader, const QByteArray& message,
bool waitForBytesWritten);

/**
* Receive a message.
Expand Down
104 changes: 71 additions & 33 deletions deflect/StreamSendWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace deflect
StreamSendWorker::StreamSendWorker(Socket& socket, const std::string& id)
: _socket(socket)
, _id(id)
, _dequeuedRequests(std::thread::hardware_concurrency() / 2)
{
_imageSegmenter.setNominalSegmentDimensions(SEGMENT_SIZE, SEGMENT_SIZE);
}
Expand All @@ -71,62 +72,93 @@ void StreamSendWorker::run()
_running = true;
while (true)
{
// Copy request, unlock enqueue methods during processing of tasks
std::unique_lock<std::mutex> lock(_mutex);
while (_requests.empty() && _running)
_condition.wait(lock);

if (!_running)
break;

const auto request = std::move(_requests.front());
_requests.pop_front();
lock.unlock();
size_t count = 0;
if (!_pendingFinish)
count = _requests.wait_dequeue_bulk(_dequeuedRequests.begin(),
_dequeuedRequests.size());
else
{
// in case we encountered a finish request, get all remaining send
// requests w/o waiting
count = _requests.try_dequeue_bulk(_dequeuedRequests.begin(),
_dequeuedRequests.size());

// no more pending sends, now process the finish request and reset
// for next finish
if (count == 0)
{
count = 1;
_finishRequest.isFinish = false; // reset this to process this
// request now
_dequeuedRequests[0] = _finishRequest;
_pendingFinish = false;
}
}

bool success = true;
for (auto& task : request.tasks)
for (size_t i = 0; i < count; ++i)
{
if (!task())
bool success = true;
auto& request = _dequeuedRequests[i];

// postpone a finish request to maintain order (as the lockfree
// does not guarantee order)
if (request.isFinish)
{
if (_pendingFinish)
throw std::runtime_error("Already have pending finish");

_finishRequest = request;
_pendingFinish = true;
continue;
}

for (auto& task : request.tasks)
{
success = false;
break;
if (!task())
{
success = false;
break;
}
}
if (request.promise)
request.promise->set_value(success);
}
request.promise->set_value(success);
}
}

void StreamSendWorker::stop()
{
{
std::lock_guard<std::mutex> lock(_mutex);
if (!_running)
return;
_running = false;
_condition.notify_all();
_enqueueRequest(std::vector<Task>());
}

quit();
wait();

while (!_requests.empty())
Request request;
while (_requests.try_dequeue(request))
{
_requests.front().promise->set_value(false);
_requests.pop_front();
if (request.promise)
request.promise->set_value(false);
}
}

Stream::Future StreamSendWorker::enqueueImage(const ImageWrapper& image,
const bool finish)
{
if (_pendingFinish)
throw(std::runtime_error{"Pending finish, no send allowed"});

if (image.compressionPolicy != COMPRESSION_ON && image.pixelFormat != RGBA)
{
std::cerr << "Currently, RAW images can only be sent in RGBA format. "
"Other formats support remain to be implemented."
<< std::endl;
std::promise<bool> promise;
promise.set_value(false);
return promise.get_future();
return make_ready_future(false);
}

std::vector<Task> tasks;
Expand All @@ -136,6 +168,12 @@ Stream::Future StreamSendWorker::enqueueImage(const ImageWrapper& image,
{
auto segment = _imageSegmenter.compressSingleSegment(image);
tasks.emplace_back([this, segment] { return _sendSegment(segment); });

// as we expect to encounter a lot of these small sends, be optimistic
// and fulfill the promise already to reduce load in the send thread
// (c.f. lock ops performance on KNL)
_requests.enqueue({nullptr, tasks, false});
return make_ready_future(true);
}
else
tasks.emplace_back([this, image] { return _sendImage(image); });
Expand All @@ -148,7 +186,7 @@ Stream::Future StreamSendWorker::enqueueImage(const ImageWrapper& image,

Stream::Future StreamSendWorker::enqueueFinish()
{
return _enqueueRequest({[this] { return _sendFinish(); }});
return _enqueueRequest({[this] { return _sendFinish(); }}, true);
}

Stream::Future StreamSendWorker::enqueueOpen()
Expand Down Expand Up @@ -195,13 +233,11 @@ Stream::Future StreamSendWorker::enqueueData(const QByteArray data)
{[this, data] { return _send(MESSAGE_TYPE_DATA, data); }});
}

Stream::Future StreamSendWorker::_enqueueRequest(std::vector<Task>&& tasks)
Stream::Future StreamSendWorker::_enqueueRequest(std::vector<Task>&& tasks,
const bool isFinish)
{
PromisePtr promise(new Promise);

std::lock_guard<std::mutex> lock(_mutex);
_requests.push_back({promise, tasks});
_condition.notify_all();
_requests.enqueue({promise, tasks, isFinish});
return promise->get_future();
}

Expand Down Expand Up @@ -230,16 +266,18 @@ bool StreamSendWorker::_sendSegment(const Segment& segment)
auto message = QByteArray{(const char*)(&segment.parameters),
sizeof(SegmentParameters)};
message.append(segment.imageData);
return _send(MESSAGE_TYPE_PIXELSTREAM, message);
return _send(MESSAGE_TYPE_PIXELSTREAM, message, false);
}

bool StreamSendWorker::_sendFinish()
{
return _send(MESSAGE_TYPE_PIXELSTREAM_FINISH_FRAME, {});
}

bool StreamSendWorker::_send(const MessageType type, const QByteArray& message)
bool StreamSendWorker::_send(const MessageType type, const QByteArray& message,
const bool waitForBytesWritten)
{
return _socket.send(MessageHeader(type, message.size(), _id), message);
return _socket.send(MessageHeader(type, message.size(), _id), message,
waitForBytesWritten);
}
}
Loading