diff --git a/deflect/ImageJpegDecompressor.cpp b/deflect/ImageJpegDecompressor.cpp index 3273cba..57d8b7d 100644 --- a/deflect/ImageJpegDecompressor.cpp +++ b/deflect/ImageJpegDecompressor.cpp @@ -62,10 +62,7 @@ QByteArray ImageJpegDecompressor::decompress( const QByteArray& jpegData ) (unsigned long)jpegData.size(), &width, &height, &jpegSubsamp ); if( err != 0 ) - { - std::cerr << "libjpeg-turbo header decompression failure" << std::endl; - return QByteArray(); - } + throw std::runtime_error( "libjpeg-turbo header decompression failed" ); // decompress image data int pixelFormat = TJPF_RGBX; // Format for OpenGL texture (GL_RGBA) @@ -78,10 +75,7 @@ QByteArray ImageJpegDecompressor::decompress( const QByteArray& jpegData ) (unsigned char*)decodedData.data(), width, pitch, height, pixelFormat, flags ); if( err != 0 ) - { - std::cerr << "libjpeg-turbo image decompression failure" << std::endl; - return QByteArray(); - } + throw std::runtime_error( "libjpeg-turbo image decompression failed" ); return decodedData; } diff --git a/deflect/ImageJpegDecompressor.h b/deflect/ImageJpegDecompressor.h index 8b33f63..e71d71a 100644 --- a/deflect/ImageJpegDecompressor.h +++ b/deflect/ImageJpegDecompressor.h @@ -62,8 +62,8 @@ class ImageJpegDecompressor * Decompress a Jpeg image * * @param jpegData The compressed Jpeg data - * @return The decompressed image data in (GL_)RGBA format, or an - * empty array if the image could not be decoded. + * @return The decompressed image data in (GL_)RGBA format + * @throw std::runtime_error if a decompression error occured */ DEFLECT_API QByteArray decompress( const QByteArray& jpegData ); diff --git a/deflect/SegmentDecoder.cpp b/deflect/SegmentDecoder.cpp index fc424ba..379af62 100644 --- a/deflect/SegmentDecoder.cpp +++ b/deflect/SegmentDecoder.cpp @@ -1,6 +1,6 @@ /*********************************************************************/ -/* Copyright (c) 2013, EPFL/Blue Brain Project */ -/* Raphael Dumusc */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ /* All rights reserved. */ /* */ /* Redistribution and use in source and binary forms, with or */ @@ -69,20 +69,31 @@ SegmentDecoder::SegmentDecoder() SegmentDecoder::~SegmentDecoder() {} -void decodeSegment( ImageJpegDecompressor* decompressor, Segment* segment ) +void _decodeSegment( ImageJpegDecompressor* decompressor, Segment* segment ) { - QByteArray decodedData = decompressor->decompress( segment->imageData ); + if( !segment->parameters.compressed ) + return; - if( !decodedData.isEmpty( )) + QByteArray decodedData; + try + { + decodedData = decompressor->decompress( segment->imageData ); + } + catch( const std::runtime_error& ) { - segment->imageData = decodedData; - segment->parameters.compressed = false; + throw; } + const auto& params = segment->parameters; + if( (size_t)decodedData.size() != params.height * params.width * 4 ) + throw std::runtime_error( "unexpected segment size" ); + + segment->imageData = decodedData; + segment->parameters.compressed = false; } void SegmentDecoder::decode( Segment& segment ) { - decodeSegment( &_impl->decompressor, &segment ); + _decodeSegment( &_impl->decompressor, &segment ); } void SegmentDecoder::startDecoding( Segment& segment ) @@ -95,14 +106,24 @@ void SegmentDecoder::startDecoding( Segment& segment ) return; } - _impl->decodingFuture = QtConcurrent::run( decodeSegment, + _impl->decodingFuture = QtConcurrent::run( _decodeSegment, &_impl->decompressor, &segment ); } void SegmentDecoder::waitDecoding() { - _impl->decodingFuture.waitForFinished(); + try + { + _impl->decodingFuture.waitForFinished(); + } + catch( const QUnhandledException& ) + { + // Let Qt throws a QUnhandledException and rewrite the error message. + // QtConcurrent::run can only forward QException subclasses, which does + // not even work on 5.7.1: https://bugreports.qt.io/browse/QTBUG-58021 + throw std::runtime_error( "Segment decoding failed" ); + } } bool SegmentDecoder::isRunning() const diff --git a/deflect/SegmentDecoder.h b/deflect/SegmentDecoder.h index 2de7d48..3fdac01 100644 --- a/deflect/SegmentDecoder.h +++ b/deflect/SegmentDecoder.h @@ -1,6 +1,6 @@ /*********************************************************************/ -/* Copyright (c) 2013, EPFL/Blue Brain Project */ -/* Raphael Dumusc */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ /* All rights reserved. */ /* */ /* Redistribution and use in source and binary forms, with or */ @@ -64,6 +64,7 @@ class SegmentDecoder * @param segment The segment to decode. Upon success, its imageData member * will hold the decompressed data and its "compressed" flag will be * set to false. + * @throw std::runtime_error if a decompression error occured */ DEFLECT_API void decode( Segment& segment ); @@ -79,7 +80,11 @@ class SegmentDecoder */ DEFLECT_API void startDecoding( Segment& segment ); - /** Waits for the decoding of a segment to finish, initiated by startDecoding(). */ + /** + * Waits for the decoding of a segment to finish, initiated by + * startDecoding(). + * @throw std::runtime_error if a decompression error occured + */ DEFLECT_API void waitDecoding(); /** Check if the decoding thread is running. */ diff --git a/deflect/ServerWorker.cpp b/deflect/ServerWorker.cpp index e15acb9..fd759d4 100644 --- a/deflect/ServerWorker.cpp +++ b/deflect/ServerWorker.cpp @@ -1,5 +1,5 @@ /*********************************************************************/ -/* Copyright (c) 2013-2016, EPFL/Blue Brain Project */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ /* Raphael Dumusc */ /* Daniel.Nachbaur@epfl.ch */ /* All rights reserved. */ @@ -47,7 +47,10 @@ #include -#define RECEIVE_TIMEOUT_MS 3000 +namespace +{ +const int RECEIVE_TIMEOUT_MS = 3000; +} namespace deflect { @@ -56,6 +59,7 @@ ServerWorker::ServerWorker( const int socketDescriptor ) // Ensure that tcpSocket_ parent is *this* so it gets moved to thread : _tcpSocket( new QTcpSocket( this )) , _sourceId( socketDescriptor ) + , _clientProtocolVersion( NETWORK_PROTOCOL_VERSION ) , _registeredToEvents( false ) { if( !_tcpSocket->setSocketDescriptor( socketDescriptor )) @@ -223,6 +227,9 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader, return; } _streamId = uri; + // The version is only sent by deflect clients since v. 0.13.0 + if( !byteArray.isEmpty( )) + _parseClientProtocolVersion( byteArray ); emit addStreamSource( _streamId, _sourceId ); break; @@ -263,17 +270,22 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader, } } -void ServerWorker::_handlePixelStreamMessage( const QByteArray& byteArray ) +void ServerWorker::_parseClientProtocolVersion( const QByteArray& message ) { - const SegmentParameters* parameters = - reinterpret_cast< const SegmentParameters* >( byteArray.data( )); + bool ok = false; + const int version = message.toInt( &ok ); + if( ok ) + _clientProtocolVersion = version; +} +void ServerWorker::_handlePixelStreamMessage( const QByteArray& message ) +{ Segment segment; - segment.parameters = *parameters; - QByteArray imageData = - byteArray.right( byteArray.size() - sizeof( SegmentParameters )); - segment.imageData = imageData; + const auto data = message.data(); + segment.parameters = *reinterpret_cast( data ); + segment.imageData = message.right( message.size() - + sizeof( SegmentParameters )); emit( receivedSegment( _streamId, _sourceId, segment )); } diff --git a/deflect/ServerWorker.h b/deflect/ServerWorker.h index 7cc0b03..4717cbe 100644 --- a/deflect/ServerWorker.h +++ b/deflect/ServerWorker.h @@ -96,6 +96,7 @@ private slots: QString _streamId; int _sourceId; + int _clientProtocolVersion; bool _registeredToEvents; QQueue _events; @@ -105,8 +106,9 @@ private slots: QByteArray _receiveMessageBody( int size ); void _handleMessage( const MessageHeader& messageHeader, - const QByteArray& byteArray ); - void _handlePixelStreamMessage( const QByteArray& byteArray ); + const QByteArray& message ); + void _parseClientProtocolVersion( const QByteArray& message ); + void _handlePixelStreamMessage( const QByteArray& message ); void _sendProtocolVersion(); void _sendBindReply( bool successful ); diff --git a/deflect/Socket.cpp b/deflect/Socket.cpp index 2ea6d75..360bf15 100644 --- a/deflect/Socket.cpp +++ b/deflect/Socket.cpp @@ -59,7 +59,7 @@ const unsigned short Socket::defaultPortNumber = DEFAULT_PORT_NUMBER; Socket::Socket( const std::string& host, const unsigned short port ) : _host( host ) , _socket( new QTcpSocket( )) - , _remoteProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION ) + , _serverProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION ) { // disable warnings which occur if no QCoreApplication is present during // _connect(): QObject::connect: Cannot connect (null)::destroyed() to @@ -91,6 +91,11 @@ bool Socket::isConnected() const return _socket->state() == QTcpSocket::ConnectedState; } +int32_t Socket::getServerProtocolVersion() const +{ + return _serverProtocolVersion; +} + int Socket::getFileDescriptor() const { return _socket->socketDescriptor(); @@ -172,11 +177,6 @@ bool Socket::receive( MessageHeader& messageHeader, QByteArray& message ) return true; } -int32_t Socket::getRemoteProtocolVersion() const -{ - return _remoteProtocolVersion; -} - bool Socket::_receiveHeader( MessageHeader& messageHeader ) { while( _socket->bytesAvailable() < qint64(MessageHeader::serializedSize) ) @@ -193,45 +193,42 @@ bool Socket::_receiveHeader( MessageHeader& messageHeader ) bool Socket::_connect( const std::string& host, const unsigned short port ) { - // make sure we're disconnected - _socket->disconnectFromHost(); - - // open connection _socket->connectToHost( host.c_str(), port ); - if( !_socket->waitForConnected( RECEIVE_TIMEOUT_MS )) { - std::cerr << "could not connect to host " << host << ":" << port + std::cerr << "could not connect to " << host << ":" << port << std::endl; return false; } - // handshake - if( _checkProtocolVersion( )) - return true; + if( !_receiveProtocolVersion( )) + { + std::cerr << "server protocol version was not received" << std::endl; + _socket->disconnectFromHost(); + return false; + } + + if( _serverProtocolVersion < NETWORK_PROTOCOL_VERSION ) + { + std::cerr << "server uses unsupported protocol: " + << _serverProtocolVersion << " < " + << NETWORK_PROTOCOL_VERSION << std::endl; + _socket->disconnectFromHost(); + return false; + } - std::cerr << "Protocol version check failed for host: " << host << ":" - << port << std::endl; - _socket->disconnectFromHost(); - return false; + return true; } -bool Socket::_checkProtocolVersion() +bool Socket::_receiveProtocolVersion() { while( _socket->bytesAvailable() < qint64(sizeof(int32_t)) ) { if( !_socket->waitForReadyRead( RECEIVE_TIMEOUT_MS )) return false; } - - _socket->read((char *)&_remoteProtocolVersion, sizeof(int32_t)); - - if( _remoteProtocolVersion == NETWORK_PROTOCOL_VERSION ) - return true; - - std::cerr << "unsupported protocol version " << _remoteProtocolVersion - << " != " << NETWORK_PROTOCOL_VERSION << std::endl; - return false; + _socket->read((char*)&_serverProtocolVersion, sizeof(int32_t)); + return true; } } diff --git a/deflect/Socket.h b/deflect/Socket.h index 228f7b9..36533f3 100644 --- a/deflect/Socket.h +++ b/deflect/Socket.h @@ -85,11 +85,8 @@ class Socket : public QObject /** Is the Socket connected */ DEFLECT_API bool isConnected() const; - /** - * Is there a pending message - * @param messageSize Minimum size of the message - */ - bool hasMessage( const size_t messageSize = 0 ) const; + /** @return the protocol version of the server. */ + int32_t getServerProtocolVersion() const; /** * Get the FileDescriptor for the Socket (for use by poll()) @@ -97,6 +94,12 @@ class Socket : public QObject */ int getFileDescriptor() const; + /** + * Is there a pending message + * @param messageSize Minimum size of the message + */ + bool hasMessage( const size_t messageSize = 0 ) const; + /** * Send a message. * @param messageHeader The message header @@ -113,9 +116,6 @@ class Socket : public QObject */ bool receive( MessageHeader& messageHeader, QByteArray& message ); - /** Get the protocol version of the remote host */ - int32_t getRemoteProtocolVersion() const; - signals: /** Signal that the socket has been disconnected. */ void disconnected(); @@ -123,13 +123,12 @@ class Socket : public QObject private: const std::string _host; QTcpSocket* _socket; - int32_t _remoteProtocolVersion; mutable QMutex _socketMutex; - - bool _connect( const std::string &host, const unsigned short port ); - bool _checkProtocolVersion(); + int32_t _serverProtocolVersion; bool _receiveHeader( MessageHeader& messageHeader ); + bool _connect( const std::string &host, const unsigned short port ); + bool _receiveProtocolVersion(); }; } diff --git a/deflect/StreamPrivate.cpp b/deflect/StreamPrivate.cpp index 2530274..49b1113 100644 --- a/deflect/StreamPrivate.cpp +++ b/deflect/StreamPrivate.cpp @@ -41,6 +41,7 @@ #include "StreamPrivate.h" +#include "NetworkProtocol.h" #include "Segment.h" #include "SegmentParameters.h" #include "SizeHints.h" @@ -119,8 +120,9 @@ StreamPrivate::~StreamPrivate() void StreamPrivate::sendOpen() { - const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, 0, id ); - socket.send( mh, QByteArray( )); + const auto message = QByteArray::number( NETWORK_PROTOCOL_VERSION ); + const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, message.size(), id ); + socket.send( mh, message ); } void StreamPrivate::sendClose() diff --git a/doc/Changelog.md b/doc/Changelog.md index 34493f2..c8b830b 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -3,8 +3,12 @@ Changelog {#Changelog} ## Deflect 0.12 -### 0.12.1 (git master) +### 0.12.1 (01-02-2017) +* [147](https://github.com/BlueBrain/Deflect/pull/147): + Improved handling of network protocol updates. Future updates should be + possible without breaking any client/server based on this release. + Deflect server: better reporting of JPEG decompression errors. * [146](https://github.com/BlueBrain/Deflect/pull/146): Unified the command line options and help message of applications. * [145](https://github.com/BlueBrain/Deflect/pull/145): diff --git a/tests/cpp/SegmentDecoderTests.cpp b/tests/cpp/SegmentDecoderTests.cpp index 0ef932f..221e6a8 100644 --- a/tests/cpp/SegmentDecoderTests.cpp +++ b/tests/cpp/SegmentDecoderTests.cpp @@ -1,6 +1,6 @@ /*********************************************************************/ -/* Copyright (c) 2013, EPFL/Blue Brain Project */ -/* Raphael Dumusc */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ /* All rights reserved. */ /* */ /* Redistribution and use in source and binary forms, with or */ @@ -136,3 +136,22 @@ BOOST_AUTO_TEST_CASE( testImageSegmentationWithCompressionAndDecompression ) data.data() + segment.imageData.size(), dataOut, dataOut+segment.imageData.size( )); } + +BOOST_AUTO_TEST_CASE( testDecompressionOfInvalidData ) +{ + const QByteArray invalidJpegData{ "notjpeg923%^#8" }; + deflect::ImageJpegDecompressor decompressor; + BOOST_CHECK_THROW( decompressor.decompress( invalidJpegData ), + std::runtime_error ); + + deflect::Segment segment; + segment.parameters.width = 32; + segment.parameters.height = 32; + segment.imageData = invalidJpegData; + + deflect::SegmentDecoder decoder; + BOOST_CHECK_THROW( decoder.decode( segment ), std::runtime_error ); + + BOOST_CHECK_NO_THROW( decoder.startDecoding( segment )); + BOOST_CHECK_THROW( decoder.waitDecoding(), std::runtime_error ); +} diff --git a/tests/cpp/SocketTests.cpp b/tests/cpp/SocketTests.cpp index 575c01b..318e8c3 100644 --- a/tests/cpp/SocketTests.cpp +++ b/tests/cpp/SocketTests.cpp @@ -44,6 +44,7 @@ namespace ut = boost::unit_test; #include "MinimalGlobalQtApp.h" #include "MockServer.h" +#include #include #include @@ -53,14 +54,14 @@ BOOST_GLOBAL_FIXTURE( MinimalGlobalQtApp ); void testSocketConnect( const int32_t versionOffset ) { QThread thread; - MockServer* server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset ); + auto server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset ); server->moveToThread( &thread ); server->connect( &thread, &QThread::finished, server, &QObject::deleteLater ); thread.start(); deflect::Socket socket( "localhost", server->serverPort( )); - BOOST_CHECK( socket.isConnected() == (versionOffset == 0)); + BOOST_CHECK( socket.isConnected() == (versionOffset >= 0)); thread.quit(); thread.wait(); diff --git a/tests/mock/MockServer.h b/tests/mock/MockServer.h index b63bb21..8dd6f68 100644 --- a/tests/mock/MockServer.h +++ b/tests/mock/MockServer.h @@ -46,7 +46,6 @@ typedef __int32 int32_t; #include #include -#include #include @@ -55,8 +54,7 @@ class MockServer : public QTcpServer Q_OBJECT public: - DEFLECT_API explicit MockServer( int32_t protocolVersion = - NETWORK_PROTOCOL_VERSION ); + DEFLECT_API explicit MockServer( int32_t protocolVersion ); DEFLECT_API virtual ~MockServer(); protected: