Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.packages.management.core;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -45,8 +46,13 @@ public CompletableFuture<Void> writeAsync(String path, InputStream inputStream)
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try {
byte[] bytes = new byte[inputStream.available()];
inputStream.read(bytes);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[8192];
int read;
while ((read = inputStream.read(buffer)) != -1) {
baos.write(buffer, 0, read);
}
byte[] bytes = baos.toByteArray();
storage.put(path, bytes);
Comment thread
nodece marked this conversation as resolved.
future.complete(null);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.packages.management.core;

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import lombok.Cleanup;
import org.testng.annotations.Test;

public class MockedPackagesStorageTest {

@Test
public void testWriteAndRead() throws Exception {
PackagesStorageProvider provider = new MockedPackagesStorageProvider();
PackagesStorage storage = provider.getStorage(mock(PackagesStorageConfiguration.class));
storage.initialize();

// Test data
byte[] testBytes = new byte[1 * 1024 * 1024];
Comment thread
nodece marked this conversation as resolved.

// Write
storage.writeAsync("test/path", new ByteArrayInputStream(testBytes)).get();

// Read
@Cleanup
ByteArrayOutputStream readBaos = new ByteArrayOutputStream();
storage.readAsync("test/path", readBaos).get();

// Verify
assertEquals(readBaos.toByteArray(), testBytes);

storage.closeAsync().get();
}
}
8 changes: 8 additions & 0 deletions pulsar-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-package-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand All @@ -44,20 +42,17 @@
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.eclipse.jetty.client.ContinueProtocolHandler;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.ProtocolHandlers;
import org.eclipse.jetty.client.RedirectProtocolHandler;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
import org.eclipse.jetty.ee8.proxy.ProxyServlet;
import org.eclipse.jetty.http.HttpCookieStore;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,84 +112,15 @@ class AdminProxyHandler extends ProxyServlet {

@Override
protected HttpClient createHttpClient() throws ServletException {
ServletConfig config = getServletConfig();

HttpClient client = newHttpClient();

client.setFollowRedirects(true);

// Must not store cookies, otherwise cookies of different clients will mix.
client.setHttpCookieStore(new HttpCookieStore.Empty());

Executor executor;
String value = config.getInitParameter("maxThreads");
if (value == null || "-".equals(value)) {
executor = (Executor) getServletContext().getAttribute("org.eclipse.jetty.server.Executor");
if (executor == null) {
throw new IllegalStateException("No server executor for proxy");
}
} else {
QueuedThreadPool qtp = new QueuedThreadPool(Integer.parseInt(value));
String servletName = config.getServletName();
int dot = servletName.lastIndexOf('.');
if (dot >= 0) {
servletName = servletName.substring(dot + 1);
}
qtp.setName(servletName);
executor = qtp;
}

client.setExecutor(executor);

value = config.getInitParameter("maxConnections");
if (value == null) {
value = "256";
}
client.setMaxConnectionsPerDestination(Integer.parseInt(value));

value = config.getInitParameter("idleTimeout");
if (value == null) {
value = "30000";
}
client.setIdleTimeout(Long.parseLong(value));

value = config.getInitParameter(INIT_PARAM_REQUEST_BUFFER_SIZE);
if (value != null) {
client.setRequestBufferSize(Integer.parseInt(value));
}

value = config.getInitParameter("responseBufferSize");
if (value != null){
client.setResponseBufferSize(Integer.parseInt(value));
}

try {
client.start();

// Content must not be decoded, otherwise the client gets confused.
// Allow encoded content, such as "Content-Encoding: gzip", to pass through without decoding it.
client.getContentDecoderFactories().clear();

// Pass traffic to the client, only intercept what's necessary.
ProtocolHandlers protocolHandlers = client.getProtocolHandlers();
protocolHandlers.clear();
protocolHandlers.put(new RedirectProtocolHandler(client));
protocolHandlers.put(new ProxyContinueProtocolHandler());

return client;
} catch (Exception x) {
throw new ServletException(x);
}
HttpClient httpClient = super.createHttpClient();
Comment thread
lhotari marked this conversation as resolved.
customizeHttpClient(httpClient);
return httpClient;
}

class ProxyContinueProtocolHandler extends ContinueProtocolHandler {

@Override
protected Runnable onContinue(Request request) {
HttpServletRequest clientRequest =
(HttpServletRequest) request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE);
return AdminProxyHandler.this.onContinue(clientRequest, request);
}
private void customizeHttpClient(HttpClient httpClient) {
httpClient.setFollowRedirects(true);
ProtocolHandlers protocolHandlers = httpClient.getProtocolHandlers();
Comment thread
nodece marked this conversation as resolved.
protocolHandlers.put(new RedirectProtocolHandler(httpClient));
}

// This class allows the request body to be replayed, the default implementation
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;

import static com.google.common.net.HttpHeaders.EXPECT;
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import org.asynchttpclient.request.body.multipart.FilePart;
import org.asynchttpclient.request.body.multipart.StringPart;
import org.eclipse.jetty.ee8.servlet.ServletHolder;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
public class ProxyPackagesUploadTest extends MockedPulsarServiceBaseTest {

private static final int FILE_SIZE = 8 * 1024 * 1024; // 8 MB
private static final ObjectMapper MAPPER = ObjectMapperFactory.create();
private WebServer webServer;
private PulsarAdmin proxyAdmin;

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
conf.setEnablePackagesManagement(true);
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
super.internalSetup();

ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setServicePort(Optional.of(0));
proxyConfig.setWebServicePort(Optional.of(0));
proxyConfig.setBrokerWebServiceURL(brokerUrl.toString());

webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig, true)));
webServer.addServlet("/", new ServletHolder(new AdminProxyHandler(proxyConfig, null, null)));
webServer.start();

