From 8fca2cac4ac785f5550711e4a854f536dd1991f5 Mon Sep 17 00:00:00 2001 From: Sergio Champel Date: Sun, 29 Jan 2023 23:03:12 +0100 Subject: [PATCH] Upgrade to reactor core 3.5 Signed-off-by: Sergio Champel --- .../io/dapr/examples/OpenTelemetryConfig.java | 2 +- .../dapr/examples/pubsub/BulkPublisher.java | 2 +- .../examples/pubsub/PublisherWithTracing.java | 2 +- .../dapr/examples/tracing/InvokeClient.java | 2 +- .../TracingDemoMiddleServiceController.java | 4 +- .../io/dapr/actors/client/DaprGrpcClient.java | 6 +- .../main/resources/META-INF/spring.factories | 2 - sdk/pom.xml | 2 +- .../java/io/dapr/client/DaprClientGrpc.java | 32 ++--- .../java/io/dapr/client/DaprClientHttp.java | 116 +++++++++--------- .../main/java/io/dapr/client/DaprHttp.java | 30 ++--- .../dapr/internal/opencensus/GrpcWrapper.java | 4 +- .../client/DaprClientGrpcTelemetryTest.java | 2 +- .../io/dapr/client/DaprClientHttpTest.java | 2 +- .../java/io/dapr/client/DaprHttpStub.java | 7 +- 15 files changed, 107 insertions(+), 108 deletions(-) delete mode 100644 sdk-springboot/src/main/resources/META-INF/spring.factories diff --git a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java index e273ae55cf..84909e2911 100644 --- a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java +++ b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java @@ -89,7 +89,7 @@ public static OpenTelemetry createOpenTelemetry() { * Converts current OpenTelemetry's context into Reactor's context. * @return Reactor's context. */ - public static reactor.util.context.Context getReactorContext() { + public static reactor.util.context.ContextView getReactorContext() { return getReactorContext(Context.current()); } diff --git a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java index e1c1306b0a..a6552f9cd4 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception { System.out.println("Going to publish message : " + message); } BulkPublishResponse res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages) - .subscriberContext(getReactorContext()).block(); + .contextWrite(getReactorContext()).block(); System.out.println("Published the set of messages in a single call to Dapr"); if (res != null) { if (res.getFailedEntries().size() > 0) { diff --git a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java index fe9bf93e5e..cc8b3d1acd 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception { client.publishEvent( PUBSUB_NAME, TOPIC_NAME, - message).subscriberContext(getReactorContext()).block(); + message).contextWrite(getReactorContext()).block(); System.out.println("Published message: " + message); try { diff --git a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java index 9078fb0d53..3ddfa1d38c 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java +++ b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java @@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception { InvokeMethodRequest sleepRequest = new InvokeMethodRequest(SERVICE_APP_ID, "proxy_sleep") .setHttpExtension(HttpExtension.POST); return client.invokeMethod(sleepRequest, TypeRef.get(Void.class)); - }).subscriberContext(getReactorContext()).block(); + }).contextWrite(getReactorContext()).block(); } } } diff --git a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java index 9a3111e859..323d4c0232 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java +++ b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java @@ -58,7 +58,7 @@ public Mono echo( InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "echo") .setBody(body) .setHttpExtension(HttpExtension.POST); - return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)); + return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)); } /** @@ -71,7 +71,7 @@ public Mono echo( public Mono sleep(@RequestAttribute(name = "opentelemetry-context") Context context) { InvokeMethodRequest request = new InvokeMethodRequest(INVOKE_APP_ID, "sleep") .setHttpExtension(HttpExtension.POST); - return client.invokeMethod(request, TypeRef.get(byte[].class)).subscriberContext(getReactorContext(context)).then(); + return client.invokeMethod(request, TypeRef.get(byte[].class)).contextWrite(getReactorContext(context)).then(); } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java index a5b963a8f2..844d2ace36 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -29,7 +29,7 @@ import io.grpc.stub.StreamObserver; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; -import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -65,7 +65,7 @@ public Mono invoke(String actorType, String actorId, String methodName, .setMethod(methodName) .setData(jsonPayload == null ? ByteString.EMPTY : ByteString.copyFrom(jsonPayload)) .build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, client).invokeActor(req, it) ) @@ -109,7 +109,7 @@ public void start(final Listener responseListener, final Metadata metadat * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) { + private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { return GrpcWrapper.intercept(context, client); } diff --git a/sdk-springboot/src/main/resources/META-INF/spring.factories b/sdk-springboot/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 58ff4b58d6..0000000000 --- a/sdk-springboot/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,2 +0,0 @@ -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -io.dapr.springboot.DaprAutoConfiguration diff --git a/sdk/pom.xml b/sdk/pom.xml index 09efa68bf8..7acd9e603e 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -44,7 +44,7 @@ io.projectreactor reactor-core - 3.3.11.RELEASE + 3.5.0 com.squareup.okhttp3 diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index cdc6c88b63..e038ea4c29 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -67,7 +67,7 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; -import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.io.Closeable; import java.io.IOException; @@ -181,7 +181,7 @@ public Mono publishEvent(PublishEventRequest request) { envelopeBuilder.putAllMetadata(metadata); } - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it) @@ -254,7 +254,7 @@ public Mono> publishEvents(BulkPublishRequest requ for (BulkPublishEntry entry: request.getEntries()) { entryMap.put(entry.getEntryId(), entry); } - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).bulkPublishEventAlpha1(envelopeBuilder.build(), it) @@ -298,7 +298,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef // gRPC to gRPC does not handle metadata in Dapr runtime proto. // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342 - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).invokeService(envelope, it) ) @@ -345,7 +345,7 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) } DaprProtos.InvokeBindingRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).invokeBinding(envelope, it) ) @@ -392,7 +392,7 @@ public Mono> getState(GetStateRequest request, TypeRef type) { DaprProtos.GetStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).getState(envelope, it) @@ -441,7 +441,7 @@ public Mono>> getBulkState(GetBulkStateRequest request, TypeRe DaprProtos.GetBulkStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub) .getBulkState(envelope, it) ) @@ -525,7 +525,7 @@ public Mono executeStateTransaction(ExecuteStateTransactionRequest request } DaprProtos.ExecuteStateTransactionRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it)) ).then(); } catch (Exception e) { @@ -551,7 +551,7 @@ public Mono saveBulkState(SaveStateRequest request) { } DaprProtos.SaveStateRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).saveState(req, it)) ).then(); } catch (Exception ex) { @@ -635,7 +635,7 @@ public Mono deleteState(DeleteStateRequest request) { DaprProtos.DeleteStateRequest req = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).deleteState(req, it)) ).then(); } catch (Exception ex) { @@ -713,7 +713,7 @@ public Mono> getSecret(GetSecretRequest request) { } DaprProtos.GetSecretRequest req = requestBuilder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono(it -> intercept(context, asyncStub).getSecret(req, it)) ).map(DaprProtos.GetSecretResponse::getDataMap); } @@ -738,7 +738,7 @@ public Mono>> getBulkSecret(GetBulkSecretRequest DaprProtos.GetBulkSecretRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).getBulkSecret(envelope, it) @@ -791,7 +791,7 @@ public Mono> queryState(QueryStateRequest request, Typ DaprProtos.QueryStateRequest envelope = builder.build(); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).queryStateAlpha1(envelope, it) ) @@ -855,7 +855,7 @@ public void close() throws Exception { */ @Override public Mono shutdown() { - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it)) ).then(); @@ -889,7 +889,7 @@ public Mono> getConfiguration(GetConfigurationReq } private Mono> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) { - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.createMono( it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it) @@ -1034,7 +1034,7 @@ public void start(final Listener responseListener, final Metadata metadat * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) { + private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { return GrpcWrapper.intercept(context, client); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 2ec7bf780a..286dbe8c13 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -177,7 +177,7 @@ public Mono publishEvent(PublishEventRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic }; Map> queryArgs = metadataToQueryArgs(metadata); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context ) @@ -237,7 +237,7 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef } else { headers.put(Metadata.CONTENT_TYPE, objectSerializer.getContentType()); } - Mono response = Mono.subscriberContext().flatMap( + Mono response = Mono.deferContextual( context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]), httpExtension.getQueryParams(), serializedRequestBody, headers, context) ); @@ -309,7 +309,7 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "bindings", name }; - Mono response = Mono.subscriberContext().flatMap( + Mono response = Mono.deferContextual( context -> this.client.invokeApi( httpMethod, pathSegments, null, payload, null, context) ); @@ -349,7 +349,7 @@ public Mono>> getBulkState(GetBulkStateRequest request, TypeRe String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "bulk" }; Map> queryArgs = metadataToQueryArgs(metadata); - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, requestBody, null, context) ).flatMap(s -> { @@ -394,7 +394,7 @@ public Mono> getState(GetStateRequest request, TypeRef type) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryParams, null, context) ).flatMap(s -> { @@ -452,7 +452,7 @@ public Mono executeStateTransaction(ExecuteStateTransactionRequest request String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, "transaction" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedOperationBody, null, context ) @@ -500,7 +500,7 @@ public Mono saveBulkState(SaveStateRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.POST.name(), pathSegments, null, serializedStateBody, null, context) ).then(); @@ -543,7 +543,7 @@ public Mono deleteState(DeleteStateRequest request) { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "state", stateStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client.invokeApi( DaprHttp.HttpMethods.DELETE.name(), pathSegments, queryParams, headers, context) ).then(); @@ -631,7 +631,7 @@ public Mono> getSecret(GetSecretRequest request) { Map> queryArgs = metadataToQueryArgs(metadata); String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, key }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context) ).flatMap(response -> { @@ -667,7 +667,7 @@ public Mono>> getBulkSecret(GetBulkSecretRequest Map> queryArgs = metadataToQueryArgs(metadata); String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "secrets", secretStoreName, "bulk" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi(DaprHttp.HttpMethods.GET.name(), pathSegments, queryArgs, (String) null, null, context) ).flatMap(response -> { @@ -709,17 +709,17 @@ public Mono> queryState(QueryStateRequest request, Typ } else { throw new IllegalArgumentException("Both query and queryString fields are not set."); } - return Mono.subscriberContext().flatMap( - context -> this.client - .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, - queryArgs, serializedRequest, null, context) - ).flatMap(response -> { - try { - return Mono.justOrEmpty(buildQueryStateResponse(response, type)); - } catch (Exception e) { - return DaprException.wrapMono(e); - } - }); + return Mono.deferContextual( + context -> this.client + .invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, + queryArgs, serializedRequest, null, context) + ).flatMap(response -> { + try { + return Mono.justOrEmpty(buildQueryStateResponse(response, type)); + } catch (Exception e) { + return DaprException.wrapMono(e); + } + }); } catch (Exception e) { return DaprException.wrapMono(e); } @@ -739,14 +739,14 @@ public void close() { @Override public Mono shutdown() { String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, null, null, context)) .then(); } private QueryStateResponse buildQueryStateResponse(DaprHttp.Response response, - TypeRef type) throws IOException { + TypeRef type) throws IOException { JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); if (!root.has("results")) { return new QueryStateResponse<>(Collections.emptyList(), null); @@ -810,36 +810,36 @@ public Mono> getConfiguration(GetConfigurationReq queryParams.putAll(queryArgs); String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName }; - return Mono.subscriberContext().flatMap( - context -> this.client - .invokeApi( - DaprHttp.HttpMethods.GET.name(), - pathSegments, queryParams, - (String) null, null, context) + return Mono.deferContextual( + context -> this.client + .invokeApi( + DaprHttp.HttpMethods.GET.name(), + pathSegments, queryParams, + (String) null, null, context) ).map( - response -> { - try { - Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class); - Set set = m.keySet(); - JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); - Iterator itr = set.iterator(); - Map result = new HashMap<>(); - while (itr.hasNext()) { - String key = itr.next(); - String value = root.get(key).path("value").asText(); - String version = root.get(key).path("version").asText(); - result.put(key, new ConfigurationItem( - key, - value, - version, - new HashMap<>() - )); - } - return Collections.unmodifiableMap(result); - } catch (IOException e) { - throw new RuntimeException(e); - } + response -> { + try { + Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class); + Set set = m.keySet(); + JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); + Iterator itr = set.iterator(); + Map result = new HashMap<>(); + while (itr.hasNext()) { + String key = itr.next(); + String value = root.get(key).path("value").asText(); + String version = root.get(key).path("version").asText(); + result.put(key, new ConfigurationItem( + key, + value, + version, + new HashMap<>() + )); } + return Collections.unmodifiableMap(result); + } catch (IOException e) { + throw new RuntimeException(e); + } + } ); } catch (Exception ex) { return DaprException.wrapMono(ex); @@ -871,12 +871,12 @@ public Flux subscribeConfiguration(SubscribeConf String[] pathSegments = new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" }; - SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap( - context -> this.client.invokeApi( - DaprHttp.HttpMethods.GET.name(), - pathSegments, queryParams, - (String) null, null, context - ) + SubscribeConfigurationResponse res = Mono.deferContextual( + context -> this.client.invokeApi( + DaprHttp.HttpMethods.GET.name(), + pathSegments, queryParams, + (String) null, null, context + ) ).map(response -> { try { JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); @@ -913,7 +913,7 @@ public Mono unsubscribeConfiguration(Unsubscri String[] pathSegments = new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" }; - return Mono.subscriberContext().flatMap( + return Mono.deferContextual( context -> this.client .invokeApi( DaprHttp.HttpMethods.GET.name(), diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index 66b9c9a06d..ad71e25865 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -30,6 +30,7 @@ import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -183,7 +184,7 @@ public Mono invokeApi( String[] pathSegments, Map> urlParameters, Map headers, - Context context) { + ContextView context) { return this.invokeApi(method, pathSegments, urlParameters, (byte[]) null, headers, context); } @@ -204,7 +205,7 @@ public Mono invokeApi( Map> urlParameters, String content, Map headers, - Context context) { + ContextView context) { return this.invokeApi( method, pathSegments, urlParameters, content == null @@ -224,12 +225,12 @@ public Mono invokeApi( * @return Asynchronous response */ public Mono invokeApi( - String method, - String[] pathSegments, - Map> urlParameters, - byte[] content, - Map headers, - Context context) { + String method, + String[] pathSegments, + Map> urlParameters, + byte[] content, + Map headers, + ContextView context) { // fromCallable() is needed so the invocation does not happen early, causing a hot mono. return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context)) .flatMap(f -> Mono.fromFuture(f)); @@ -256,10 +257,10 @@ public void close() { * @return CompletableFuture for Response. */ private CompletableFuture doInvokeApi(String method, - String[] pathSegments, - Map> urlParameters, - byte[] content, Map headers, - Context context) { + String[] pathSegments, + Map> urlParameters, + byte[] content, Map headers, + ContextView context) { final String requestId = UUID.randomUUID().toString(); RequestBody body; @@ -282,8 +283,8 @@ private CompletableFuture doInvokeApi(String method, Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream() .forEach(urlParameter -> Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream() - .forEach(urlParameterValue -> - urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue))); + .forEach(urlParameterValue -> + urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue))); Request.Builder requestBuilder = new Request.Builder() .url(urlBuilder.build()) @@ -305,7 +306,6 @@ private CompletableFuture doInvokeApi(String method, if (daprApiToken != null) { requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken); } - requestBuilder.addHeader(Headers.DAPR_USER_AGENT, Version.getSdkVersion()); if (headers != null) { diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java index b4940d8530..61db83627b 100644 --- a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java +++ b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java @@ -23,9 +23,9 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.Map; -import java.util.Optional; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -61,7 +61,7 @@ private GrpcWrapper() { * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - public static DaprGrpc.DaprStub intercept(final Context context, DaprGrpc.DaprStub client) { + public static DaprGrpc.DaprStub intercept(final ContextView context, DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java index e323d9b574..0b27bf815f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -189,7 +189,7 @@ public void invokeServiceVoidWithTracingTest() { .setBody("request") .setHttpExtension(HttpExtension.NONE); Mono result = this.client.invokeMethod(req, TypeRef.get(Void.class)) - .subscriberContext(it -> it.putAll(contextCopy == null ? Context.empty() : contextCopy)); + .contextWrite((contextCopy == null ? Context.empty() : contextCopy)); result.block(); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index d1831fb23b..91935641b8 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -422,7 +422,7 @@ public void invokeServiceWithContext() { .setBody("request") .setHttpExtension(HttpExtension.POST); Mono result = daprClientHttp.invokeMethod(req, TypeRef.get(Void.class)) - .subscriberContext(it -> it.putAll(context)); + .contextWrite(context); result.block(); } diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java index d83f517c54..6112aa409f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java @@ -15,6 +15,7 @@ import reactor.core.publisher.Mono; import reactor.util.context.Context; +import reactor.util.context.ContextView; import java.util.List; import java.util.Map; @@ -45,7 +46,7 @@ public Mono invokeApi(String method, String[] pathSegments, Map> urlParameters, Map headers, - Context context) { + ContextView context) { return Mono.empty(); } @@ -58,7 +59,7 @@ public Mono invokeApi(String method, Map> urlParameters, String content, Map headers, - Context context) { + ContextView context) { return Mono.empty(); } @@ -71,7 +72,7 @@ public Mono invokeApi(String method, Map> urlParameters, byte[] content, Map headers, - Context context) { + ContextView context) { return Mono.empty(); }