From 41e23e10186a9835e905eee4225d4d9a6882b6e8 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 4 Feb 2026 20:28:06 +0800 Subject: [PATCH 1/9] [fix][proxy] Handle "Expect: 100-continue" to prevent Early EOF --- .../core/MockedPackagesStorage.java | 10 +- .../core/MockedPackagesStorageTest.java | 56 +++++++ pulsar-proxy/pom.xml | 8 + .../proxy/server/AdminProxyHandler.java | 47 ++++++ .../proxy/server/ProxyPackagesUploadTest.java | 141 ++++++++++++++++++ 5 files changed, 260 insertions(+), 2 deletions(-) create mode 100644 pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java index 1a4b8010d51bd..c882504edf19f 100644 --- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.packages.management.core; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -45,8 +46,13 @@ public CompletableFuture writeAsync(String path, InputStream inputStream) CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { - byte[] bytes = new byte[inputStream.available()]; - inputStream.read(bytes); + ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); + byte[] buffer = new byte[8192]; + int read; + while ((read = inputStream.read(buffer)) > 0) { + baos.write(buffer, 0, read); + } + byte[] bytes = baos.toByteArray(); storage.put(path, bytes); future.complete(null); } catch (IOException e) { diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java new file mode 100644 index 0000000000000..2d5b092c52b61 --- /dev/null +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.core; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import lombok.Cleanup; +import org.testng.annotations.Test; + +public class MockedPackagesStorageTest { + + @Test + public void testWriteAndRead() throws Exception { + PackagesStorageProvider provider = new MockedPackagesStorageProvider(); + PackagesStorage storage = provider.getStorage(mock(PackagesStorageConfiguration.class)); + storage.initialize(); + + // Test data + byte[] testBytes = new byte[1 * 1024 * 1024]; + + // Write + @Cleanup + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)) + .thenCompose(v -> storage.readAsync("test/path", baos)) + .get(); + + // Read + @Cleanup + ByteArrayOutputStream readBaos = new ByteArrayOutputStream(); + storage.readAsync("test/path", readBaos).get(); + + // Verify + assertEquals(readBaos.toByteArray(), testBytes); + + storage.closeAsync().get(); + } +} diff --git a/pulsar-proxy/pom.xml b/pulsar-proxy/pom.xml index d132cc4ce2c38..40de98cfdc820 100644 --- a/pulsar-proxy/pom.xml +++ b/pulsar-proxy/pom.xml @@ -232,6 +232,14 @@ opentelemetry-sdk-testing test + + + ${project.groupId} + pulsar-package-core + ${project.version} + test-jar + test + diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 23ebed3420caf..61c12db2976c8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -24,7 +24,10 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -34,6 +37,7 @@ import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletRequestWrapper; import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.web.AuthenticationFilter; @@ -322,6 +326,20 @@ private String getWebServiceUrl() throws PulsarServerException { } } + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + if (expects100Continue(request)) { + // The client sent "Expect: 100-continue". + // Based on curl verbose output, it seems Jetty responds with "100 Continue" automatically. + // To avoid forwarding the Expect header to the broker (which could cause Early EOF), + // we wrap the request to ignore it. + request = new NoExpectRequestWrapper(request); + } + + super.service(request, response); + } + @Override protected String rewriteTarget(HttpServletRequest request) { StringBuilder url = new StringBuilder(); @@ -454,4 +472,33 @@ public void destroy() { this.sslContextRefresher.shutdownNow(); } } + + static class NoExpectRequestWrapper extends HttpServletRequestWrapper { + public NoExpectRequestWrapper(HttpServletRequest request) { + super(request); + } + + @Override + public String getHeader(String name) { + if (HttpHeader.EXPECT.is(name)) { + return null; + } + return super.getHeader(name); + } + + @Override + public Enumeration getHeaders(String name) { + if (HttpHeader.EXPECT.is(name)) { + return Collections.emptyEnumeration(); + } + return super.getHeaders(name); + } + + @Override + public Enumeration getHeaderNames() { + List names = Collections.list(super.getHeaderNames()); + names.removeIf(HttpHeader.EXPECT::is); + return Collections.enumeration(names); + } + } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java new file mode 100644 index 0000000000000..ac90bf00c3542 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import static com.google.common.net.HttpHeaders.EXPECT; +import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; +import org.apache.pulsar.packages.management.core.common.PackageMetadata; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.RequestBuilder; +import org.asynchttpclient.Response; +import org.asynchttpclient.request.body.multipart.FilePart; +import org.asynchttpclient.request.body.multipart.StringPart; +import org.eclipse.jetty.ee8.servlet.ServletHolder; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-admin") +public class ProxyPackagesUploadTest extends MockedPulsarServiceBaseTest { + + private static final int FILE_SIZE = 8 * 1024 * 1024; // 8 MB + private static final ObjectMapper MAPPER = ObjectMapperFactory.create(); + private WebServer webServer; + private PulsarAdmin proxyAdmin; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setEnablePackagesManagement(true); + conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName()); + super.internalSetup(); + + ProxyConfiguration proxyConfig = new ProxyConfiguration(); + proxyConfig.setServicePort(Optional.of(0)); + proxyConfig.setWebServicePort(Optional.of(0)); + proxyConfig.setBrokerWebServiceURL(brokerUrl.toString()); + + webServer = new WebServer(proxyConfig, new AuthenticationService( + PulsarConfigurationLoader.convertFrom(proxyConfig, true))); + webServer.addServlet("/", new ServletHolder(new AdminProxyHandler(proxyConfig, null, null))); + webServer.start(); + + proxyAdmin = PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:" + webServer.getListenPortHTTP().get()) + .build(); + + admin.tenants().createTenant("public", createDefaultTenantInfo()); + admin.namespaces().createNamespace("public/default"); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + if (proxyAdmin != null) proxyAdmin.close(); + if (webServer != null) webServer.stop(); + super.internalCleanup(); + } + + @Test + public void testUploadPackageThroughProxy() throws Exception { + Path packageFile = Files.createTempFile("pkg-sdk", ".nar"); + Files.write(packageFile, new byte[FILE_SIZE]); + + String pkgName = "function://public/default/large-pkg-sdk@v1"; + PackageMetadata meta = PackageMetadata.builder().description("sdk-test").build(); + + proxyAdmin.packages().upload(meta, pkgName, packageFile.toString()); + + verifyDownload(pkgName, FILE_SIZE); + + Files.deleteIfExists(packageFile); + } + + @Test + public void testUploadWithExpect100Continue() throws Exception { + Path packageFile = Files.createTempFile("pkg-ahc", ".nar"); + Files.write(packageFile, new byte[FILE_SIZE]); + + String pkgName = "function://public/default/expect-test@v1"; + String uploadUrl = String.format("http://localhost:%d/admin/v3/packages/function/public/default/expect-test/v1", + webServer.getListenPortHTTP().orElseThrow()); + + @Cleanup + AsyncHttpClient client = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder().build()); + + Response response = client.executeRequest(new RequestBuilder("POST") + .setUrl(uploadUrl) + .addHeader(EXPECT, "100-continue") + .addBodyPart(new FilePart("file", packageFile.toFile())) + .addBodyPart(new StringPart("metadata", MAPPER.writeValueAsString( + PackageMetadata.builder().description("ahc-test").build()), "application/json")) + .build()).get(); + + assertThat(response.getStatusCode()).isEqualTo(204); + + verifyDownload(pkgName, FILE_SIZE); + + Files.deleteIfExists(packageFile); + } + + private void verifyDownload(String packageName, int expectedSize) throws Exception { + Path fromBroker = Files.createTempFile("from-broker", ".nar"); + admin.packages().download(packageName, fromBroker.toString()); + assertThat(Files.size(fromBroker)).isEqualTo(expectedSize); + Files.deleteIfExists(fromBroker); + + Path fromProxy = Files.createTempFile("from-proxy", ".nar"); + proxyAdmin.packages().download(packageName, fromProxy.toString()); + assertThat(Files.size(fromProxy)).isEqualTo(expectedSize); + Files.deleteIfExists(fromProxy); + } +} From 0caa561dfaa83353cca8521bf06174e831956127 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 11:35:23 +0800 Subject: [PATCH 2/9] [improve][proxy] Simplify HttpClient creation --- .../proxy/server/AdminProxyHandler.java | 119 +----------------- .../proxy/server/ProxyPackagesUploadTest.java | 1 + 2 files changed, 4 insertions(+), 116 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 61c12db2976c8..c0f1f1b349869 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -24,20 +24,14 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collections; -import java.util.Enumeration; import java.util.HashSet; -import java.util.List; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; -import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletRequestWrapper; import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.web.AuthenticationFilter; @@ -48,20 +42,17 @@ import org.apache.pulsar.common.util.PulsarSslConfiguration; import org.apache.pulsar.common.util.PulsarSslFactory; import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData; -import org.eclipse.jetty.client.ContinueProtocolHandler; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.ProtocolHandlers; import org.eclipse.jetty.client.RedirectProtocolHandler; import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP; import org.eclipse.jetty.ee8.proxy.ProxyServlet; -import org.eclipse.jetty.http.HttpCookieStore; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,69 +112,12 @@ class AdminProxyHandler extends ProxyServlet { @Override protected HttpClient createHttpClient() throws ServletException { - ServletConfig config = getServletConfig(); - - HttpClient client = newHttpClient(); - - client.setFollowRedirects(true); - - // Must not store cookies, otherwise cookies of different clients will mix. - client.setHttpCookieStore(new HttpCookieStore.Empty()); - - Executor executor; - String value = config.getInitParameter("maxThreads"); - if (value == null || "-".equals(value)) { - executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor"); - if (executor == null) { - throw new IllegalStateException("No server executor for proxy"); - } - } else { - QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value)); - String servletName = config.getServletName(); - int dot = servletName.lastIndexOf('.'); - if (dot >= 0) { - servletName = servletName.substring(dot + 1); - } - qtp.setName(servletName); - executor = qtp; - } - - client.setExecutor(executor); - - value = config.getInitParameter("maxConnections"); - if (value == null) { - value = "256"; - } - client.setMaxConnectionsPerDestination(Integer.parseInt(value)); - - value = config.getInitParameter("idleTimeout"); - if (value == null) { - value = "30000"; - } - client.setIdleTimeout(Long.parseLong(value)); - - value = config.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE); - if (value != null) { - client.setRequestBufferSize(Integer.parseInt(value)); - } - - value = config.getInitParameter("responseBufferSize"); - if (value != null){ - client.setResponseBufferSize(Integer.parseInt(value)); - } - try { - client.start(); - - // Content must not be decoded, otherwise the client gets confused. - // Allow encoded content, such as "Content-Encoding: gzip", to pass through without decoding it. - client.getContentDecoderFactories().clear(); - - // Pass traffic to the client, only intercept what's necessary. + HttpClient client = super.createHttpClient(); + // Follow 307 redirects + client.setFollowRedirects(true); ProtocolHandlers protocolHandlers = client.getProtocolHandlers(); - protocolHandlers.clear(); protocolHandlers.put(new RedirectProtocolHandler(client)); - protocolHandlers.put(new ProxyContinueProtocolHandler()); return client; } catch (Exception x) { @@ -191,16 +125,6 @@ protected HttpClient createHttpClient() throws ServletException { } } - class ProxyContinueProtocolHandler extends ContinueProtocolHandler { - - @Override - protected Runnable onContinue(Request request) { - HttpServletRequest clientRequest = - (HttpServletRequest) request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE); - return AdminProxyHandler.this.onContinue(clientRequest, request); - } - } - // This class allows the request body to be replayed, the default implementation // does not protected class ReplayableProxyContentProvider extends ProxyInputStreamRequestContent { @@ -329,14 +253,6 @@ private String getWebServiceUrl() throws PulsarServerException { @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - if (expects100Continue(request)) { - // The client sent "Expect: 100-continue". - // Based on curl verbose output, it seems Jetty responds with "100 Continue" automatically. - // To avoid forwarding the Expect header to the broker (which could cause Early EOF), - // we wrap the request to ignore it. - request = new NoExpectRequestWrapper(request); - } - super.service(request, response); } @@ -472,33 +388,4 @@ public void destroy() { this.sslContextRefresher.shutdownNow(); } } - - static class NoExpectRequestWrapper extends HttpServletRequestWrapper { - public NoExpectRequestWrapper(HttpServletRequest request) { - super(request); - } - - @Override - public String getHeader(String name) { - if (HttpHeader.EXPECT.is(name)) { - return null; - } - return super.getHeader(name); - } - - @Override - public Enumeration getHeaders(String name) { - if (HttpHeader.EXPECT.is(name)) { - return Collections.emptyEnumeration(); - } - return super.getHeaders(name); - } - - @Override - public Enumeration getHeaderNames() { - List names = Collections.list(super.getHeaderNames()); - names.removeIf(HttpHeader.EXPECT::is); - return Collections.enumeration(names); - } - } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java index ac90bf00c3542..6b142eaa46a5e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java @@ -25,6 +25,7 @@ import java.nio.file.Path; import java.util.Optional; import lombok.Cleanup; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; From db42b6e339b17a9942b0e47d656306621566b5ff Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 12:07:29 +0800 Subject: [PATCH 3/9] [improve][test] Refactor package upload tests for clarity and resource management --- .../packages/management/core/MockedPackagesStorage.java | 8 +++++--- .../management/core/MockedPackagesStorageTest.java | 6 +----- .../pulsar/proxy/server/ProxyPackagesUploadTest.java | 8 +++++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java index c882504edf19f..bbc700c5bd4d2 100644 --- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java @@ -46,11 +46,13 @@ public CompletableFuture writeAsync(String path, InputStream inputStream) CompletableFuture future = new CompletableFuture<>(); CompletableFuture.runAsync(() -> { try { - ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] buffer = new byte[8192]; int read; - while ((read = inputStream.read(buffer)) > 0) { - baos.write(buffer, 0, read); + while ((read = inputStream.read(buffer)) != -1) { + if (read > 0) { + baos.write(buffer, 0, read); + } } byte[] bytes = baos.toByteArray(); storage.put(path, bytes); diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java index 2d5b092c52b61..b89c23a94edb0 100644 --- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java @@ -37,11 +37,7 @@ public void testWriteAndRead() throws Exception { byte[] testBytes = new byte[1 * 1024 * 1024]; // Write - @Cleanup - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)) - .thenCompose(v -> storage.readAsync("test/path", baos)) - .get(); + storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)); // Read @Cleanup diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java index 6b142eaa46a5e..e9b785089f90c 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java @@ -89,9 +89,10 @@ protected void cleanup() throws Exception { @Test public void testUploadPackageThroughProxy() throws Exception { Path packageFile = Files.createTempFile("pkg-sdk", ".nar"); + packageFile.toFile().deleteOnExit(); Files.write(packageFile, new byte[FILE_SIZE]); - String pkgName = "function://public/default/large-pkg-sdk@v1"; + String pkgName = "function://public/default/pkg-sdk@v1"; PackageMetadata meta = PackageMetadata.builder().description("sdk-test").build(); proxyAdmin.packages().upload(meta, pkgName, packageFile.toString()); @@ -104,6 +105,7 @@ public void testUploadPackageThroughProxy() throws Exception { @Test public void testUploadWithExpect100Continue() throws Exception { Path packageFile = Files.createTempFile("pkg-ahc", ".nar"); + packageFile.toFile().deleteOnExit(); Files.write(packageFile, new byte[FILE_SIZE]); String pkgName = "function://public/default/expect-test@v1"; @@ -124,17 +126,17 @@ public void testUploadWithExpect100Continue() throws Exception { assertThat(response.getStatusCode()).isEqualTo(204); verifyDownload(pkgName, FILE_SIZE); - - Files.deleteIfExists(packageFile); } private void verifyDownload(String packageName, int expectedSize) throws Exception { Path fromBroker = Files.createTempFile("from-broker", ".nar"); + fromBroker.toFile().deleteOnExit(); admin.packages().download(packageName, fromBroker.toString()); assertThat(Files.size(fromBroker)).isEqualTo(expectedSize); Files.deleteIfExists(fromBroker); Path fromProxy = Files.createTempFile("from-proxy", ".nar"); + fromProxy.toFile().deleteOnExit(); proxyAdmin.packages().download(packageName, fromProxy.toString()); assertThat(Files.size(fromProxy)).isEqualTo(expectedSize); Files.deleteIfExists(fromProxy); From f9a009a143bd31c136b70fc3e415eed2c0deead6 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 12:09:35 +0800 Subject: [PATCH 4/9] [improve][proxy] Refactor HttpClient creation to enhance readability and maintainability --- .../proxy/server/AdminProxyHandler.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index c0f1f1b349869..819c197550ab9 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -112,17 +112,15 @@ class AdminProxyHandler extends ProxyServlet { @Override protected HttpClient createHttpClient() throws ServletException { - try { - HttpClient client = super.createHttpClient(); - // Follow 307 redirects - client.setFollowRedirects(true); - ProtocolHandlers protocolHandlers = client.getProtocolHandlers(); - protocolHandlers.put(new RedirectProtocolHandler(client)); - - return client; - } catch (Exception x) { - throw new ServletException(x); - } + HttpClient httpClient = super.createHttpClient(); + customizeHttpClient(httpClient); + return httpClient; + } + + private void customizeHttpClient(HttpClient httpClient) { + httpClient.setFollowRedirects(true); + ProtocolHandlers protocolHandlers = httpClient.getProtocolHandlers(); + protocolHandlers.put(new RedirectProtocolHandler(httpClient)); } // This class allows the request body to be replayed, the default implementation From 80bbf0cf3310f0f801ea5574621aaa3fca5f0f52 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 12:33:55 +0800 Subject: [PATCH 5/9] Update pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../packages/management/core/MockedPackagesStorage.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java index bbc700c5bd4d2..6e76d142c1266 100644 --- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorage.java @@ -50,9 +50,7 @@ public CompletableFuture writeAsync(String path, InputStream inputStream) byte[] buffer = new byte[8192]; int read; while ((read = inputStream.read(buffer)) != -1) { - if (read > 0) { - baos.write(buffer, 0, read); - } + baos.write(buffer, 0, read); } byte[] bytes = baos.toByteArray(); storage.put(path, bytes); From 9e0679d96a6942b0c1351aab30f285e41e84cfb3 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 12:35:05 +0800 Subject: [PATCH 6/9] Update pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../org/apache/pulsar/proxy/server/AdminProxyHandler.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java index 819c197550ab9..528fbcd47b989 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java @@ -248,12 +248,6 @@ private String getWebServiceUrl() throws PulsarServerException { } } - @Override - protected void service(HttpServletRequest request, HttpServletResponse response) - throws ServletException, IOException { - super.service(request, response); - } - @Override protected String rewriteTarget(HttpServletRequest request) { StringBuilder url = new StringBuilder(); From abe38740e9fee3776ecb0771e61c2a9168cbf132 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 12:35:35 +0800 Subject: [PATCH 7/9] Update pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../packages/management/core/MockedPackagesStorageTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java index b89c23a94edb0..eb48f02680db6 100644 --- a/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java +++ b/pulsar-package-management/core/src/test/java/org/apache/pulsar/packages/management/core/MockedPackagesStorageTest.java @@ -37,7 +37,7 @@ public void testWriteAndRead() throws Exception { byte[] testBytes = new byte[1 * 1024 * 1024]; // Write - storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)); + storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)).get(); // Read @Cleanup From d14d5ebeac370e97d8ed5466cb6408fcce901ba9 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 12:37:50 +0800 Subject: [PATCH 8/9] Remove unused imports and delete file statements --- .../apache/pulsar/proxy/server/ProxyPackagesUploadTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java index e9b785089f90c..3178da4a8bb03 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java @@ -25,7 +25,6 @@ import java.nio.file.Path; import java.util.Optional; import lombok.Cleanup; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -98,8 +97,6 @@ public void testUploadPackageThroughProxy() throws Exception { proxyAdmin.packages().upload(meta, pkgName, packageFile.toString()); verifyDownload(pkgName, FILE_SIZE); - - Files.deleteIfExists(packageFile); } @Test @@ -139,6 +136,5 @@ private void verifyDownload(String packageName, int expectedSize) throws Excepti fromProxy.toFile().deleteOnExit(); proxyAdmin.packages().download(packageName, fromProxy.toString()); assertThat(Files.size(fromProxy)).isEqualTo(expectedSize); - Files.deleteIfExists(fromProxy); } } From 67d67d229a3691fed614340da9ba62be5a0b5735 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Thu, 5 Feb 2026 15:21:07 +0800 Subject: [PATCH 9/9] Fix code style --- .../pulsar/proxy/server/ProxyPackagesUploadTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java index 3178da4a8bb03..cf4cd1b189d4e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyPackagesUploadTest.java @@ -80,8 +80,12 @@ protected void setup() throws Exception { @AfterMethod(alwaysRun = true) @Override protected void cleanup() throws Exception { - if (proxyAdmin != null) proxyAdmin.close(); - if (webServer != null) webServer.stop(); + if (proxyAdmin != null) { + proxyAdmin.close(); + } + if (webServer != null) { + webServer.stop(); + } super.internalCleanup(); }