Skip to content

Commit fefc1fc

Browse files
Feature: configurable secondary cache and cache proxy timeouts (#162)
Co-authored-by: osulzhenko <125548596+osulzhenko@users.noreply.github.com>
1 parent 997a009 commit fefc1fc

File tree

5 files changed

+120
-49
lines changed

5 files changed

+120
-49
lines changed

src/main/java/org/prebid/cache/handlers/cache/GetCacheHandler.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.github.benmanes.caffeine.cache.Caffeine;
44
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
55
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
6+
import io.netty.channel.ChannelOption;
67
import lombok.extern.slf4j.Slf4j;
78
import org.apache.commons.lang3.StringUtils;
89
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -23,14 +24,15 @@
2324
import org.springframework.beans.factory.annotation.Value;
2425
import org.springframework.http.HttpStatus;
2526
import org.springframework.http.MediaType;
27+
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
2628
import org.springframework.stereotype.Component;
2729
import org.springframework.web.reactive.function.client.ClientResponse;
2830
import org.springframework.web.reactive.function.client.WebClient;
2931
import org.springframework.web.reactive.function.server.ServerRequest;
3032
import org.springframework.web.reactive.function.server.ServerResponse;
3133
import reactor.core.publisher.Mono;
32-
import reactor.core.publisher.SynchronousSink;
3334
import reactor.core.scheduler.Schedulers;
35+
import reactor.netty.http.client.HttpClient;
3436

3537
import java.time.Duration;
3638
import java.util.Map;
@@ -120,39 +122,51 @@ private Mono<ServerResponse> processProxyRequest(final ServerRequest request,
120122
final String idKeyParam,
121123
final String cacheUrl) {
122124

123-
final WebClient webClient = clientsCache.computeIfAbsent(cacheUrl, WebClient::create);
125+
final WebClient webClient = clientsCache.computeIfAbsent(cacheUrl, this::createWebClient);
124126

125127
return webClient.get()
126128
.uri(uriBuilder -> uriBuilder.queryParam(ID_KEY, idKeyParam).build())
127129
.headers(httpHeaders -> httpHeaders.addAll(request.headers().asHttpHeaders()))
128-
.exchange()
130+
.exchangeToMono(clientResponse -> {
131+
updateProxyMetrics(clientResponse);
132+
return fromClientResponse(clientResponse);
133+
})
129134
.transform(CircuitBreakerOperator.of(circuitBreaker))
130135
.timeout(Duration.ofMillis(config.getTimeoutMs()))
131136
.subscribeOn(Schedulers.parallel())
132-
.handle(this::updateProxyMetrics)
133-
.flatMap(GetCacheHandler::fromClientResponse)
134137
.doOnError(error -> {
135138
metricsRecorder.getProxyFailure().increment();
136-
log.info("Failed to send request: '{}', cause: '{}'",
139+
log.error("Failed to send request: '{}', cause: '{}'",
137140
ExceptionUtils.getMessage(error), ExceptionUtils.getMessage(error));
138141
});
139142
}
140143

141-
private void updateProxyMetrics(final ClientResponse clientResponse,
142-
final SynchronousSink<ClientResponse> sink) {
144+
private WebClient createWebClient(String cacheUrl) {
145+
HttpClient httpClient = HttpClient.create()
146+
.responseTimeout(Duration.ofMillis(config.getTimeoutMs()))
147+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getTimeoutMs());
148+
149+
return WebClient.builder()
150+
.baseUrl(cacheUrl)
151+
.clientConnector(new ReactorClientHttpConnector(httpClient))
152+
.build();
153+
}
154+
155+
private void updateProxyMetrics(final ClientResponse clientResponse) {
143156
if (HttpStatus.OK.equals(clientResponse.statusCode())) {
144157
metricsRecorder.getProxySuccess().increment();
145158
} else {
146159
metricsRecorder.getProxyFailure().increment();
147160
}
148-
149-
sink.next(clientResponse);
150161
}
151162

152163
private static Mono<ServerResponse> fromClientResponse(final ClientResponse clientResponse) {
153-
return ServerResponse.status(clientResponse.statusCode())
154-
.headers(headerConsumer -> clientResponse.headers().asHttpHeaders().forEach(headerConsumer::addAll))
155-
.body(clientResponse.bodyToMono(String.class), String.class);
164+
// This is a workaround to handle the race condition when the response body is consumed
165+
// https://github.com/spring-projects/spring-boot/issues/15320
166+
return clientResponse.bodyToMono(String.class)
167+
.flatMap(body -> ServerResponse.status(clientResponse.statusCode())
168+
.headers(headers -> clientResponse.headers().asHttpHeaders().forEach(headers::addAll))
169+
.body(Mono.just(body), String.class));
156170
}
157171

158172
private Mono<ServerResponse> processRequest(final ServerRequest request, final String keyIdParam) {

src/main/java/org/prebid/cache/handlers/cache/PostCacheHandler.java

Lines changed: 52 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.common.collect.ImmutableMap;
55
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
66
import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator;
7+
import io.netty.channel.ChannelOption;
78
import lombok.extern.slf4j.Slf4j;
89
import org.apache.commons.lang3.StringUtils;
910
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -30,18 +31,21 @@
3031
import org.springframework.http.HttpHeaders;
3132
import org.springframework.http.HttpStatus;
3233
import org.springframework.http.MediaType;
34+
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
3335
import org.springframework.stereotype.Component;
3436
import org.springframework.web.reactive.function.BodyExtractors;
3537
import org.springframework.web.reactive.function.client.WebClient;
3638
import org.springframework.web.reactive.function.server.ServerRequest;
3739
import org.springframework.web.reactive.function.server.ServerResponse;
3840
import reactor.core.publisher.Flux;
3941
import reactor.core.publisher.Mono;
42+
import reactor.core.publisher.ParallelFlux;
4043
import reactor.core.publisher.SynchronousSink;
4144
import reactor.core.scheduler.Schedulers;
45+
import reactor.netty.http.client.HttpClient;
4246

4347
import java.io.IOException;
44-
import java.util.ArrayList;
48+
import java.time.Duration;
4549
import java.util.HashMap;
4650
import java.util.List;
4751
import java.util.Map;
@@ -80,7 +84,15 @@ public PostCacheHandler(final ReactiveRepository<PayloadWrapper, String> reposit
8084
this.repository = repository;
8185
this.config = config;
8286
if (config.getSecondaryUris() != null) {
83-
config.getSecondaryUris().forEach(ip -> webClients.put(ip, WebClient.create(ip)));
87+
config.getSecondaryUris().forEach(url -> {
88+
HttpClient httpClient = HttpClient.create()
89+
.responseTimeout(Duration.ofMillis(config.getSecondaryCacheTimeoutMs()))
90+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getSecondaryCacheTimeoutMs());
91+
webClients.put(url, WebClient.builder()
92+
.baseUrl(url)
93+
.clientConnector(new ReactorClientHttpConnector(httpClient))
94+
.build());
95+
});
8496
}
8597
this.builder = builder;
8698
this.metricTagPrefix = "write";
@@ -202,33 +214,46 @@ private long adjustExpiry(Long expiry) {
202214
}
203215

204216
private void sendRequestToSecondaryPrebidCacheHosts(List<PayloadWrapper> payloadWrappers, String secondaryCache) {
205-
if (!"yes".equals(secondaryCache) && webClients.size() != 0) {
206-
final List<PayloadTransfer> payloadTransfers = new ArrayList<>();
207-
for (PayloadWrapper payloadWrapper : payloadWrappers) {
208-
payloadTransfers.add(wrapperToTransfer(payloadWrapper));
209-
}
217+
if (!"yes".equals(secondaryCache) && !webClients.isEmpty()) {
218+
Flux.fromIterable(payloadWrappers)
219+
.map(this::wrapperToTransfer)
220+
.collectList()
221+
.flatMapMany(this::createSecondaryCacheRequests)
222+
.subscribe();
223+
}
224+
}
225+
226+
private ParallelFlux<Void> createSecondaryCacheRequests(List<PayloadTransfer> payloadTransfers) {
227+
return Flux.fromIterable(webClients.entrySet())
228+
.parallel()
229+
.runOn(Schedulers.parallel())
230+
.flatMap(entry -> sendRequestToSecondaryCache(entry.getValue(), entry.getKey(), payloadTransfers));
231+
}
210232

211-
webClients.forEach((ip, webClient) -> webClient.post()
212-
.uri(uriBuilder -> uriBuilder.path(config.getSecondaryCachePath())
213-
.queryParam("secondaryCache", "yes").build())
214-
.contentType(MediaType.APPLICATION_JSON)
215-
.headers(enrichWithSecurityHeader())
216-
.bodyValue(RequestObject.of(payloadTransfers))
217-
.exchange()
218-
.transform(CircuitBreakerOperator.of(circuitBreaker))
219-
.doOnError(throwable -> {
233+
private Mono<Void> sendRequestToSecondaryCache(WebClient webClient,
234+
String url,
235+
List<PayloadTransfer> payloadTransfers) {
236+
return webClient.post()
237+
.uri(uriBuilder -> uriBuilder.path(config.getSecondaryCachePath())
238+
.queryParam("secondaryCache", "yes").build())
239+
.contentType(MediaType.APPLICATION_JSON)
240+
.headers(enrichWithSecurityHeader())
241+
.bodyValue(RequestObject.of(payloadTransfers))
242+
.exchangeToMono(clientResponse -> {
243+
if (clientResponse.statusCode() != HttpStatus.OK) {
220244
metricsRecorder.getSecondaryCacheWriteError().increment();
221-
log.info("Failed to send request: '{}', cause: '{}'",
222-
ExceptionUtils.getMessage(throwable), ExceptionUtils.getMessage(throwable));
223-
})
224-
.subscribe(clientResponse -> {
225-
if (clientResponse.statusCode() != HttpStatus.OK) {
226-
metricsRecorder.getSecondaryCacheWriteError().increment();
227-
log.debug(clientResponse.statusCode().toString());
228-
log.info("Failed to write to remote address : {}", ip);
229-
}
230-
}));
231-
}
245+
log.debug(clientResponse.statusCode().toString());
246+
log.error("Failed to write to remote address: {}", url);
247+
}
248+
return clientResponse.releaseBody();
249+
})
250+
.transform(CircuitBreakerOperator.of(circuitBreaker))
251+
.doOnError(throwable -> {
252+
metricsRecorder.getSecondaryCacheWriteError().increment();
253+
log.error("Failed to send request: '{}', cause: '{}'",
254+
ExceptionUtils.getMessage(throwable), ExceptionUtils.getMessage(throwable));
255+
})
256+
.then();
232257
}
233258

234259
private Consumer<HttpHeaders> enrichWithSecurityHeader() {

src/main/java/org/prebid/cache/repository/CacheConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ public class CacheConfig {
2222
private boolean allowExternalUUID;
2323
private List<String> secondaryUris;
2424
private String secondaryCachePath;
25+
private int secondaryCacheTimeoutMs;
2526
private int clientsCacheDuration;
2627
private int clientsCacheSize;
2728
private String allowedProxyHost;
2829
private String hostParamProtocol;
2930
}
30-

src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,19 @@ void testSecondaryCacheSuccess() {
176176
@Test
177177
void testExternalUUIDInvalid() {
178178
//given
179-
final var cacheConfigLocal = new CacheConfig(cacheConfig.getPrefix(), cacheConfig.getExpirySec(),
179+
final var cacheConfigLocal = new CacheConfig(cacheConfig.getPrefix(),
180+
cacheConfig.getExpirySec(),
180181
cacheConfig.getTimeoutMs(),
181-
cacheConfig.getMinExpiry(), cacheConfig.getMaxExpiry(),
182-
false, Collections.emptyList(), cacheConfig.getSecondaryCachePath(), 100, 100, "example.com", "http");
182+
cacheConfig.getMinExpiry(),
183+
cacheConfig.getMaxExpiry(),
184+
false,
185+
Collections.emptyList(),
186+
cacheConfig.getSecondaryCachePath(),
187+
100,
188+
100,
189+
100,
190+
"example.com",
191+
"http");
183192
final var handler = new PostCacheHandler(repository, cacheConfigLocal, metricsRecorder, builder,
184193
webClientCircuitBreaker, samplingRate, apiConfig);
185194

@@ -207,10 +216,19 @@ void testUUIDDuplication() {
207216
.willReturn(Mono.just(PAYLOAD_WRAPPER))
208217
.willReturn(Mono.error(new DuplicateKeyException("")));
209218

210-
final CacheConfig cacheConfigLocal = new CacheConfig(cacheConfig.getPrefix(), cacheConfig.getExpirySec(),
219+
final CacheConfig cacheConfigLocal = new CacheConfig(cacheConfig.getPrefix(),
220+
cacheConfig.getExpirySec(),
211221
cacheConfig.getTimeoutMs(),
212-
5, cacheConfig.getMaxExpiry(), cacheConfig.isAllowExternalUUID(),
213-
Collections.emptyList(), cacheConfig.getSecondaryCachePath(), 100, 100, "example.com", "http");
222+
5,
223+
cacheConfig.getMaxExpiry(),
224+
cacheConfig.isAllowExternalUUID(),
225+
Collections.emptyList(),
226+
cacheConfig.getSecondaryCachePath(),
227+
100,
228+
100,
229+
100,
230+
"example.com",
231+
"http");
214232
final PostCacheHandler handler = new PostCacheHandler(repository, cacheConfigLocal, metricsRecorder, builder,
215233
webClientCircuitBreaker, samplingRate, apiConfig);
216234

src/test/kotlin/org/prebid/cache/functional/testcontainers/client/WebCacheContainerClient.kt

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@ class WebCacheContainerClient(mockServerHost: String, mockServerPort: Int) {
4040
.withBody(body, mediaType)
4141
)
4242

43-
fun getSecondaryCacheRecordedRequests(uuidKey: String): Array<out HttpRequest>? =
44-
mockServerClient.retrieveRecordedRequests(getSecondaryCacheRequest(uuidKey))
43+
fun getSecondaryCacheRecordedRequests(uuidKey: String): Array<out HttpRequest>? {
44+
val secondaryCacheRequest = getSecondaryCacheRequest(uuidKey)
45+
waitUntil({ mockServerClient.retrieveRecordedRequests(secondaryCacheRequest)!!.isNotEmpty() })
46+
return mockServerClient.retrieveRecordedRequests(secondaryCacheRequest)
47+
}
4548

4649
fun initSecondaryCacheResponse(): Array<out Expectation>? =
4750
mockServerClient.`when`(getSecondaryCacheRequest())
@@ -59,4 +62,15 @@ class WebCacheContainerClient(mockServerHost: String, mockServerPort: Int) {
5962
request().withMethod(POST.name())
6063
.withPath("/$WEB_CACHE_PATH")
6164
.withBody(jsonPath("\$.puts[?(@.key == '$uuidKey')]"))
65+
66+
private fun waitUntil(closure: () -> Boolean, timeoutMs: Long = 5000, pollInterval: Long = 100) {
67+
val startTime = System.currentTimeMillis()
68+
while (System.currentTimeMillis() - startTime <= timeoutMs) {
69+
if (closure()) {
70+
return
71+
}
72+
Thread.sleep(pollInterval)
73+
}
74+
throw IllegalStateException("Condition was not fulfilled within $timeoutMs ms.")
75+
}
6276
}

0 commit comments

Comments
 (0)