Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.HashMap;
Expand Down Expand Up @@ -286,27 +284,6 @@ public class ConsumerConfig extends AbstractConfig {
Type.CLASS,
Importance.HIGH,
VALUE_DESERIALIZER_CLASS_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
.define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
.define(REQUEST_TIMEOUT_MS_CONFIG,
Type.INT,
40 * 1000,
Expand All @@ -318,7 +295,16 @@ public class ConsumerConfig extends AbstractConfig {
Type.LONG,
9 * 60 * 1000,
Importance.MEDIUM,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)

// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.GroupMetadataRequest;
import org.apache.kafka.common.requests.GroupMetadataResponse;
import org.apache.kafka.common.requests.GroupCoordinatorRequest;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
Expand Down Expand Up @@ -450,8 +450,8 @@ private RequestFuture<Void> sendGroupMetadataRequest() {
} else {
// create a group metadata request
log.debug("Issuing group metadata request to broker {}", node.id());
GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId);
return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest)
GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
.compose(new RequestFutureAdapter<ClientResponse, Void>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
Expand All @@ -472,14 +472,14 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void
// We already found the coordinator, so ignore the request
future.complete(null);
} else {
GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody());
GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
// use MAX_VALUE - node.id as the coordinator id to mimic separate connections
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
if (groupMetadataResponse.errorCode() == Errors.NONE.code()) {
this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(),
groupMetadataResponse.node().host(),
groupMetadataResponse.node().port());
if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) {
this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
groupCoordinatorResponse.node().host(),
groupCoordinatorResponse.node().port());

client.tryConnect(coordinator);

Expand All @@ -488,7 +488,7 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void
heartbeatTask.reset();
future.complete(null);
} else {
future.raise(Errors.forCode(groupMetadataResponse.errorCode()));
future.raise(Errors.forCode(groupCoordinatorResponse.errorCode()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public ConsumerCoordinator(ConsumerNetworkClient client,

@Override
public String protocolType() {
return "consumer";
return ConsumerProtocol.PROTOCOL_TYPE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
*/
public class ConsumerProtocol {

public static final String PROTOCOL_TYPE = "consumer";

public static final String VERSION_KEY_NAME = "version";
public static final String TOPICS_KEY_NAME = "topics";
public static final String TOPIC_KEY_NAME = "topic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,22 @@
*/
package org.apache.kafka.clients.producer;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Serializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/**
* Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
* href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka documentation</a>
Expand Down Expand Up @@ -262,32 +261,33 @@ public class ProducerConfig extends AbstractConfig {
atLeast(1),
Importance.LOW,
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
.define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG,
Type.CLASS,
Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
9 * 60 * 1000,
Importance.MEDIUM,
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(PARTITIONER_CLASS_CONFIG,
Type.CLASS,
DefaultPartitioner.class.getName(),
Importance.MEDIUM, PARTITIONER_CLASS_DOC)

// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
.withClientSaslSupport();

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,23 @@ public ConfigDef define(String name, Type type, Importance importance, String do
return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
}

/**
* Add standard SSL client configuration options.
* @return this
*/
public ConfigDef withClientSslSupport() {
SslConfigs.addClientSslSupport(this);
return this;
}

/**
* Add standard SASL client configuration options.
* @return this
*/
public ConfigDef withClientSaslSupport() {
SaslConfigs.addClientSaslSupport(this);
return this;
}

/**
* Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@ public class SaslConfigs {
"By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");

public static void addClientSaslSupport(ConfigDef config) {
config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
.define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
.define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC);
}

}
Loading