From 9f7f6cdd23f06bf67541bfa90f66cc9de31e5abb Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Mon, 14 Nov 2022 11:46:11 +0800 Subject: [PATCH 1/6] [feat] Support consumer seek by timestamp for C Api --- include/pulsar/c/consumer.h | 5 +++ lib/c/c_Consumer.cc | 10 +++++ tests/c/c_ConsumerTest.cc | 87 +++++++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+) create mode 100644 tests/c/c_ConsumerTest.cc diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index 99ff8c80..6059af4e 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -246,6 +246,11 @@ PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsa PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId); +PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp, + pulsar_result_callback callback, void *ctx); + +PULSAR_PUBLIC pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp); + PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer); PULSAR_PUBLIC pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer, diff --git a/lib/c/c_Consumer.cc b/lib/c/c_Consumer.cc index 062c801f..df4c9f3f 100644 --- a/lib/c/c_Consumer.cc +++ b/lib/c/c_Consumer.cc @@ -156,6 +156,16 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_i return (pulsar_result)consumer->consumer.seek(messageId->messageId); } +void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp, + pulsar_result_callback callback, void *ctx) { + consumer->consumer.seekAsync(timestamp, + std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); +} + +pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp) { + return (pulsar_result)consumer->consumer.seek(timestamp); +} + int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return consumer->consumer.isConnected(); } pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer, diff --git a/tests/c/c_ConsumerTest.cc b/tests/c/c_ConsumerTest.cc new file mode 100644 index 00000000..e5a81422 --- /dev/null +++ b/tests/c/c_ConsumerTest.cc @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include + +struct seek_ctx { + std::promise *promise; +}; + +static void seek_callback(pulsar_result async_result, void *ctx) { + auto *seek_ctx = (struct seek_ctx *)ctx; + seek_ctx->promise->set_value(async_result); +} + +TEST(c_ConsumerTest, testSeekTime) { + const char *lookup_url = "pulsar://localhost:6650"; + auto topic_name_str = "test-c-seek-time-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + } + + uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis(); + + pulsar_consumer_seek_by_timestamp(consumer, currentTime); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + + pulsar_consumer_seek_by_timestamp(consumer, currentTime - 100000); // Seek to 100 seconds ago + + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0"); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_consumer_seek_by_timestamp_async(consumer, currentTime, seek_callback, &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + + pulsar_consumer_free(consumer); + pulsar_consumer_configuration_free(consumer_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); + pulsar_client_configuration_free(conf); +} From 4d475e6a60b85e2873e1264a963a6f45d7fb2527 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 15 Nov 2022 09:19:45 +0800 Subject: [PATCH 2/6] Fix format --- include/pulsar/c/consumer.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index 6059af4e..63b94bd5 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -247,9 +247,10 @@ PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsa PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId); PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp, - pulsar_result_callback callback, void *ctx); + pulsar_result_callback callback, void *ctx); -PULSAR_PUBLIC pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp); +PULSAR_PUBLIC pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, + uint64_t timestamp); PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer); From 5f1fefc6e4501d9c68d87427e42f46313a1417a8 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 15 Nov 2022 15:39:36 +0800 Subject: [PATCH 3/6] Add reader seek and tests --- include/pulsar/c/consumer.h | 34 +++++ include/pulsar/c/reader.h | 44 ++++++ lib/c/c_Reader.cc | 20 +++ tests/c/c_ConsumerTest.cc | 87 ------------ tests/c/c_SeekTest.cc | 264 ++++++++++++++++++++++++++++++++++++ 5 files changed, 362 insertions(+), 87 deletions(-) delete mode 100644 tests/c/c_ConsumerTest.cc create mode 100644 tests/c/c_SeekTest.cc diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index 63b94bd5..bb8ae37f 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -241,14 +241,48 @@ PULSAR_PUBLIC pulsar_result resume_message_listener(pulsar_consumer_t *consumer) */ PULSAR_PUBLIC void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer); +/** + * Reset the subscription associated with this consumer to a specific message id. + * + * @param consumer The consumer + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId, pulsar_result_callback callback, void *ctx); +/** + * Reset the subscription asynchronously associated with this consumer to a specific message id. + * + * @param consumer The consumer + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @return Operation result + */ PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId); +/** + * Reset the subscription associated with this consumer to a specific message publish time. + * + * @param consumer The consumer + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp, pulsar_result_callback callback, void *ctx); +/** + * Reset the subscription asynchronously associated with this consumer to a specific message publish time. + * + * @param consumer The consumer + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @return Operation result + */ PULSAR_PUBLIC pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp); diff --git a/include/pulsar/c/reader.h b/include/pulsar/c/reader.h index 4c546f80..12321fd2 100644 --- a/include/pulsar/c/reader.h +++ b/include/pulsar/c/reader.h @@ -59,6 +59,50 @@ PULSAR_PUBLIC pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, pul PULSAR_PUBLIC pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, pulsar_message_t **msg, int timeoutMs); +/** + * Reset the subscription associated with this reader to a specific message id. + * + * @param reader The reader + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ +PULSAR_PUBLIC void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId, + pulsar_result_callback callback, void *ctx); + +/** + * Reset the subscription asynchronously associated with this reader to a specific message id. + * + * @param reader The reader + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @return Operation result + */ +PULSAR_PUBLIC pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId); + +/** + * Reset the subscription associated with this reader to a specific message publish time. + * + * @param reader The reader + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ +PULSAR_PUBLIC void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp, + pulsar_result_callback callback, void *ctx); + +/** + * Reset the subscription asynchronously associated with this reader to a specific message publish time. + * + * @param reader The reader + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @return Operation result + */ +PULSAR_PUBLIC pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp); + PULSAR_PUBLIC pulsar_result pulsar_reader_close(pulsar_reader_t *reader); PULSAR_PUBLIC void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, diff --git a/lib/c/c_Reader.cc b/lib/c/c_Reader.cc index 3490b540..c4bdc497 100644 --- a/lib/c/c_Reader.cc +++ b/lib/c/c_Reader.cc @@ -45,6 +45,26 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls return (pulsar_result)res; } +void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId, + pulsar_result_callback callback, void *ctx) { + reader->reader.seekAsync(messageId->messageId, + std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); +} + +pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId) { + return (pulsar_result)reader->reader.seek(messageId->messageId); +} + +void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp, + pulsar_result_callback callback, void *ctx) { + reader->reader.seekAsync(timestamp, + std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); +} + +pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp) { + return (pulsar_result)reader->reader.seek(timestamp); +} + pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); } void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) { diff --git a/tests/c/c_ConsumerTest.cc b/tests/c/c_ConsumerTest.cc deleted file mode 100644 index e5a81422..00000000 --- a/tests/c/c_ConsumerTest.cc +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include -#include -#include - -#include - -struct seek_ctx { - std::promise *promise; -}; - -static void seek_callback(pulsar_result async_result, void *ctx) { - auto *seek_ctx = (struct seek_ctx *)ctx; - seek_ctx->promise->set_value(async_result); -} - -TEST(c_ConsumerTest, testSeekTime) { - const char *lookup_url = "pulsar://localhost:6650"; - auto topic_name_str = "test-c-seek-time-" + std::to_string(time(nullptr)); - const char *topic_name = topic_name_str.c_str(); - - pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); - pulsar_client_t *client = pulsar_client_create(lookup_url, conf); - - pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); - pulsar_producer_t *producer; - pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); - ASSERT_EQ(pulsar_result_Ok, result); - - pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); - pulsar_consumer_t *consumer; - result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer); - ASSERT_EQ(pulsar_result_Ok, result); - - for (int i = 0; i < 10; i++) { - char content[10]; - sprintf(content, "msg-%d", i); - pulsar_message_t *msg = pulsar_message_create(); - pulsar_message_set_content(msg, content, strlen(content)); - pulsar_producer_send(producer, msg); - } - - uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis(); - - pulsar_consumer_seek_by_timestamp(consumer, currentTime); - - pulsar_message_t *message; - ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); - - pulsar_consumer_seek_by_timestamp(consumer, currentTime - 100000); // Seek to 100 seconds ago - - ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); - ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0"); - - // Test seek asynchronously - std::promise seek_promise; - std::future seek_future = seek_promise.get_future(); - struct seek_ctx seek_ctx = {&seek_promise}; - pulsar_consumer_seek_by_timestamp_async(consumer, currentTime, seek_callback, &seek_ctx); - ASSERT_EQ(pulsar_result_Ok, seek_future.get()); - ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); - - pulsar_consumer_free(consumer); - pulsar_consumer_configuration_free(consumer_conf); - pulsar_producer_free(producer); - pulsar_producer_configuration_free(producer_conf); - pulsar_client_free(client); - pulsar_client_configuration_free(conf); -} diff --git a/tests/c/c_SeekTest.cc b/tests/c/c_SeekTest.cc new file mode 100644 index 00000000..794f1a5f --- /dev/null +++ b/tests/c/c_SeekTest.cc @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include + +struct seek_ctx { + std::promise *promise; +}; + +static void seek_callback(pulsar_result async_result, void *ctx) { + auto *seek_ctx = (struct seek_ctx *)ctx; + seek_ctx->promise->set_value(async_result); +} + +TEST(c_SeekTest, testConsumerSeekMessageId) { + const char *lookup_url = "pulsar://localhost:6650"; + auto topic_name_str = "test-c-seek-msgid-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_message_t *seek_message; + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + if (i == 5) { + seek_message = msg; + } else { + pulsar_message_free(msg); + } + } + + pulsar_consumer_seek(consumer, pulsar_message_get_message_id(seek_message)); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + pulsar_message_free(message); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_consumer_seek_async(consumer, pulsar_message_get_message_id(seek_message), seek_callback, + &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + + pulsar_message_free(seek_message); + pulsar_consumer_free(consumer); + pulsar_consumer_configuration_free(consumer_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); + pulsar_client_configuration_free(conf); +} + +TEST(c_SeekTest, testConsumerSeekTime) { + const char *lookup_url = "pulsar://localhost:6650"; + auto topic_name_str = "test-c-seek-time-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + pulsar_message_free(msg); + } + + uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis(); + + pulsar_consumer_seek_by_timestamp(consumer, currentTime); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + + pulsar_consumer_seek_by_timestamp(consumer, currentTime - 100000); // Seek to 100 seconds ago + + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0"); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_consumer_seek_by_timestamp_async(consumer, currentTime, seek_callback, &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + + pulsar_consumer_free(consumer); + pulsar_consumer_configuration_free(consumer_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); + pulsar_client_configuration_free(conf); +} + + +TEST(c_SeekTest, testReaderSeekMessageId) { + const char *lookup_url = "pulsar://localhost:6650"; + auto topic_name_str = "test-c-reader-seek-msgid-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create(); + pulsar_reader_t *reader; + result = pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_message_t *seek_message; + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + if (i == 5) { + seek_message = msg; + } else { + pulsar_message_free(msg); + } + } + + pulsar_reader_seek(reader, pulsar_message_get_message_id(seek_message)); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + pulsar_message_free(message); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_reader_seek_async(reader, pulsar_message_get_message_id(seek_message), seek_callback, + &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + + pulsar_message_free(seek_message); + pulsar_reader_free(reader); + pulsar_reader_configuration_free(reader_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); + pulsar_client_configuration_free(conf); +} + +TEST(c_SeekTest, testReaderSeekTime) { + const char *lookup_url = "pulsar://localhost:6650"; + auto topic_name_str = "test-c-reader-seek-time-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create(); + pulsar_reader_t *reader; + result = + pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader); + ASSERT_EQ(pulsar_result_Ok, result); + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + pulsar_message_free(msg); + } + + uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis(); + + pulsar_reader_seek_by_timestamp(reader, currentTime); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Timeout, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + + pulsar_reader_seek_by_timestamp(reader, currentTime - 100000); // Seek to 100 seconds ago + + ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0"); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_reader_seek_by_timestamp_async(reader, currentTime, seek_callback, &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Timeout, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + + pulsar_reader_free(reader); + pulsar_reader_configuration_free(reader_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); + pulsar_client_configuration_free(conf); +} + From 7f461069f6f71d3dd2ddbfe23b9276e1b7d8789a Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 15 Nov 2022 15:44:59 +0800 Subject: [PATCH 4/6] Fix format --- tests/c/c_SeekTest.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/c/c_SeekTest.cc b/tests/c/c_SeekTest.cc index 794f1a5f..1c3d900f 100644 --- a/tests/c/c_SeekTest.cc +++ b/tests/c/c_SeekTest.cc @@ -146,7 +146,6 @@ TEST(c_SeekTest, testConsumerSeekTime) { pulsar_client_configuration_free(conf); } - TEST(c_SeekTest, testReaderSeekMessageId) { const char *lookup_url = "pulsar://localhost:6650"; auto topic_name_str = "test-c-reader-seek-msgid-" + std::to_string(time(nullptr)); @@ -162,7 +161,8 @@ TEST(c_SeekTest, testReaderSeekMessageId) { pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create(); pulsar_reader_t *reader; - result = pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader); + result = + pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader); ASSERT_EQ(pulsar_result_Ok, result); pulsar_message_t *seek_message; @@ -191,8 +191,7 @@ TEST(c_SeekTest, testReaderSeekMessageId) { std::promise seek_promise; std::future seek_future = seek_promise.get_future(); struct seek_ctx seek_ctx = {&seek_promise}; - pulsar_reader_seek_async(reader, pulsar_message_get_message_id(seek_message), seek_callback, - &seek_ctx); + pulsar_reader_seek_async(reader, pulsar_message_get_message_id(seek_message), seek_callback, &seek_ctx); ASSERT_EQ(pulsar_result_Ok, seek_future.get()); ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); @@ -261,4 +260,3 @@ TEST(c_SeekTest, testReaderSeekTime) { pulsar_client_free(client); pulsar_client_configuration_free(conf); } - From c165d0d8835a3dc66ea0d93ad8008755af0a604d Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 15 Nov 2022 16:13:49 +0800 Subject: [PATCH 5/6] Refine the test --- tests/c/c_SeekTest.cc | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/tests/c/c_SeekTest.cc b/tests/c/c_SeekTest.cc index 1c3d900f..3c3193ae 100644 --- a/tests/c/c_SeekTest.cc +++ b/tests/c/c_SeekTest.cc @@ -32,13 +32,19 @@ static void seek_callback(pulsar_result async_result, void *ctx) { seek_ctx->promise->set_value(async_result); } -TEST(c_SeekTest, testConsumerSeekMessageId) { +void prepare_client(pulsar_client_t **client) { const char *lookup_url = "pulsar://localhost:6650"; + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + *client = pulsar_client_create(lookup_url, conf); + pulsar_client_configuration_free(conf); +} + +TEST(c_SeekTest, testConsumerSeekMessageId) { auto topic_name_str = "test-c-seek-msgid-" + std::to_string(time(nullptr)); const char *topic_name = topic_name_str.c_str(); - pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); - pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + pulsar_client_t *client; + prepare_client(&client); pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); pulsar_producer_t *producer; @@ -88,16 +94,14 @@ TEST(c_SeekTest, testConsumerSeekMessageId) { pulsar_producer_free(producer); pulsar_producer_configuration_free(producer_conf); pulsar_client_free(client); - pulsar_client_configuration_free(conf); } TEST(c_SeekTest, testConsumerSeekTime) { - const char *lookup_url = "pulsar://localhost:6650"; auto topic_name_str = "test-c-seek-time-" + std::to_string(time(nullptr)); const char *topic_name = topic_name_str.c_str(); - pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); - pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + pulsar_client_t *client; + prepare_client(&client); pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); pulsar_producer_t *producer; @@ -143,16 +147,14 @@ TEST(c_SeekTest, testConsumerSeekTime) { pulsar_producer_free(producer); pulsar_producer_configuration_free(producer_conf); pulsar_client_free(client); - pulsar_client_configuration_free(conf); } TEST(c_SeekTest, testReaderSeekMessageId) { - const char *lookup_url = "pulsar://localhost:6650"; auto topic_name_str = "test-c-reader-seek-msgid-" + std::to_string(time(nullptr)); const char *topic_name = topic_name_str.c_str(); - pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); - pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + pulsar_client_t *client; + prepare_client(&client); pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); pulsar_producer_t *producer; @@ -202,16 +204,14 @@ TEST(c_SeekTest, testReaderSeekMessageId) { pulsar_producer_free(producer); pulsar_producer_configuration_free(producer_conf); pulsar_client_free(client); - pulsar_client_configuration_free(conf); } TEST(c_SeekTest, testReaderSeekTime) { - const char *lookup_url = "pulsar://localhost:6650"; auto topic_name_str = "test-c-reader-seek-time-" + std::to_string(time(nullptr)); const char *topic_name = topic_name_str.c_str(); - pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); - pulsar_client_t *client = pulsar_client_create(lookup_url, conf); + pulsar_client_t *client; + prepare_client(&client); pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); pulsar_producer_t *producer; @@ -258,5 +258,4 @@ TEST(c_SeekTest, testReaderSeekTime) { pulsar_producer_free(producer); pulsar_producer_configuration_free(producer_conf); pulsar_client_free(client); - pulsar_client_configuration_free(conf); } From 6804821b90e799b5acf698fc109d4ea246c3c6e9 Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Tue, 15 Nov 2022 16:17:46 +0800 Subject: [PATCH 6/6] =?UTF-8?q?Fix=20=E2=80=98seek=5Fmessage=E2=80=99=20ma?= =?UTF-8?q?y=20be=20used=20uninitialized=20in=20the=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/c/c_SeekTest.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/c/c_SeekTest.cc b/tests/c/c_SeekTest.cc index 3c3193ae..cfa8a188 100644 --- a/tests/c/c_SeekTest.cc +++ b/tests/c/c_SeekTest.cc @@ -56,7 +56,7 @@ TEST(c_SeekTest, testConsumerSeekMessageId) { result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer); ASSERT_EQ(pulsar_result_Ok, result); - pulsar_message_t *seek_message; + pulsar_message_t *seek_message = nullptr; for (int i = 0; i < 10; i++) { char content[10]; @@ -88,7 +88,9 @@ TEST(c_SeekTest, testConsumerSeekMessageId) { ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); - pulsar_message_free(seek_message); + if (seek_message != NULL) { + pulsar_message_free(seek_message); + } pulsar_consumer_free(consumer); pulsar_consumer_configuration_free(consumer_conf); pulsar_producer_free(producer); @@ -167,7 +169,7 @@ TEST(c_SeekTest, testReaderSeekMessageId) { pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader); ASSERT_EQ(pulsar_result_Ok, result); - pulsar_message_t *seek_message; + pulsar_message_t *seek_message = nullptr; for (int i = 0; i < 10; i++) { char content[10]; @@ -198,7 +200,9 @@ TEST(c_SeekTest, testReaderSeekMessageId) { ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); - pulsar_message_free(seek_message); + if (seek_message != NULL) { + pulsar_message_free(seek_message); + } pulsar_reader_free(reader); pulsar_reader_configuration_free(reader_conf); pulsar_producer_free(producer);