Skip to content

Conversation

@shibd
Copy link
Member

@shibd shibd commented Jan 9, 2023

Master Issue: apache/pulsar#2543

Motivation

#163

ServiceUrlProvider is provided so that users can dynamically update serviceUrl, and can be implemented #164 cluster level auto failover.

Modifications

  • Add the ServiceUrlProvider function define.
  • Add the updateServiceUrl method in the Client.
  • Client add new constructor method to support set ServiceUrlProvider instance param.

Verifying this change

  • Add ServiceUrlProviderTest to cover custom ServiceUrlProvider.

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need to support passing a std::function<std::string()>. The initialize and close methods are redundant. IMO, the interface was not designed well in Java caused by the abuse of virtual functions. It makes it impossible to use a lambda as the service URL provider in Java.

If you really want to wrap the initialization and close logics, you can implement the function like:

class ServiceUrlProvider {
 public:
  ServiceUrlProvider(/* we can pass any type we want */) { /* initializations ... */ }
  void close() {/* close the resource... */}
  std:string operator()() const { /* ... */ }
};

For example, if we need to load the available URLs from a file periodically, we can pass the file path in the constructor and close the files in close.

There are also internal implementations in Java client like AutoClusterFailover. However, it actually makes use of PulsarClientImpl, not PulsarClient.

@shibd shibd requested a review from BewareMyPower January 13, 2023 07:14
@shibd
Copy link
Member Author

shibd commented Jan 13, 2023

We only need to support passing a std::functionstd::string(). The initialize and close methods are redundant.

Yes, you are right. The initialize and close methods are redundant.

In fact, The client already provides the updateServiceUrl method, which the user can use to implement dynamic update service URL. As for how the user implements it and how to close his resources, this is left to the user.

I changed to pass a std::function<const std::string&()> when creating a Client.

