diff --git a/pulsar-version.txt b/pulsar-version.txt index 437459cd..e70b4523 100755 --- a/pulsar-version.txt +++ b/pulsar-version.txt @@ -1 +1 @@ -2.5.0 +2.6.0 diff --git a/run-unit-tests.sh b/run-unit-tests.sh index 7f2accc8..5e2994f5 100755 --- a/run-unit-tests.sh +++ b/run-unit-tests.sh @@ -30,6 +30,10 @@ rm -rf $PULSAR_PKG_DIR for pkg in apache-pulsar-client-dev.deb apache-pulsar-client.deb;do curl -L --create-dir "https://archive.apache.org/dist/pulsar/pulsar-${VERSION}/DEB/${pkg}" -o $PULSAR_PKG_DIR/$pkg done; +apt-get -y update +apt-get install -y software-properties-common +add-apt-repository ppa:ubuntu-toolchain-r/test && apt-get -y update +apt-get -y install gcc-4.9 && apt-get upgrade -y libstdc++6 apt install $PULSAR_PKG_DIR/apache-pulsar-client*.deb ./pulsar-test-service-start.sh diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 76fd25c2..b9c0a758 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -46,8 +46,12 @@ static const std::map HASHING_SCHEME = { {"JavaStringHash", pulsar_JavaStringHash}, }; -static std::map COMPRESSION_TYPE = {{"Zlib", pulsar_CompressionZLib}, - {"LZ4", pulsar_CompressionLZ4}}; +static std::map COMPRESSION_TYPE = { + {"Zlib", pulsar_CompressionZLib}, + {"LZ4", pulsar_CompressionLZ4}, + {"ZSTD", pulsar_CompressionZSTD}, + {"SNAPPY", pulsar_CompressionSNAPPY}, +}; ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig = pulsar_producer_configuration_create(); diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 1a6b60c6..b17e67e5 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -438,5 +438,53 @@ const Pulsar = require('../index.js'); await producer.close(); await client.close(); }); + + test('Produce/Read (Compression)', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + expect(client).not.toBeNull(); + + const topic = 'persistent://public/default/produce-read-compression'; + const producer = await client.createProducer({ + topic, + sendTimeoutMs: 30000, + batchingEnabled: true, + compressionType: 'ZSTD', + }); + expect(producer).not.toBeNull(); + + const reader = await client.createReader({ + topic, + startMessageId: Pulsar.MessageId.latest(), + }); + expect(reader).not.toBeNull(); + + const messages = []; + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + messages.push(msg); + } + await producer.flush(); + + expect(reader.hasNext()).toBe(true); + + const results = []; + for (let i = 0; i < 10; i += 1) { + const msg = await reader.readNext(); + results.push(msg.getData().toString()); + } + expect(lodash.difference(messages, results)).toEqual([]); + + expect(reader.hasNext()).toBe(false); + + await producer.close(); + await reader.close(); + await client.close(); + }); }); })();