From 11b1cccbb134cb30d53e9f71a361c60abc62a310 Mon Sep 17 00:00:00 2001 From: savearray2 Date: Sat, 12 Sep 2020 21:15:43 +0900 Subject: [PATCH 1/2] Includes Support for ZSTD and SNAPPY Compression A version bump to the latest version of the Pulsar C++ library (2.6.1) was required to make use of the compression types. As the latest version of the C++ library requires GLIBCXX_3.4.22, the testing suite code was altered to automatically update the Docker image with the latest version of libstdc++6 and gcc-4.9 before executing the library tests. --- pulsar-version.txt | 2 +- run-unit-tests.sh | 4 ++++ src/ProducerConfig.cc | 8 +++++-- tests/end_to_end.test.js | 48 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 3 deletions(-) diff --git a/pulsar-version.txt b/pulsar-version.txt index 437459cd..6a6a3d8e 100755 --- a/pulsar-version.txt +++ b/pulsar-version.txt @@ -1 +1 @@ -2.5.0 +2.6.1 diff --git a/run-unit-tests.sh b/run-unit-tests.sh index 7f2accc8..5e2994f5 100755 --- a/run-unit-tests.sh +++ b/run-unit-tests.sh @@ -30,6 +30,10 @@ rm -rf $PULSAR_PKG_DIR for pkg in apache-pulsar-client-dev.deb apache-pulsar-client.deb;do curl -L --create-dir "https://archive.apache.org/dist/pulsar/pulsar-${VERSION}/DEB/${pkg}" -o $PULSAR_PKG_DIR/$pkg done; +apt-get -y update +apt-get install -y software-properties-common +add-apt-repository ppa:ubuntu-toolchain-r/test && apt-get -y update +apt-get -y install gcc-4.9 && apt-get upgrade -y libstdc++6 apt install $PULSAR_PKG_DIR/apache-pulsar-client*.deb ./pulsar-test-service-start.sh diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 76fd25c2..b9c0a758 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -46,8 +46,12 @@ static const std::map HASHING_SCHEME = { {"JavaStringHash", pulsar_JavaStringHash}, }; -static std::map COMPRESSION_TYPE = {{"Zlib", pulsar_CompressionZLib}, - {"LZ4", pulsar_CompressionLZ4}}; +static std::map COMPRESSION_TYPE = { + {"Zlib", pulsar_CompressionZLib}, + {"LZ4", pulsar_CompressionLZ4}, + {"ZSTD", pulsar_CompressionZSTD}, + {"SNAPPY", pulsar_CompressionSNAPPY}, +}; ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig = pulsar_producer_configuration_create(); diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 0afe8640..b4f77809 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -376,5 +376,53 @@ const Pulsar = require('../index.js'); await reader.close(); await client.close(); }); + + test('Produce/Read (Compression)', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + expect(client).not.toBeNull(); + + const topic = 'persistent://public/default/produce-read-compression'; + const producer = await client.createProducer({ + topic, + sendTimeoutMs: 30000, + batchingEnabled: true, + compressionType: 'ZSTD', + }); + expect(producer).not.toBeNull(); + + const reader = await client.createReader({ + topic, + startMessageId: Pulsar.MessageId.latest(), + }); + expect(reader).not.toBeNull(); + + const messages = []; + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + messages.push(msg); + } + await producer.flush(); + + expect(reader.hasNext()).toBe(true); + + const results = []; + for (let i = 0; i < 10; i += 1) { + const msg = await reader.readNext(); + results.push(msg.getData().toString()); + } + expect(lodash.difference(messages, results)).toEqual([]); + + expect(reader.hasNext()).toBe(false); + + await producer.close(); + await reader.close(); + await client.close(); + }); }); })(); From 7d4e11d0913423bda1a2b6cca49fc0b5dcd34b8f Mon Sep 17 00:00:00 2001 From: savearray2 <46784573+savearray2@users.noreply.github.com> Date: Sat, 19 Sep 2020 00:07:24 +0900 Subject: [PATCH 2/2] Update pulsar-version.txt --- pulsar-version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-version.txt b/pulsar-version.txt index 6a6a3d8e..e70b4523 100755 --- a/pulsar-version.txt +++ b/pulsar-version.txt @@ -1 +1 @@ -2.6.1 +2.6.0