@shibd shibd self-assigned this Jan 19, 2023
@shibd shibd added the enhancement New feature or request label Jan 19, 2023
@shibd shibd added this to the 3.2.0 milestone Jan 19, 2023
@shibd shibd force-pushed the service_url_provider branch from cd94e93 to be414f7 Compare January 19, 2023 04:20
Comment on lines +70 to +80
/**
* Create a Pulsar client object connecting to the specified cluster address and using the default
* configuration.
*
* <p>Instead of specifying a static service URL string (with {@link #serviceUrl(String)}), an application
* can pass a {@link ServiceUrlProvider} function that dynamically provide a service URL.
*
* @param serviceUrlProvider The serviceUrlProvider used to generate ServiceUrl.
* @throw std::invalid_argument if `serviceUrlProvider()` return a invalid url.
*/
Client(ServiceUrlProvider serviceUrlProvider);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* Create a Pulsar client object connecting to the specified cluster address and using the default
* configuration.
*
* <p>Instead of specifying a static service URL string (with {@link #serviceUrl(String)}), an application
* can pass a {@link ServiceUrlProvider} function that dynamically provide a service URL.
*
* @param serviceUrlProvider The serviceUrlProvider used to generate ServiceUrl.
* @throw std::invalid_argument if `serviceUrlProvider()` return a invalid url.
*/
Client(ServiceUrlProvider serviceUrlProvider);
/**
* @see Client(ServiceUrlProvider, const ClientConfiguration&)
*/
Client(ServiceUrlProvider serviceUrlProvider) : Client(serviceUrlProvider, ClientConfiguration{}) {}

Avoid unnecessary duplicated code and API docs.

@BewareMyPower
Copy link
Contributor

The initialize and close methods are redundant.

I reviewed the ServiceUrlProvider in Java client again. I have thought getServiceUrl() is called each time the service URL is needed (e.g. topic lookup) but it's actually not. The update of the service URL is actually done by PulsarClient#updateServiceUrl, which is triggered by the periodical task of ServiceUrlProvide#initialize.

If we want to implement AutoClusterFailover and ControlledClusterFailover in future, we actually don't need to implement a function that returns the service URL. Instead, we only need to implement the following methods:

  • PulsarClient#updateServiceUrl
  • PulsarClientImpl#updateAuthentication
  • PulsarClientImpl#updateTlsTrustCertsFilePath

The way of using ServiceUrlProvider is bad. It's more like a provider that can modify the internal state of PulsarClient.

For C++ client, this PR implements the similar way with Java that the ServiceUrlProvider (a function that returns the service URL) is only triggerred when constructing the Client instance. There will be no difference between

ServiceUrlProvider provider = /* ... */;
Client client(provider());

and

ServiceUrlProvider provider = /* ... */;
String serviceUrl = provider();
Client client(serviceUrl);

l'm going to send an email to dev mail list to discuss.

@shibd
Copy link
Member Author

shibd commented Jan 19, 2023

If we want to implement AutoClusterFailover and ControlledClusterFailover in future, we actually don't need to implement a function that returns the service URL. Instead, we only need to implement the following methods:

Yes, Actually, the ServiceUrlProvider implemented here is just convenient for the user to return a function in order to get the service URL from elsewhere.

If we do not expose the set ServiceUrlProvider, It's no problem too.

Use ServiceUrlProvider

Client client([]() {
    // get service URL from store.
    auto serviceUrl = xxx
    return serviceUrl;
})

ServiceUrlProvider is not used

    // get service URL from store.
    auto serviceUrl = xxx
    Client client(serviceUrl);

I exposed it just for ease of use.

@shibd
Copy link
Member Author

shibd commented Jan 19, 2023

If you feel redundant to expose this method, I can remove it. What do you think? /cc @BewareMyPower

@BewareMyPower
Copy link
Contributor

@BewareMyPower
Copy link
Contributor

In short, a natural design is to save the ServiceUrlProvider in Client and call getServiceUrl() method (in C++ it's just a function invocation) each time we need a service URL. Maybe we need to maintain a key-value (serviceUrl -> ServiceNameResolver), when the serviceUrlProvider() returns a different result from the key, creating a new ServiceNameResolver for related lookup requests.

@shibd
Copy link
Member Author

shibd commented Jan 19, 2023

In short, a natural design is to save the ServiceUrlProvider in Client and call getServiceUrl() method (in C++ it's just a function invocation) each time we need a service URL. Maybe we need to maintain a key-value (serviceUrl -> ServiceNameResolver), when the serviceUrlProvider() returns a different result from the key, creating a new ServiceNameResolver for related lookup requests.

I think only need the client to provide the updateServiceUrl method.

Users can take advantage of this approach to implement their own services. (Pseudocode)

class Store;
class AutomaticClusterService {
   public:
    explicit AutomaticClusterService(const Store& store, const Client pulsarClient)
        : store_(store), pulsarClient_(pulsarClient) {
        serviceUrlCache_ = store.queryServiceUrl();
        updateServiceUrlInterval_ = boost::posix_time::seconds(60);
        runUpdateServiceurl();
    }

    void start() {
        runUpdateServiceurl();
    }

    const std::string& getServiceUrl(){
        return serviceUrlCache_;
    }

    const void runUpdateServiceurl() {
        timer_->expires_from_now(updateServiceUrlInterval_);
        auto availableServiceUrl = store_.queryServiceUrl();
        timer_->async_wait([this](const boost::system::error_code& ec) {
          // Maybe some other implementation
          if (serviceUrlCache_ != availableServiceUrl) {
              serviceUrlCache_ = availableServiceUrl;
              pulsarClient_.updateServiceUrl(serviceUrlCache_);
            }
            runUpdateServiceurl();
        });
    }


   private:
    // External storage services
    Store store_;
    const Client pulsarClient_;
    std::string serviceUrlCache_;
    std::shared_ptr<boost::asio::deadline_timer> timer_;
    boost::posix_time::time_duration updateServiceUrlInterval_;
};

Store store;
Client client(store.queryServiceUrl());

AutomaticClusterService  automaticClusterService(store, client);
automaticClusterService.start();

//....

@BewareMyPower
Copy link
Contributor

I think only need the client to provide the updateServiceUrl method.

This idea also looks good to me.

@BewareMyPower
Copy link
Contributor

@shibd We should delay this feature until a result of this discussion.

@BewareMyPower
Copy link
Contributor

Close this PR for now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants