diff --git a/cloudfoundry-client-reactor/pom.xml b/cloudfoundry-client-reactor/pom.xml
index 92c1dd4ca6e..2aebe8fa6be 100644
--- a/cloudfoundry-client-reactor/pom.xml
+++ b/cloudfoundry-client-reactor/pom.xml
@@ -66,7 +66,7 @@
test
- io.projectreactor.ipc
+ io.projectreactor.netty
reactor-netty
diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/AbstractRootProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/AbstractRootProvider.java
index ce54ca4f32d..b5b8998e441 100644
--- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/AbstractRootProvider.java
+++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/AbstractRootProvider.java
@@ -16,10 +16,17 @@
package org.cloudfoundry.reactor;
+import org.cloudfoundry.reactor.util.JsonCodec;
+import org.cloudfoundry.reactor.util.Operator;
+import org.cloudfoundry.reactor.util.OperatorContext;
+import org.cloudfoundry.reactor.util.UserAgent;
import org.immutables.value.Value;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
+
+import io.netty.handler.codec.http.HttpHeaders;
import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
import java.util.Optional;
import java.util.regex.Matcher;
@@ -41,19 +48,19 @@ public final void checkForValidApiHost() {
Matcher matcher = HOSTNAME_PATTERN.matcher(getApiHost());
if (!matcher.matches()) {
- throw new IllegalArgumentException(String.format("API hostname %s is not correctly formatted (e.g. 'api.local.pcfdev.io')", getApiHost()));
+ throw new IllegalArgumentException(String.format("API hostname %s is not correctly formatted (e.g. 'api.local.pcfdev.io')",
+ getApiHost()));
}
}
/**
- * The hostname of the API root. Typically something like {@code api.run.pivotal.io}.
+ * The hostname of the API root. Typically something like {@code api.run.pivotal.io}.
*/
public abstract String getApiHost();
@Override
public final Mono getRoot(ConnectionContext connectionContext) {
- Mono cached = doGetRoot(connectionContext)
- .delayUntil(uri -> trust(uri.getHost(), uri.getPort(), connectionContext))
+ Mono cached = doGetRoot(connectionContext).delayUntil(uri -> trust(uri.getHost(), uri.getPort(), connectionContext))
.map(UriComponents::toUriString);
return connectionContext.getCacheDuration()
@@ -63,8 +70,7 @@ public final Mono getRoot(ConnectionContext connectionContext) {
@Override
public final Mono getRoot(String key, ConnectionContext connectionContext) {
- Mono cached = doGetRoot(key, connectionContext)
- .delayUntil(uri -> trust(uri.getHost(), uri.getPort(), connectionContext))
+ Mono cached = doGetRoot(key, connectionContext).delayUntil(uri -> trust(uri.getHost(), uri.getPort(), connectionContext))
.map(UriComponents::toUriString);
return connectionContext.getCacheDuration()
@@ -77,7 +83,9 @@ public final Mono getRoot(String key, ConnectionContext connectionContex
protected abstract Mono doGetRoot(String key, ConnectionContext connectionContext);
protected final UriComponents getRoot() {
- UriComponentsBuilder builder = UriComponentsBuilder.newInstance().scheme("https").host(getApiHost());
+ UriComponentsBuilder builder = UriComponentsBuilder.newInstance()
+ .scheme("https")
+ .host(getApiHost());
getPort().ifPresent(builder::port);
return normalize(builder);
@@ -92,7 +100,8 @@ protected final UriComponents normalize(UriComponentsBuilder builder) {
builder.port(getPort().orElse(DEFAULT_PORT));
}
- return builder.build().encode();
+ return builder.build()
+ .encode();
}
/**
@@ -101,7 +110,7 @@ protected final UriComponents normalize(UriComponentsBuilder builder) {
abstract Optional getPort();
/**
- * Whether the connection to the root API should be secure (i.e. using HTTPS). Defaults to {@code true}.
+ * Whether the connection to the root API should be secure (i.e. using HTTPS). Defaults to {@code true}.
*/
abstract Optional getSecure();
@@ -117,4 +126,16 @@ private Mono trust(String host, int port, ConnectionContext connectionCont
return connectionContext.trust(host, port);
}
+ public Mono createOperator(ConnectionContext connectionContext) {
+ HttpClient httpClient = connectionContext.getHttpClient();
+ return getRoot(connectionContext).map(root -> OperatorContext.of(connectionContext, root))
+ .map(operatorContext -> new Operator(operatorContext, httpClient))
+ .map(operator -> operator.headers(this::addHeaders));
+ }
+
+ private void addHeaders(HttpHeaders httpHeaders) {
+ UserAgent.setUserAgent(httpHeaders);
+ JsonCodec.setDecodeHeaders(httpHeaders);
+ }
+
}
diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java
index 1ad135f75fc..ee1acfba99c 100644
--- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java
+++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java
@@ -18,7 +18,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Mono;
-import reactor.ipc.netty.http.client.HttpClient;
+import reactor.netty.http.client.HttpClient;
import java.time.Duration;
import java.util.Optional;
diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java
index 586ff82d6e2..40c8c9b544c 100644
--- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java
+++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java
@@ -16,11 +16,24 @@
package org.cloudfoundry.reactor;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.deser.DeserializationProblemHandler;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import io.netty.buffer.PooledByteBufAllocator;
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+import static io.netty.channel.ChannelOption.SO_RCVBUF;
+import static io.netty.channel.ChannelOption.SO_SNDBUF;
+
+import java.lang.management.ManagementFactory;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.net.ssl.TrustManagerFactory;
+
import org.cloudfoundry.Nullable;
import org.cloudfoundry.reactor.util.ByteBufAllocatorMetricProviderWrapper;
import org.cloudfoundry.reactor.util.DefaultSslCertificateTruster;
@@ -29,38 +42,32 @@
import org.immutables.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Mono;
-import reactor.ipc.netty.http.client.HttpClient;
-import reactor.ipc.netty.resources.LoopResources;
-import reactor.ipc.netty.resources.PoolResources;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.management.JMException;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.lang.management.ManagementFactory;
-import java.time.Duration;
-import java.util.List;
-import java.util.Optional;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.deser.DeserializationProblemHandler;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
-import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
-import static io.netty.channel.ChannelOption.SO_RCVBUF;
-import static io.netty.channel.ChannelOption.SO_SNDBUF;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.handler.ssl.SslContextBuilder;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.resources.ConnectionProvider;
+import reactor.netty.resources.LoopResources;
+import reactor.netty.tcp.SslProvider;
+import reactor.netty.tcp.SslProvider.DefaultConfigurationType;
+import reactor.netty.tcp.TcpClient;
/**
- * The default implementation of the {@link ConnectionContext} interface. This is the implementation that should be used for most non-testing cases.
+ * The default implementation of the {@link ConnectionContext} interface. This is the implementation that should be used for most
+ * non-testing cases.
*/
@Value.Immutable
abstract class _DefaultConnectionContext implements ConnectionContext {
private static final int DEFAULT_PORT = 443;
- private static final int RECEIVE_BUFFER_SIZE = 10 * 1024 * 1024;
-
- private static final int SEND_BUFFER_SIZE = 10 * 1024 * 1024;
+ private static final int SEND_RECEIVE_BUFFER_SIZE = 10 * 1024 * 1024;
private final Logger logger = LoggerFactory.getLogger("cloudfoundry-client");
@@ -69,14 +76,16 @@ abstract class _DefaultConnectionContext implements ConnectionContext {
*/
@PreDestroy
public final void dispose() {
- getConnectionPool().ifPresent(PoolResources::dispose);
+ getConnectionProvider().ifPresent(ConnectionProvider::dispose);
getThreadPool().dispose();
try {
ObjectName name = getByteBufAllocatorObjectName();
- if (ManagementFactory.getPlatformMBeanServer().isRegistered(name)) {
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(name);
+ if (ManagementFactory.getPlatformMBeanServer()
+ .isRegistered(name)) {
+ ManagementFactory.getPlatformMBeanServer()
+ .unregisterMBean(name);
}
} catch (JMException e) {
this.logger.error("Unable to register ByteBufAllocator MBean", e);
@@ -87,42 +96,74 @@ public final void dispose() {
public abstract Optional getCacheDuration();
/**
- * The number of connections to use when processing requests and responses. Setting this to {@code null} disables connection pooling.
+ * The number of connections to use when processing requests and responses. Setting this to {@code null} disables connection pooling.
*/
@Nullable
@Value.Default
public Integer getConnectionPoolSize() {
- return PoolResources.DEFAULT_POOL_MAX_CONNECTION;
+ return ConnectionProvider.DEFAULT_POOL_MAX_CONNECTIONS;
}
@Override
@Value.Default
public HttpClient getHttpClient() {
- return HttpClient.create(options -> {
- options
- .compression(true)
- .loopResources(getThreadPool())
- .option(SO_SNDBUF, SEND_BUFFER_SIZE)
- .option(SO_RCVBUF, RECEIVE_BUFFER_SIZE)
- .disablePool();
-
- options.sslSupport(ssl -> getSslCertificateTruster().ifPresent(trustManager -> ssl.trustManager(new StaticTrustManagerFactory(trustManager))));
-
- getConnectionPool().ifPresent(options::poolResources);
- getConnectTimeout().ifPresent(socketTimeout -> options.option(CONNECT_TIMEOUT_MILLIS, (int) socketTimeout.toMillis()));
- getKeepAlive().ifPresent(keepAlive -> options.option(SO_KEEPALIVE, keepAlive));
- getSslHandshakeTimeout().ifPresent(options::sslHandshakeTimeout);
- getSslCloseNotifyFlushTimeout().ifPresent(options::sslCloseNotifyFlushTimeout);
- getSslCloseNotifyReadTimeout().ifPresent(options::sslCloseNotifyReadTimeout);
- getProxyConfiguration().ifPresent(c -> c.configure(options));
- });
+ return createHttpClient().compress(true)
+ .tcpConfiguration(this::configureTcpClient)
+ .secure(this::configureSsl);
+ }
+
+ private HttpClient createHttpClient() {
+ return getConnectionProvider().map(HttpClient::create)
+ .orElse(HttpClient.create());
+ }
+
+ private TcpClient configureTcpClient(TcpClient tcpClient) {
+ tcpClient = configureProxy(tcpClient);
+ tcpClient = tcpClient.runOn(getThreadPool())
+ .option(SO_SNDBUF, SEND_RECEIVE_BUFFER_SIZE)
+ .option(SO_RCVBUF, SEND_RECEIVE_BUFFER_SIZE);
+ tcpClient = configureKeepAlive(tcpClient);
+ return configureConnectTimeout(tcpClient);
+ }
+
+ private TcpClient configureProxy(TcpClient tcpClient) {
+ return getProxyConfiguration().map(proxyConfiguration -> proxyConfiguration.configure(tcpClient))
+ .orElse(tcpClient);
+ }
+
+ private TcpClient configureKeepAlive(TcpClient tcpClient) {
+ return getKeepAlive().map(keepAlive -> tcpClient.option(SO_KEEPALIVE, keepAlive))
+ .orElse(tcpClient);
+ }
+
+ private TcpClient configureConnectTimeout(TcpClient tcpClient) {
+ return getConnectTimeout().map(connectTimeout -> tcpClient.option(CONNECT_TIMEOUT_MILLIS, (int) connectTimeout.toMillis()))
+ .orElse(tcpClient);
+ }
+
+ private void configureSsl(SslProvider.SslContextSpec ssl) {
+ SslProvider.Builder builder = ssl.sslContext(createSslContextBuilder())
+ .defaultConfiguration(DefaultConfigurationType.TCP);
+ getSslCloseNotifyReadTimeout().ifPresent(builder::closeNotifyReadTimeout);
+ getSslHandshakeTimeout().ifPresent(builder::handshakeTimeout);
+ getSslCloseNotifyFlushTimeout().ifPresent(builder::closeNotifyFlushTimeout);
+ }
+
+ private SslContextBuilder createSslContextBuilder() {
+ SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
+ getSslCertificateTruster().map(this::createTrustManagerFactory)
+ .ifPresent(sslContextBuilder::trustManager);
+ return sslContextBuilder;
+ }
+
+ private TrustManagerFactory createTrustManagerFactory(SslCertificateTruster sslCertificateTruster) {
+ return new StaticTrustManagerFactory(sslCertificateTruster);
}
@Override
@Value.Default
public ObjectMapper getObjectMapper() {
- ObjectMapper objectMapper = new ObjectMapper()
- .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+ ObjectMapper objectMapper = new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY)
.registerModule(new Jdk8Module())
.setSerializationInclusion(NON_NULL);
@@ -153,13 +194,12 @@ public Integer getThreadPoolSize() {
@Override
public Mono trust(String host, int port) {
- return getSslCertificateTruster()
- .map(t -> t.trust(host, port, Duration.ofSeconds(30)))
+ return getSslCertificateTruster().map(t -> t.trust(host, port, Duration.ofSeconds(30)))
.orElse(Mono.empty());
}
/**
- * The hostname of the API root. Typically something like {@code api.run.pivotal.io}.
+ * The hostname of the API root. Typically something like {@code api.run.pivotal.io}.
*/
abstract String getApiHost();
@@ -169,9 +209,9 @@ public Mono trust(String host, int port) {
abstract Optional getConnectTimeout();
@Value.Derived
- Optional getConnectionPool() {
+ Optional getConnectionProvider() {
return Optional.ofNullable(getConnectionPoolSize())
- .map(connectionPoolSize -> PoolResources.fixed("cloudfoundry-client", connectionPoolSize));
+ .map(connectionPoolSize -> ConnectionProvider.fixed("cloudfoundry-client", connectionPoolSize));
}
/**
@@ -185,7 +225,7 @@ Optional getConnectionPool() {
abstract Optional getPort();
/**
- * Jackson deserialization problem handlers. Typically only used for testing.
+ * Jackson deserialization problem handlers. Typically only used for testing.
*/
abstract List getProblemHandlers();
@@ -195,12 +235,12 @@ Optional getConnectionPool() {
abstract Optional getProxyConfiguration();
/**
- * Whether the connection to the root API should be secure (i.e. using HTTPS). Defaults to {@code true}.
+ * Whether the connection to the root API should be secure (i.e. using HTTPS). Defaults to {@code true}.
*/
abstract Optional getSecure();
/**
- * Whether to skip SSL certificate validation for all hosts reachable from the API host. Defaults to {@code false}.
+ * Whether to skip SSL certificate validation for all hosts reachable from the API host. Defaults to {@code false}.
*/
abstract Optional getSkipSslValidation();
@@ -238,19 +278,24 @@ void monitorByteBufAllocator() {
try {
ObjectName name = getByteBufAllocatorObjectName();
- if (ManagementFactory.getPlatformMBeanServer().isRegistered(name)) {
- this.logger.warn("MBean '{}' is already registered and will be removed. You should only have a single DefaultConnectionContext per endpoint.", name);
- ManagementFactory.getPlatformMBeanServer().unregisterMBean(name);
+ if (ManagementFactory.getPlatformMBeanServer()
+ .isRegistered(name)) {
+ this.logger.warn("MBean '{}' is already registered and will be removed. You should only have a single DefaultConnectionContext per endpoint.",
+ name);
+ ManagementFactory.getPlatformMBeanServer()
+ .unregisterMBean(name);
}
- ManagementFactory.getPlatformMBeanServer().registerMBean(new ByteBufAllocatorMetricProviderWrapper(PooledByteBufAllocator.DEFAULT), name);
+ ManagementFactory.getPlatformMBeanServer()
+ .registerMBean(new ByteBufAllocatorMetricProviderWrapper(PooledByteBufAllocator.DEFAULT), name);
} catch (JMException e) {
this.logger.error("Unable to register ByteBufAllocator MBean", e);
}
}
private ObjectName getByteBufAllocatorObjectName() throws MalformedObjectNameException {
- return ObjectName.getInstance(String.format("org.cloudfoundry.reactor:type=ByteBufAllocator,endpoint=%s/%d", getApiHost(), getPort().orElse(DEFAULT_PORT)));
+ return ObjectName.getInstance(String.format("org.cloudfoundry.reactor:type=ByteBufAllocator,endpoint=%s/%d", getApiHost(),
+ getPort().orElse(DEFAULT_PORT)));
}
}
diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_HttpClientResponseWithBody.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_HttpClientResponseWithBody.java
new file mode 100644
index 00000000000..903f7c1ef9e
--- /dev/null
+++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_HttpClientResponseWithBody.java
@@ -0,0 +1,17 @@
+package org.cloudfoundry.reactor;
+
+import org.immutables.value.Value;
+
+import reactor.netty.ByteBufFlux;
+import reactor.netty.http.client.HttpClientResponse;
+
+@Value.Immutable
+public interface _HttpClientResponseWithBody {
+
+ @Value.Parameter
+ HttpClientResponse getResponse();
+
+ @Value.Parameter
+ ByteBufFlux getBody();
+
+}
diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_InfoPayloadRootProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_InfoPayloadRootProvider.java
index bbe524aeb09..5da7e152721 100644
--- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_InfoPayloadRootProvider.java
+++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_InfoPayloadRootProvider.java
@@ -16,17 +16,15 @@
package org.cloudfoundry.reactor;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.cloudfoundry.reactor.util.JsonCodec;
-import org.cloudfoundry.reactor.util.NetworkLogging;
-import org.cloudfoundry.reactor.util.UserAgent;
+import java.util.Map;
+
import org.immutables.value.Value;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
-import reactor.core.publisher.Mono;
-import reactor.ipc.netty.http.client.HttpClientRequest;
-import java.util.Map;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import reactor.core.publisher.Mono;
/**
* A {@link RootProvider} that returns endpoints extracted from the `/v2/info` API for the configured endpoint.
@@ -39,14 +37,13 @@ protected Mono doGetRoot(ConnectionContext connectionContext) {
}
protected Mono doGetRoot(String key, ConnectionContext connectionContext) {
- return getInfo(connectionContext)
- .map(info -> {
- if (!info.containsKey(key)) {
- throw new IllegalArgumentException(String.format("Info payload does not contain key '%s'", key));
- }
+ return getInfo(connectionContext).map(info -> {
+ if (!info.containsKey(key)) {
+ throw new IllegalArgumentException(String.format("Info payload does not contain key '%s'", key));
+ }
- return normalize(UriComponentsBuilder.fromUriString(info.get(key)));
- });
+ return normalize(UriComponentsBuilder.fromUriString(info.get(key)));
+ });
}
abstract ObjectMapper getObjectMapper();
@@ -54,19 +51,17 @@ protected Mono doGetRoot(String key, ConnectionContext connection
@SuppressWarnings("unchecked")
@Value.Derived
private Mono