proxyAdmin = PulsarAdmin.builder()
.serviceHttpUrl("http://localhost:" + webServer.getListenPortHTTP().get())
.build();

admin.tenants().createTenant("public", createDefaultTenantInfo());
admin.namespaces().createNamespace("public/default");
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
if (proxyAdmin != null) {
proxyAdmin.close();
}
if (webServer != null) {
webServer.stop();
}
super.internalCleanup();
}

@Test
public void testUploadPackageThroughProxy() throws Exception {
Path packageFile = Files.createTempFile("pkg-sdk", ".nar");
packageFile.toFile().deleteOnExit();
Comment thread
nodece marked this conversation as resolved.
Files.write(packageFile, new byte[FILE_SIZE]);

String pkgName = "function://public/default/pkg-sdk@v1";
PackageMetadata meta = PackageMetadata.builder().description("sdk-test").build();

proxyAdmin.packages().upload(meta, pkgName, packageFile.toString());

verifyDownload(pkgName, FILE_SIZE);
Comment thread
nodece marked this conversation as resolved.
}

@Test
public void testUploadWithExpect100Continue() throws Exception {
Path packageFile = Files.createTempFile("pkg-ahc", ".nar");
packageFile.toFile().deleteOnExit();
Files.write(packageFile, new byte[FILE_SIZE]);

String pkgName = "function://public/default/expect-test@v1";
String uploadUrl = String.format("http://localhost:%d/admin/v3/packages/function/public/default/expect-test/v1",
webServer.getListenPortHTTP().orElseThrow());

@Cleanup
AsyncHttpClient client = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder().build());

Response response = client.executeRequest(new RequestBuilder("POST")
.setUrl(uploadUrl)
.addHeader(EXPECT, "100-continue")
.addBodyPart(new FilePart("file", packageFile.toFile()))
.addBodyPart(new StringPart("metadata", MAPPER.writeValueAsString(
PackageMetadata.builder().description("ahc-test").build()), "application/json"))
.build()).get();

assertThat(response.getStatusCode()).isEqualTo(204);

verifyDownload(pkgName, FILE_SIZE);
Comment thread
nodece marked this conversation as resolved.
Comment thread
nodece marked this conversation as resolved.
}
Comment thread
nodece marked this conversation as resolved.

private void verifyDownload(String packageName, int expectedSize) throws Exception {
Path fromBroker = Files.createTempFile("from-broker", ".nar");
fromBroker.toFile().deleteOnExit();
admin.packages().download(packageName, fromBroker.toString());
assertThat(Files.size(fromBroker)).isEqualTo(expectedSize);
Files.deleteIfExists(fromBroker);

Path fromProxy = Files.createTempFile("from-proxy", ".nar");
fromProxy.toFile().deleteOnExit();
proxyAdmin.packages().download(packageName, fromProxy.toString());
assertThat(Files.size(fromProxy)).isEqualTo(expectedSize);
}
}
Loading