Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public class WorkerConfig extends AbstractConfig {
" from the domain of the REST API.";
protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";

public static final String ACCESS_CONTROL_ALLOW_METHODS_CONFIG = "access.control.allow.methods";
protected static final String ACCESS_CONTROL_ALLOW_METHODS_DOC =
"Sets the methods supported for cross origin requests by setting the Access-Control-Allow-Methods header. "
+ "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD.";
protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = "";

/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
Expand Down Expand Up @@ -141,7 +146,10 @@ protected static ConfigDef baseConfigDef() {
.define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
.define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
ACCESS_CONTROL_ALLOW_ORIGIN_DOC);
ACCESS_CONTROL_ALLOW_ORIGIN_DOC)
.define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING,
ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW,
ACCESS_CONTROL_ALLOW_METHODS_DOC);
}

public WorkerConfig(ConfigDef definition, Map<String, String> props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
Expand Down Expand Up @@ -47,9 +48,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.DispatcherType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -60,6 +58,10 @@
import java.util.List;
import java.util.Map;

import javax.servlet.DispatcherType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;

/**
* Embedded server for the REST API that provides the control plane for Kafka Connect workers.
*/
Expand Down Expand Up @@ -115,7 +117,11 @@ public void start(Herder herder) {
if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
filterHolder.setName("cross-origin");
filterHolder.setInitParameter("allowedOrigins", allowedOrigins);
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
String allowedMethods = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG);
if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) {
filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
}
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,19 @@
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.modules.junit4.PowerMockRunner;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

import static org.junit.Assert.assertEquals;

@RunWith(PowerMockRunner.class)
Expand Down Expand Up @@ -71,15 +72,15 @@ private Map<String, String> baseWorkerProps() {

@Test
public void testCORSEnabled() {
checkCORSRequest("*", "http://bar.com", "http://bar.com");
checkCORSRequest("*", "http://bar.com", "http://bar.com", "PUT");
}

@Test
public void testCORSDisabled() {
checkCORSRequest("", "http://bar.com", null);
checkCORSRequest("", "http://bar.com", null, null);
}

public void checkCORSRequest(String corsDomain, String origin, String expectedHeader) {
public void checkCORSRequest(String corsDomain, String origin, String expectedHeader, String method) {
// To be able to set the Origin, we need to toggle this flag
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");

Expand All @@ -92,10 +93,12 @@ public Object answer() throws Throwable {
return null;
}
});

PowerMock.replayAll();

Map<String, String> workerProps = baseWorkerProps();
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_METHODS_CONFIG, method);
WorkerConfig workerConfig = new StandaloneConfig(workerProps);
server = new RestServer(workerConfig);
server.start(herder);
Expand All @@ -107,6 +110,15 @@ public Object answer() throws Throwable {
assertEquals(200, response.getStatus());

assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin"));

response = request("/connector-plugins/FileStreamSource/validate")
.header("Referer", origin + "/page")
.header("Origin", origin)
.header("Access-Control-Request-Method", method)
.options();
assertEquals(404, response.getStatus());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 404 seemed odd to me, so I want to make sure I understand why we're seeing it (since I would normally interpret that as an error). It seems that CORS doesn't require that you return a particular HTTP status code, so for the preflighted request it's actually ok if we return any code here. In the case of Jetty, it looks like the CrossOriginFilter's chainPreflight option defaults to true such that the OPTIONS request continues on for normal handling. Since we don't implement OPTIONS for any of our endpoints (and the one under test, specifically), we end up with a 404. Is that correct?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. This is purely for testing purpose and making sure the Access-Control-Allow-Methods header is properly set.

assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin"));
assertEquals(method, response.getHeaderString("Access-Control-Allow-Methods"));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be checking the Access-Control-Allow-Origin header on this request as well, just as we do with the first request in this method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

PowerMock.verifyAll();
}

Expand Down