Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions include/pulsar/c/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,51 @@ PULSAR_PUBLIC pulsar_result resume_message_listener(pulsar_consumer_t *consumer)
*/
PULSAR_PUBLIC void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer);

/**
* Reset the subscription associated with this consumer to a specific message id.
*
* @param consumer The consumer
* @param messageId The message id can either be a specific message or represent the first or last messages in
* the topic.
* @param callback The callback for this async operation
* @param ctx The context for the callback
*/
PULSAR_PUBLIC void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
pulsar_result_callback callback, void *ctx);

/**
* Reset the subscription asynchronously associated with this consumer to a specific message id.
*
* @param consumer The consumer
* @param messageId The message id can either be a specific message or represent the first or last messages in
* the topic.
* @return Operation result
*/
PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId);

/**
* Reset the subscription associated with this consumer to a specific message publish time.
*
* @param consumer The consumer
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
* be Unix time in milliseconds.
* @param callback The callback for this async operation
* @param ctx The context for the callback
*/
PULSAR_PUBLIC void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp,
pulsar_result_callback callback, void *ctx);

/**
* Reset the subscription asynchronously associated with this consumer to a specific message publish time.
*
* @param consumer The consumer
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
* be Unix time in milliseconds.
* @return Operation result
*/
PULSAR_PUBLIC pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer,
uint64_t timestamp);

PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer);

PULSAR_PUBLIC pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
Expand Down
44 changes: 44 additions & 0 deletions include/pulsar/c/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,50 @@ PULSAR_PUBLIC pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, pul
PULSAR_PUBLIC pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader,
pulsar_message_t **msg, int timeoutMs);

/**
* Reset the subscription associated with this reader to a specific message id.
*
* @param reader The reader
* @param messageId The message id can either be a specific message or represent the first or last messages in
* the topic.
* @param callback The callback for this async operation
* @param ctx The context for the callback
*/
PULSAR_PUBLIC void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId,
pulsar_result_callback callback, void *ctx);

/**
* Reset the subscription asynchronously associated with this reader to a specific message id.
*
* @param reader The reader
* @param messageId The message id can either be a specific message or represent the first or last messages in
* the topic.
* @return Operation result
*/
PULSAR_PUBLIC pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId);

/**
* Reset the subscription associated with this reader to a specific message publish time.
*
* @param reader The reader
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
* be Unix time in milliseconds.
* @param callback The callback for this async operation
* @param ctx The context for the callback
*/
PULSAR_PUBLIC void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp,
pulsar_result_callback callback, void *ctx);

/**
* Reset the subscription asynchronously associated with this reader to a specific message publish time.
*
* @param reader The reader
* @param timestamp The message publish time where to reposition the subscription. The timestamp format should
* be Unix time in milliseconds.
* @return Operation result
*/
PULSAR_PUBLIC pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp);

PULSAR_PUBLIC pulsar_result pulsar_reader_close(pulsar_reader_t *reader);

PULSAR_PUBLIC void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback,
Expand Down
10 changes: 10 additions & 0 deletions lib/c/c_Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_i
return (pulsar_result)consumer->consumer.seek(messageId->messageId);
}

void pulsar_consumer_seek_by_timestamp_async(pulsar_consumer_t *consumer, uint64_t timestamp,
pulsar_result_callback callback, void *ctx) {
consumer->consumer.seekAsync(timestamp,
std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
}

pulsar_result pulsar_consumer_seek_by_timestamp(pulsar_consumer_t *consumer, uint64_t timestamp) {
return (pulsar_result)consumer->consumer.seek(timestamp);
}

int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return consumer->consumer.isConnected(); }

pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
Expand Down
20 changes: 20 additions & 0 deletions lib/c/c_Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
return (pulsar_result)res;
}

void pulsar_reader_seek_async(pulsar_reader_t *reader, pulsar_message_id_t *messageId,
pulsar_result_callback callback, void *ctx) {
reader->reader.seekAsync(messageId->messageId,
std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
}

pulsar_result pulsar_reader_seek(pulsar_reader_t *reader, pulsar_message_id_t *messageId) {
return (pulsar_result)reader->reader.seek(messageId->messageId);
}

void pulsar_reader_seek_by_timestamp_async(pulsar_reader_t *reader, uint64_t timestamp,
pulsar_result_callback callback, void *ctx) {
reader->reader.seekAsync(timestamp,
std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
}

pulsar_result pulsar_reader_seek_by_timestamp(pulsar_reader_t *reader, uint64_t timestamp) {
return (pulsar_result)reader->reader.seek(timestamp);
}

pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); }

void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) {
Expand Down
Loading