From 5a8f25b1c8a0a0afed38c37b7b1c4f78ff5f5d05 Mon Sep 17 00:00:00 2001 From: Daniel Nachbaur Date: Thu, 24 May 2018 19:06:14 +0200 Subject: [PATCH 1/2] Fix data races for ServerTests --- deflect/StreamSendWorker.h | 2 +- tests/mock/DeflectServer.cpp | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/deflect/StreamSendWorker.h b/deflect/StreamSendWorker.h index 6d91084..3d7e730 100644 --- a/deflect/StreamSendWorker.h +++ b/deflect/StreamSendWorker.h @@ -103,7 +103,7 @@ class StreamSendWorker : public QThread const std::string& _id; moodycamel::BlockingConcurrentQueue _requests; - bool _running = false; + std::atomic_bool _running{false}; View _currentView = View::mono; RowOrder _currentRowOrder = RowOrder::top_down; uint8_t _currentChannel = 0; diff --git a/tests/mock/DeflectServer.cpp b/tests/mock/DeflectServer.cpp index fa37a1e..b79a6c2 100644 --- a/tests/mock/DeflectServer.cpp +++ b/tests/mock/DeflectServer.cpp @@ -51,8 +51,8 @@ DeflectServer::DeflectServer() _server->connect(_server, &deflect::server::Server::pixelStreamOpened, [&](const QString) { - ++_openedStreams; _mutex.lock(); + ++_openedStreams; _receivedState = true; _received.wakeAll(); _mutex.unlock(); @@ -60,8 +60,8 @@ DeflectServer::DeflectServer() _server->connect(_server, &deflect::server::Server::pixelStreamClosed, [&](const QString) { - --_openedStreams; _mutex.lock(); + --_openedStreams; _receivedState = true; _received.wakeAll(); _mutex.unlock(); @@ -69,9 +69,9 @@ DeflectServer::DeflectServer() _server->connect(_server, &deflect::server::Server::receivedSizeHints, [&](const QString id, const deflect::SizeHints hints) { + _mutex.lock(); if (_sizeHintsCallback) _sizeHintsCallback(id, hints); - _mutex.lock(); _receivedState = true; _received.wakeAll(); _mutex.unlock(); @@ -79,9 +79,9 @@ DeflectServer::DeflectServer() _server->connect(_server, &deflect::server::Server::receivedData, [&](const QString id, QByteArray data) { + _mutex.lock(); if (_dataReceivedCallback) _dataReceivedCallback(id, data); - _mutex.lock(); _receivedState = true; _received.wakeAll(); _mutex.unlock(); @@ -89,10 +89,10 @@ DeflectServer::DeflectServer() _server->connect(_server, &deflect::server::Server::receivedFrame, [&](deflect::server::FramePtr frame) { + _mutex.lock(); if (_frameReceivedCallback) _frameReceivedCallback(frame); ++_receivedFrames; - _mutex.lock(); _receivedState = true; _received.wakeAll(); _mutex.unlock(); @@ -120,7 +120,7 @@ DeflectServer::~DeflectServer() void DeflectServer::waitForMessage() { - for (size_t j = 0; j < 20; ++j) + for (;;) { _mutex.lock(); _received.wait(&_mutex, 100 /*ms*/); From 180ecb1f2b918f47d7e472159f9a31b5ef771ef8 Mon Sep 17 00:00:00 2001 From: Daniel Nachbaur Date: Thu, 24 May 2018 19:06:37 +0200 Subject: [PATCH 2/2] Update moodycamel queue to latest version --- deflect/moodycamel/blockingconcurrentqueue.h | 18 +++---- deflect/moodycamel/concurrentqueue.h | 56 ++++++++++++-------- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/deflect/moodycamel/blockingconcurrentqueue.h b/deflect/moodycamel/blockingconcurrentqueue.h index 48405fa..a5e40de 100644 --- a/deflect/moodycamel/blockingconcurrentqueue.h +++ b/deflect/moodycamel/blockingconcurrentqueue.h @@ -131,14 +131,14 @@ class Semaphore bool timed_wait(std::uint64_t timeout_usecs) { mach_timespec_t ts; - ts.tv_sec = timeout_usecs / 1000000; + ts.tv_sec = static_cast(timeout_usecs / 1000000); ts.tv_nsec = (timeout_usecs % 1000000) * 1000; // added in OSX 10.10: // https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html kern_return_t rc = semaphore_timedwait(m_sema, ts); - return rc != KERN_OPERATION_TIMED_OUT; + return rc != KERN_OPERATION_TIMED_OUT && rc != KERN_ABORTED; } void signal() { semaphore_signal(m_sema); } @@ -200,7 +200,7 @@ class Semaphore ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000; // sem_timedwait bombs if you have more than 1e9 in tv_nsec // so we have to clean things up before passing it in - if (ts.tv_nsec > nsecs_in_1_sec) + if (ts.tv_nsec >= nsecs_in_1_sec) { ts.tv_nsec -= nsecs_in_1_sec; ++ts.tv_sec; @@ -556,7 +556,7 @@ class BlockingConcurrentQueue // Thread-safe. inline bool enqueue(T const& item) { - if (details::likely(inner.enqueue(item))) + if ((details::likely)(inner.enqueue(item))) { sema->signal(); return true; @@ -573,7 +573,7 @@ class BlockingConcurrentQueue // Thread-safe. inline bool enqueue(T&& item) { - if (details::likely(inner.enqueue(std::move(item)))) + if ((details::likely)(inner.enqueue(std::move(item)))) { sema->signal(); return true; @@ -587,7 +587,7 @@ class BlockingConcurrentQueue // Thread-safe. inline bool enqueue(producer_token_t const& token, T const& item) { - if (details::likely(inner.enqueue(token, item))) + if ((details::likely)(inner.enqueue(token, item))) { sema->signal(); return true; @@ -602,7 +602,7 @@ class BlockingConcurrentQueue // Thread-safe. inline bool enqueue(producer_token_t const& token, T&& item) { - if (details::likely(inner.enqueue(token, std::move(item)))) + if ((details::likely)(inner.enqueue(token, std::move(item)))) { sema->signal(); return true; @@ -622,7 +622,7 @@ class BlockingConcurrentQueue template inline bool enqueue_bulk(It itemFirst, size_t count) { - if (details::likely( + if ((details::likely)( inner.enqueue_bulk(std::forward(itemFirst), count))) { sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count); @@ -641,7 +641,7 @@ class BlockingConcurrentQueue inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count) { - if (details::likely( + if ((details::likely)( inner.enqueue_bulk(token, std::forward(itemFirst), count))) { sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count); diff --git a/deflect/moodycamel/concurrentqueue.h b/deflect/moodycamel/concurrentqueue.h index 3dabb2a..5dff3d0 100644 --- a/deflect/moodycamel/concurrentqueue.h +++ b/deflect/moodycamel/concurrentqueue.h @@ -1,4 +1,4 @@ -// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free +// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free // queue. // An overview, including benchmark results, is provided here: // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++ @@ -336,20 +336,20 @@ namespace moodycamel namespace details { #if defined(__GNUC__) -inline bool likely(bool x) +static inline bool(likely)(bool x) { return __builtin_expect((x), true); } -inline bool unlikely(bool x) +static inline bool(unlikely)(bool x) { return __builtin_expect((x), false); } #else -inline bool likely(bool x) +static inline bool(likely)(bool x) { return x; } -inline bool unlikely(bool x) +static inline bool(unlikely)(bool x) { return x; } @@ -377,12 +377,23 @@ struct const_numeric_max : static_cast(-1); }; -#if defined(__GNUC__) && !defined(__clang__) -typedef ::max_align_t max_align_t; // GCC forgot to add it to std:: for a while +#if defined(__GLIBCXX__) +typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: + // for a while #else -typedef std::max_align_t max_align_t; // Others (e.g. MSVC) insist it can *only* - // be accessed via std:: +typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can + // *only* be accessed via std:: #endif + +// Some platforms have incorrectly set max_align_t to a type with <8 bytes +// alignment even while supporting +// 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our +// own union. See issue #64. +typedef union { + std_max_align_t x; + long long y; + void* z; +} max_align_t; } // Default traits for the ConcurrentQueue. To change some of the @@ -1420,7 +1431,7 @@ class ConcurrentQueue // tried if (nonEmptyCount > 0) { - if (details::likely(best->dequeue(item))) + if ((details::likely)(best->dequeue(item))) { return true; } @@ -1683,7 +1694,10 @@ class ConcurrentQueue private: friend struct ProducerToken; friend struct ConsumerToken; + struct ExplicitProducer; friend struct ExplicitProducer; + struct ImplicitProducer; + friend struct ImplicitProducer; friend class ConcurrentQueueTests; enum AllocationMode @@ -1745,7 +1759,7 @@ class ConcurrentQueue auto prodCount = producerCount.load(std::memory_order_relaxed); auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed); - if (details::unlikely(token.desiredProducer == nullptr)) + if ((details::unlikely)(token.desiredProducer == nullptr)) { // Aha, first time we're dequeueing anything. // Figure out our local position @@ -1885,7 +1899,7 @@ class ConcurrentQueue // Decrease refcount twice, once for our ref, and once for // the list's ref - head->freeListRefs.fetch_add(-2, std::memory_order_release); + head->freeListRefs.fetch_sub(2, std::memory_order_release); return head; } @@ -1895,7 +1909,7 @@ class ConcurrentQueue // do need to ensure that the reference // count decrement happens-after the CAS on the head. refs = - prevHead->freeListRefs.fetch_add(-1, + prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel); if (refs == SHOULD_BE_ON_FREELIST + 1) { @@ -2635,7 +2649,7 @@ class ConcurrentQueue // coherency (as defined in the standard), explained here: // http://en.cppreference.com/w/cpp/atomic/memory_order tail = this->tailIndex.load(std::memory_order_acquire); - if (details::likely(details::circular_less_than( + if ((details::likely)(details::circular_less_than( myDequeueCount - overcommit, tail))) { // Guaranteed to be at least one element to dequeue! @@ -3483,7 +3497,7 @@ class ConcurrentQueue 1, std::memory_order_relaxed); assert(overcommit <= myDequeueCount); tail = this->tailIndex.load(std::memory_order_acquire); - if (details::likely(details::circular_less_than( + if ((details::likely)(details::circular_less_than( myDequeueCount - overcommit, tail))) { index_t index = @@ -4756,8 +4770,8 @@ class ConcurrentQueue if (raw == nullptr) { // Allocation failed - implicitProducerHashCount.fetch_add( - -1, std::memory_order_relaxed); + implicitProducerHashCount.fetch_sub( + 1, std::memory_order_relaxed); implicitProducerHashResizeInProgress.clear( std::memory_order_relaxed); return nullptr; @@ -4802,14 +4816,14 @@ class ConcurrentQueue recycle_or_create_producer(false, recycled)); if (producer == nullptr) { - implicitProducerHashCount.fetch_add( - -1, std::memory_order_relaxed); + implicitProducerHashCount.fetch_sub( + 1, std::memory_order_relaxed); return nullptr; } if (recycled) { - implicitProducerHashCount.fetch_add( - -1, std::memory_order_relaxed); + implicitProducerHashCount.fetch_sub( + 1, std::memory_order_relaxed); } #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED