diff --git a/include/pulsar/c/consumer.h b/include/pulsar/c/consumer.h index 99ff8c80..bb8ae37f 100644 --- a/include/pulsar/c/consumer.h +++ b/include/pulsar/c/consumer.h @@ -241,11 +241,51 @@ PULSAR_PUBLIC pulsar_result resume_message_listener(pulsar_consumer_t *consumer) */ PULSAR_PUBLIC void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer); +/** + * Reset the subscription associated with this consumer to a specific message id. + * + * @param consumer The consumer + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId, pulsar_result_callback callback, void *ctx); +/** + * Reset the subscription asynchronously associated with this consumer to a specific message id. + * + * @param consumer The consumer + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @return Operation result + */ PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId); +/** + * Reset the subscription associated with this consumer to a specific message publish time. + * + * @param consumer The consumer + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ +PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp, + pulsar_result_callback callback, void *ctx); + +/** + * Reset the subscription asynchronously associated with this consumer to a specific message publish time. + * + * @param consumer The consumer + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @return Operation result + */ +PULSAR_PUBLIC pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, + uint64_t timestamp); + PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer); PULSAR_PUBLIC pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer, diff --git a/include/pulsar/c/reader.h b/include/pulsar/c/reader.h index 4c546f80..12321fd2 100644 --- a/include/pulsar/c/reader.h +++ b/include/pulsar/c/reader.h @@ -59,6 +59,50 @@ PULSAR_PUBLIC pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, pul PULSAR_PUBLIC pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, pulsar_message_t **msg, int timeoutMs); +/** + * Reset the subscription associated with this reader to a specific message id. + * + * @param reader The reader + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ +PULSAR_PUBLIC void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId, + pulsar_result_callback callback, void *ctx); + +/** + * Reset the subscription asynchronously associated with this reader to a specific message id. + * + * @param reader The reader + * @param messageId The message id can either be a specific message or represent the first or last messages in + * the topic. + * @return Operation result + */ +PULSAR_PUBLIC pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId); + +/** + * Reset the subscription associated with this reader to a specific message publish time. + * + * @param reader The reader + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @param callback The callback for this async operation + * @param ctx The context for the callback + */ +PULSAR_PUBLIC void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp, + pulsar_result_callback callback, void *ctx); + +/** + * Reset the subscription asynchronously associated with this reader to a specific message publish time. + * + * @param reader The reader + * @param timestamp The message publish time where to reposition the subscription. The timestamp format should + * be Unix time in milliseconds. + * @return Operation result + */ +PULSAR_PUBLIC pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp); + PULSAR_PUBLIC pulsar_result pulsar_reader_close(pulsar_reader_t *reader); PULSAR_PUBLIC void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, diff --git a/lib/c/c_Consumer.cc b/lib/c/c_Consumer.cc index 062c801f..df4c9f3f 100644 --- a/lib/c/c_Consumer.cc +++ b/lib/c/c_Consumer.cc @@ -156,6 +156,16 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_i return (pulsar_result)consumer->consumer.seek(messageId->messageId); } +void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp, + pulsar_result_callback callback, void *ctx) { + consumer->consumer.seekAsync(timestamp, + std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); +} + +pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp) { + return (pulsar_result)consumer->consumer.seek(timestamp); +} + int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return consumer->consumer.isConnected(); } pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer, diff --git a/lib/c/c_Reader.cc b/lib/c/c_Reader.cc index 3490b540..c4bdc497 100644 --- a/lib/c/c_Reader.cc +++ b/lib/c/c_Reader.cc @@ -45,6 +45,26 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls return (pulsar_result)res; } +void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId, + pulsar_result_callback callback, void *ctx) { + reader->reader.seekAsync(messageId->messageId, + std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); +} + +pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId) { + return (pulsar_result)reader->reader.seek(messageId->messageId); +} + +void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp, + pulsar_result_callback callback, void *ctx) { + reader->reader.seekAsync(timestamp, + std::bind(handle_result_callback, std::placeholders::_1, callback, ctx)); +} + +pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp) { + return (pulsar_result)reader->reader.seek(timestamp); +} + pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); } void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) { diff --git a/tests/c/c_SeekTest.cc b/tests/c/c_SeekTest.cc new file mode 100644 index 00000000..cfa8a188 --- /dev/null +++ b/tests/c/c_SeekTest.cc @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include + +struct seek_ctx { + std::promise *promise; +}; + +static void seek_callback(pulsar_result async_result, void *ctx) { + auto *seek_ctx = (struct seek_ctx *)ctx; + seek_ctx->promise->set_value(async_result); +} + +void prepare_client(pulsar_client_t **client) { + const char *lookup_url = "pulsar://localhost:6650"; + pulsar_client_configuration_t *conf = pulsar_client_configuration_create(); + *client = pulsar_client_create(lookup_url, conf); + pulsar_client_configuration_free(conf); +} + +TEST(c_SeekTest, testConsumerSeekMessageId) { + auto topic_name_str = "test-c-seek-msgid-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_t *client; + prepare_client(&client); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_message_t *seek_message = nullptr; + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + if (i == 5) { + seek_message = msg; + } else { + pulsar_message_free(msg); + } + } + + pulsar_consumer_seek(consumer, pulsar_message_get_message_id(seek_message)); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + pulsar_message_free(message); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_consumer_seek_async(consumer, pulsar_message_get_message_id(seek_message), seek_callback, + &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + + if (seek_message != NULL) { + pulsar_message_free(seek_message); + } + pulsar_consumer_free(consumer); + pulsar_consumer_configuration_free(consumer_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); +} + +TEST(c_SeekTest, testConsumerSeekTime) { + auto topic_name_str = "test-c-seek-time-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_t *client; + prepare_client(&client); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create(); + pulsar_consumer_t *consumer; + result = pulsar_client_subscribe(client, topic_name, "seek-time", consumer_conf, &consumer); + ASSERT_EQ(pulsar_result_Ok, result); + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + pulsar_message_free(msg); + } + + uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis(); + + pulsar_consumer_seek_by_timestamp(consumer, currentTime); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + + pulsar_consumer_seek_by_timestamp(consumer, currentTime - 100000); // Seek to 100 seconds ago + + ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0"); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_consumer_seek_by_timestamp_async(consumer, currentTime, seek_callback, &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Timeout, pulsar_consumer_receive_with_timeout(consumer, &message, 1000)); + + pulsar_consumer_free(consumer); + pulsar_consumer_configuration_free(consumer_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); +} + +TEST(c_SeekTest, testReaderSeekMessageId) { + auto topic_name_str = "test-c-reader-seek-msgid-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_t *client; + prepare_client(&client); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create(); + pulsar_reader_t *reader; + result = + pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_message_t *seek_message = nullptr; + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + if (i == 5) { + seek_message = msg; + } else { + pulsar_message_free(msg); + } + } + + pulsar_reader_seek(reader, pulsar_message_get_message_id(seek_message)); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + pulsar_message_free(message); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_reader_seek_async(reader, pulsar_message_get_message_id(seek_message), seek_callback, &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-6"); + + if (seek_message != NULL) { + pulsar_message_free(seek_message); + } + pulsar_reader_free(reader); + pulsar_reader_configuration_free(reader_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); +} + +TEST(c_SeekTest, testReaderSeekTime) { + auto topic_name_str = "test-c-reader-seek-time-" + std::to_string(time(nullptr)); + const char *topic_name = topic_name_str.c_str(); + + pulsar_client_t *client; + prepare_client(&client); + + pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create(); + pulsar_producer_t *producer; + pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer); + ASSERT_EQ(pulsar_result_Ok, result); + + pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create(); + pulsar_reader_t *reader; + result = + pulsar_client_create_reader(client, topic_name, pulsar_message_id_earliest(), reader_conf, &reader); + ASSERT_EQ(pulsar_result_Ok, result); + + for (int i = 0; i < 10; i++) { + char content[10]; + sprintf(content, "msg-%d", i); + pulsar_message_t *msg = pulsar_message_create(); + pulsar_message_set_content(msg, content, strlen(content)); + pulsar_producer_send(producer, msg); + pulsar_message_free(msg); + } + + uint64_t currentTime = pulsar::TimeUtils::currentTimeMillis(); + + pulsar_reader_seek_by_timestamp(reader, currentTime); + + pulsar_message_t *message; + ASSERT_EQ(pulsar_result_Timeout, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + + pulsar_reader_seek_by_timestamp(reader, currentTime - 100000); // Seek to 100 seconds ago + + ASSERT_EQ(pulsar_result_Ok, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + ASSERT_STREQ((const char *)pulsar_message_get_data(message), "msg-0"); + + // Test seek asynchronously + std::promise seek_promise; + std::future seek_future = seek_promise.get_future(); + struct seek_ctx seek_ctx = {&seek_promise}; + pulsar_reader_seek_by_timestamp_async(reader, currentTime, seek_callback, &seek_ctx); + ASSERT_EQ(pulsar_result_Ok, seek_future.get()); + ASSERT_EQ(pulsar_result_Timeout, pulsar_reader_read_next_with_timeout(reader, &message, 1000)); + + pulsar_reader_free(reader); + pulsar_reader_configuration_free(reader_conf); + pulsar_producer_free(producer); + pulsar_producer_configuration_free(producer_conf); + pulsar_client_free(client); +}