diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java index 1a4b8010d51bd..6e76d142c1266 100644 --- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.packages.management.core; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -45,8 +46,13 @@ public CompletableFuture writeAsync(String path, InputStream inputStream) CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { - byte[] bytes = new byte[inputStream.available()]; - inputStream.read(bytes); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + int read; + while ((read = inputStream.read(buffer)) != -1) { + baos.write(buffer, 0, read); + } + byte[] bytes = baos.toByteArray(); storage.put(path, bytes); future.complete(null); } catch (IOException e) { diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java new file mode 100644 index 0000000000000..eb48f02680db6 --- /dev/null +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.core; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import lombok.Cleanup; +import org.testng.annotations.Test; + +public class MockedPackagesStorageTest { + + @Test + public void testWriteAndRead() throws Exception { + PackagesStorageProvider provider = new MockedPackagesStorageProvider(); + PackagesStorage storage = provider.getStorage(mock(PackagesStorageConfiguration.class)); + storage.initialize(); + + // Test data + byte[] testBytes = new byte[1 * 1024 * 1024]; + + // Write + storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)).get(); + + // Read + @Cleanup + ByteArrayOutputStream readBaos = new ByteArrayOutputStream(); + storage.readAsync("test/path", readBaos).get(); + + // Verify + assertEquals(readBaos.toByteArray(), testBytes); + + storage.closeAsync().get(); + } +} diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index d132cc4ce2c38..40de98cfdc820 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -232,6 +232,14 @@ opentelemetry-sdk-testing test + + + ${project.groupId} + pulsar-package-core + ${project.version} + test-jar + test + diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 23ebed3420caf..528fbcd47b989 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -26,12 +26,10 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; -import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -44,20 +42,17 @@ import org.apache.pulsar.common.util.PulsarSslConfiguration; import org.apache.pulsar.common.util.PulsarSslFactory; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; -import org.eclipse.jetty.client.ContinueProtocolHandler; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.ProtocolHandlers; import org.eclipse.jetty.client.RedirectProtocolHandler; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP; import org.eclipse.jetty.ee8.proxy.ProxyServlet; -import org.eclipse.jetty.http.HttpCookieStore; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,84 +112,15 @@ class AdminProxyHandler extends ProxyServlet { @Override protected HttpClient createHttpClient() throws ServletException { - ServletConfig config = getServletConfig(); - - HttpClient client = newHttpClient(); - - client.setFollowRedirects(true); - - // Must not store cookies, otherwise cookies of different clients will mix. - client.setHttpCookieStore(new HttpCookieStore.Empty()); - - Executor executor; - String value = config.getInitParameter("maxThreads"); - if (value == null || "-".equals(value)) { - executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor"); - if (executor == null) { - throw new IllegalStateException("No server executor for proxy"); - } - } else { - QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value)); - String servletName = config.getServletName(); - int dot = servletName.lastIndexOf('.'); - if (dot >= 0) { - servletName = servletName.substring(dot + 1); - } - qtp.setName(servletName); - executor = qtp; - } - - client.setExecutor(executor); - - value = config.getInitParameter("maxConnections"); - if (value == null) { - value = "256"; - } - client.setMaxConnectionsPerDestination(Integer.parseInt(value)); - - value = config.getInitParameter("idleTimeout"); - if (value == null) { - value = "30000"; - } - client.setIdleTimeout(Long.parseLong(value)); - - value = config.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE); - if (value != null) { - client.setRequestBufferSize(Integer.parseInt(value)); - } - - value = config.getInitParameter("responseBufferSize"); - if (value != null){ - client.setResponseBufferSize(Integer.parseInt(value)); - } - - try { - client.start(); - - // Content must not be decoded, otherwise the client gets confused. - // Allow encoded content, such as "Content-Encoding: gzip", to pass through without decoding it. - client.getContentDecoderFactories().clear(); - - // Pass traffic to the client, only intercept what's necessary. - ProtocolHandlers protocolHandlers = client.getProtocolHandlers(); - protocolHandlers.clear(); - protocolHandlers.put(new RedirectProtocolHandler(client)); - protocolHandlers.put(new ProxyContinueProtocolHandler()); - - return client; - } catch (Exception x) { - throw new ServletException(x); - } + HttpClient httpClient = super.createHttpClient(); + customizeHttpClient(httpClient); + return httpClient; } - class ProxyContinueProtocolHandler extends ContinueProtocolHandler { - - @Override - protected Runnable onContinue(Request request) { - HttpServletRequest clientRequest = - (HttpServletRequest) request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE); - return AdminProxyHandler.this.onContinue(clientRequest, request); - } + private void customizeHttpClient(HttpClient httpClient) { + httpClient.setFollowRedirects(true); + ProtocolHandlers protocolHandlers = httpClient.getProtocolHandlers(); + protocolHandlers.put(new RedirectProtocolHandler(httpClient)); } // This class allows the request body to be replayed, the default implementation diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java new file mode 100644 index 0000000000000..cf4cd1b189d4e --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static com.google.common.net.HttpHeaders.EXPECT; +import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; +import org.apache.pulsar.packages.management.core.common.PackageMetadata; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.RequestBuilder; +import org.asynchttpclient.Response; +import org.asynchttpclient.request.body.multipart.FilePart; +import org.asynchttpclient.request.body.multipart.StringPart; +import org.eclipse.jetty.ee8.servlet.ServletHolder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class ProxyPackagesUploadTest extends MockedPulsarServiceBaseTest { + + private static final int FILE_SIZE = 8 * 1024 * 1024; // 8 MB + private static final ObjectMapper MAPPER = ObjectMapperFactory.create(); + private WebServer webServer; + private PulsarAdmin proxyAdmin; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setEnablePackagesManagement(true); + conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName()); + super.internalSetup(); + + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerWebServiceURL(brokerUrl.toString()); + + webServer = new WebServer(proxyConfig, new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig, true))); + webServer.addServlet("/", new ServletHolder(new AdminProxyHandler(proxyConfig, null, null))); + webServer.start(); + + proxyAdmin = PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:" + webServer.getListenPortHTTP().get()) + .build(); + + admin.tenants().createTenant("public", createDefaultTenantInfo()); + admin.namespaces().createNamespace("public/default"); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + if (proxyAdmin != null) { + proxyAdmin.close(); + } + if (webServer != null) { + webServer.stop(); + } + super.internalCleanup(); + } + + @Test + public void testUploadPackageThroughProxy() throws Exception { + Path packageFile = Files.createTempFile("pkg-sdk", ".nar"); + packageFile.toFile().deleteOnExit(); + Files.write(packageFile, new byte[FILE_SIZE]); + + String pkgName = "function://public/default/pkg-sdk@v1"; + PackageMetadata meta = PackageMetadata.builder().description("sdk-test").build(); + + proxyAdmin.packages().upload(meta, pkgName, packageFile.toString()); + + verifyDownload(pkgName, FILE_SIZE); + } + + @Test + public void testUploadWithExpect100Continue() throws Exception { + Path packageFile = Files.createTempFile("pkg-ahc", ".nar"); + packageFile.toFile().deleteOnExit(); + Files.write(packageFile, new byte[FILE_SIZE]); + + String pkgName = "function://public/default/expect-test@v1"; + String uploadUrl = String.format("http://localhost:%d/admin/v3/packages/function/public/default/expect-test/v1", + webServer.getListenPortHTTP().orElseThrow()); + + @Cleanup + AsyncHttpClient client = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder().build()); + + Response response = client.executeRequest(new RequestBuilder("POST") + .setUrl(uploadUrl) + .addHeader(EXPECT, "100-continue") + .addBodyPart(new FilePart("file", packageFile.toFile())) + .addBodyPart(new StringPart("metadata", MAPPER.writeValueAsString( + PackageMetadata.builder().description("ahc-test").build()), "application/json")) + .build()).get(); + + assertThat(response.getStatusCode()).isEqualTo(204); + + verifyDownload(pkgName, FILE_SIZE); + } + + private void verifyDownload(String packageName, int expectedSize) throws Exception { + Path fromBroker = Files.createTempFile("from-broker", ".nar"); + fromBroker.toFile().deleteOnExit(); + admin.packages().download(packageName, fromBroker.toString()); + assertThat(Files.size(fromBroker)).isEqualTo(expectedSize); + Files.deleteIfExists(fromBroker); + + Path fromProxy = Files.createTempFile("from-proxy", ".nar"); + fromProxy.toFile().deleteOnExit(); + proxyAdmin.packages().download(packageName, fromProxy.toString()); + assertThat(Files.size(fromProxy)).isEqualTo(expectedSize); + } +}