Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private PulsarClientImpl client;
private ConnectionPool connectionPool;
private ProxyService service;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
Expand Down Expand Up @@ -303,7 +302,6 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
try {
// init authn
this.clientConf = createClientConfiguration();
this.clientAuthentication = clientConf.getAuthentication();
int protocolVersion = getProtocolVersionToAdvertise(connect);

// authn not enabled, complete
Expand Down Expand Up @@ -427,10 +425,7 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters()));
}
clientConf.setAuthentication(this.getClientAuthentication());
if (proxyConfig.isTlsEnabledWithBroker()) {
clientConf.setUseTls(true);
if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
Expand Down Expand Up @@ -463,7 +458,7 @@ long newRequestId() {
}

public Authentication getClientAuthentication() {
return clientAuthentication;
return service.getProxyClientAuthenticationPlugin();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
Expand All @@ -69,6 +72,7 @@
public class ProxyService implements Closeable {

private final ProxyConfiguration proxyConfig;
private final Authentication proxyClientAuthentication;
private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
Expand Down Expand Up @@ -163,6 +167,12 @@ public ProxyService(ProxyConfiguration proxyConfig,
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}
}

public void start() throws Exception {
Expand Down Expand Up @@ -367,5 +377,9 @@ public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataS
proxyConfig.getZookeeperSessionTimeoutMs());
}

public Authentication getProxyClientAuthenticationPlugin() {
return this.proxyClientAuthentication;
}

private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -144,9 +145,11 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
}

JsonObject element = JsonParser.parseString(commandData).getAsJsonObject();
log.info("Have log of {}", element);
long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString());
long currentTimeInMillis = System.currentTimeMillis();
if (expiryTimeInMillis < currentTimeInMillis) {
log.warn("Auth failed due to timeout");
throw new AuthenticationException("Authentication data has been expired");
}
return element.get("entityType").getAsString();
Expand Down Expand Up @@ -198,10 +201,10 @@ void testAuthentication() throws Exception {
String namespaceName = "my-property/my-ns";
String topicName = "persistent://my-property/my-ns/my-topic1";
String subscriptionName = "my-subscriber-name";
// expires after 6 seconds
String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000);
// expires after 3 seconds
String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000);
// expires after 60 seconds
String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);
// expires after 60 seconds
String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000);

admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
Expand Down Expand Up @@ -230,16 +233,18 @@ void testAuthentication() throws Exception {
proxyService.start();
final String proxyServiceUrl = proxyService.getServiceUrl();

// Step 3: Pass correct client params
// Step 3: Pass correct client params and use multiple connections
@Cleanup
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
// Sleep for 4 seconds - wait for proxy auth params to expire
Thread.sleep(4 * 1000);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
// Sleep for 3 seconds - wait for client auth parans to expire
Thread.sleep(3 * 1000);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();

// Step 4: Ensure that all client contexts share the same auth provider
Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients");
proxyService.getClientCnxs().stream().forEach((cnx) -> {
Assert.assertSame(cnx.authenticationProvider, proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
});
}

private void updateAdminClient() throws PulsarClientException {
Expand Down