From 828d2c4043ca72935df571339c0c551fdaffcc3d Mon Sep 17 00:00:00 2001 From: CHAE-PIL LIM Date: Mon, 17 Oct 2016 21:59:49 +0900 Subject: [PATCH 1/3] Zmq sequence (#1) * Fixes ZMQ startup with bad arguments. pr 7621 * [ZMQ] append a message sequence number to every ZMQ notification - pr 7762 - contrib/zmq/zmq_sub.py to python 3 compatible --- contrib/zmq/zmq_sub.py | 46 ++++++++++++++++------------ doc/release-notes.md | 9 ++++++ doc/zmq.md | 5 +++ src/zmq/zmqnotificationinterface.cpp | 1 - src/zmq/zmqpublishnotifier.cpp | 43 ++++++++++++++++++-------- src/zmq/zmqpublishnotifier.h | 12 ++++++++ 6 files changed, 83 insertions(+), 33 deletions(-) diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index 2dbe2923b561..62c3d34fb793 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -1,45 +1,51 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python import array import binascii import zmq +import struct port = 28332 zmqContext = zmq.Context() zmqSubSocket = zmqContext.socket(zmq.SUB) -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtxlock") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtxlock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtxlock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtx") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtxlock") zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) try: while True: msg = zmqSubSocket.recv_multipart() - topic = str(msg[0]) + topic = str(msg[0].decode("utf-8")) body = msg[1] + sequence = "Unknown"; + + if len(msg[-1]) == 4: + msgSequence = struct.unpack(' mapPublishNotifiers; +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_HASHTXLOCK = "hashtxlock"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWTX = "rawtx"; +static const char *MMSG_RAWTXLOCK = "rawtxlock"; + // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) { @@ -69,6 +76,7 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) if (rc!=0) { zmqError("Failed to bind address"); + zmq_close(psocket); return false; } @@ -117,6 +125,23 @@ void CZMQAbstractPublishNotifier::Shutdown() psocket = 0; } +bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +{ + assert(psocket); + + /* send three parts, command & data & a LE 4byte sequence number */ + unsigned char msgseq[sizeof(uint32_t)]; + WriteLE32(&msgseq[0], nSequence); + int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0); + if (rc == -1) + return false; + + /* increment memory only sequence number after sending */ + nSequence++; + + return true; +} + bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); @@ -124,8 +149,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -135,8 +159,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHTX, data, 32); } bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction) @@ -146,8 +169,7 @@ bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransa char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashtxlock", 10, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHTXLOCK, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) @@ -168,8 +190,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) ss << block; } - int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -178,8 +199,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; - int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction) @@ -188,6 +208,5 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac LogPrint("zmq", "zmq: Publish rawtxlock %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; - int rc = zmq_send_multipart(psocket, "rawtxlock", 9, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MMSG_RAWTXLOCK, &(*ss.begin()), ss.size()); } diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index c9215047250b..c2617c0f043a 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -11,7 +11,19 @@ class CBlockIndex; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { +private: + uint32_t nSequence; // upcounting per message sequence number + public: + + /* send zmq multipart message + parts: + * command + * data + * message sequence number + */ + bool SendMessage(const char *command, const void* data, size_t size); + bool Initialize(void *pcontext); void Shutdown(); }; From 5bf0639c9c599ac8cb3262c947363f0e1db60906 Mon Sep 17 00:00:00 2001 From: CHAE-PIL LIM Date: Mon, 17 Oct 2016 23:42:12 +0900 Subject: [PATCH 2/3] typo in MSG_RAWTXLOCK MMSG_RAWTXLOCK to MSG_RAWTXLOCK --- src/zmq/zmqpublishnotifier.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index bf54cb1015b2..e2d334c951b8 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -14,7 +14,7 @@ static const char *MSG_HASHTX = "hashtx"; static const char *MSG_HASHTXLOCK = "hashtxlock"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; -static const char *MMSG_RAWTXLOCK = "rawtxlock"; +static const char *MSG_RAWTXLOCK = "rawtxlock"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -208,5 +208,5 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac LogPrint("zmq", "zmq: Publish rawtxlock %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; - return SendMessage(MMSG_RAWTXLOCK, &(*ss.begin()), ss.size()); + return SendMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size()); } From a05ca4ca788a323beb5fa311a0e41eb0a6cfc1d5 Mon Sep 17 00:00:00 2001 From: CHAE-PIL LIM Date: Tue, 18 Oct 2016 04:22:39 +0900 Subject: [PATCH 3/3] s/Bitcoind/dashd/ --- doc/zmq.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/zmq.md b/doc/zmq.md index fa2ec69fedee..95f982c0edbe 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -104,5 +104,5 @@ retrieve the chain from the last known block to the new tip. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type your are -using. Bitcoind appends an up-counting sequence number to each -notification which allows listeners to detect lost notifications. \ No newline at end of file +using. Dashd appends an up-counting sequence number to each +notification which allows listeners to detect lost notifications.