diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index b366efdc69544..4131352fcd430 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -17,8 +17,6 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.Deserializer; import java.util.HashMap; @@ -286,27 +284,6 @@ public class ConsumerConfig extends AbstractConfig { Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) - .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) - .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) - .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) - .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) - .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) - .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) - .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) - .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC) - .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC) .define(REQUEST_TIMEOUT_MS_CONFIG, Type.INT, 40 * 1000, @@ -318,7 +295,16 @@ public class ConsumerConfig extends AbstractConfig { Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, - CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC); + CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) + + // security support + .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .withClientSslSupport() + .withClientSaslSupport(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index d8f3c25ec907b..9cf825c91e8d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -30,8 +30,8 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.GroupMetadataRequest; -import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.GroupCoordinatorRequest; +import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; @@ -450,8 +450,8 @@ private RequestFuture sendGroupMetadataRequest() { } else { // create a group metadata request log.debug("Issuing group metadata request to broker {}", node.id()); - GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId); - return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest) + GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId); + return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) .compose(new RequestFutureAdapter() { @Override public void onSuccess(ClientResponse response, RequestFuture future) { @@ -472,14 +472,14 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFutureKafka documentation @@ -262,32 +261,33 @@ public class ProducerConfig extends AbstractConfig { atLeast(1), Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) - .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) - .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) - .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) - .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) - .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) - .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) - .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) - .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) - .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) - .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) - .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) - .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC) - .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC) - .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC) + .define(KEY_SERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + VALUE_SERIALIZER_CLASS_DOC) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ - .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) - .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC); + .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, + Type.LONG, + 9 * 60 * 1000, + Importance.MEDIUM, + CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC) + .define(PARTITIONER_CLASS_CONFIG, + Type.CLASS, + DefaultPartitioner.class.getName(), + Importance.MEDIUM, PARTITIONER_CLASS_DOC) + + // security support + .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .withClientSslSupport() + .withClientSaslSupport(); } diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 2e820ddaa7566..f01ed2840262f 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -151,6 +151,23 @@ public ConfigDef define(String name, Type type, Importance importance, String do return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required); } + /** + * Add standard SSL client configuration options. + * @return this + */ + public ConfigDef withClientSslSupport() { + SslConfigs.addClientSslSupport(this); + return this; + } + + /** + * Add standard SASL client configuration options. + * @return this + */ + public ConfigDef withClientSaslSupport() { + SaslConfigs.addClientSaslSupport(this); + return this; + } /** * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java index 657c6d333f3f7..0046868e38b85 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java @@ -49,4 +49,12 @@ public class SaslConfigs { "By default, principal names of the form /@ are mapped to ."; public static final List DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT"); + public static void addClientSaslSupport(ConfigDef config) { + config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) + .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) + .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC) + .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC) + .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 60e1eb3354130..8f93301b505ec 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -96,4 +96,21 @@ public class SslConfigs { + " unlike requested , if this option is set client can choose not to provide authentication information about itself" + "
  • ssl.client.auth=none This means client authentication is not needed."; + public static void addClientSslSupport(ConfigDef config) { + config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) + .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) + .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) + .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) + .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) + .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) + .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index af7b266c016ec..5bd3c9666d706 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -30,11 +30,13 @@ public enum ApiKeys { CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"), OFFSET_COMMIT(8, "OffsetCommit"), OFFSET_FETCH(9, "OffsetFetch"), - GROUP_METADATA(10, "GroupMetadata"), + GROUP_COORDINATOR(10, "GroupCoordinator"), JOIN_GROUP(11, "JoinGroup"), HEARTBEAT(12, "Heartbeat"), LEAVE_GROUP(13, "LeaveGroup"), - SYNC_GROUP(14, "SyncGroup"); + SYNC_GROUP(14, "SyncGroup"), + DESCRIBE_GROUPS(15, "DescribeGroups"), + LIST_GROUPS(16, "ListGroups"); private static ApiKeys[] codeToType; public static final int MAX_API_KEY; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 2c9cb209b9274..d4eb1f98c322f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -86,7 +86,8 @@ public enum Errors { new ApiException("The session timeout is not within an acceptable range.")), INVALID_COMMIT_OFFSET_SIZE(28, new ApiException("The committing offset data size is not valid")), - AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")), + AUTHORIZATION_FAILED(29, + new ApiException("Request is not authorized.")), REBALANCE_IN_PROGRESS(30, new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed.")); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 36094b0251530..00560db3c7302 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -388,18 +388,71 @@ public class Protocol { public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1}; public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1}; - /* Consumer metadata api */ - public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id", - STRING, - "The unique group id.")); + /* List groups api */ + public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema(); - public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16), - new Field("coordinator", - BROKER, - "Host and port information for the coordinator for a consumer group.")); + public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING), + new Field("protocol_type", STRING)); + public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16), + new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); - public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0}; - public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0}; + public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0}; + public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0}; + + /* Describe group api */ + public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids", + new ArrayOf(STRING), + "List of groupIds to request metadata for (an empty groupId array will return empty group metadata).")); + + public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", + STRING, + "The memberId assigned by the coordinator"), + new Field("client_id", + STRING, + "The client id used in the member's latest join group request"), + new Field("client_host", + STRING, + "The client host used in the request session corresponding to the member's join group."), + new Field("member_metadata", + BYTES, + "The metadata corresponding to the current group protocol in use (will only be present if the group is stable)."), + new Field("member_assignment", + BYTES, + "The current assignment provided by the group leader (will only be present if the group is stable).")); + + public static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(new Field("error_code", INT16), + new Field("group_id", + STRING), + new Field("state", + STRING, + "The current state of the group (one of: Dead, Stable, AwaitingSync, or PreparingRebalance, or empty if there is no active group)"), + new Field("protocol_type", + STRING, + "The current group protocol type (will be empty if the there is no active group)"), + new Field("protocol", + STRING, + "The current group protocol (only provided if the group is Stable)"), + new Field("members", + new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0), + "Current group members (only provided if the group is not Dead)")); + + public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0))); + + public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0}; + public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0}; + + /* Group coordinator api */ + public static final Schema GROUP_COORDINATOR_REQUEST_V0 = new Schema(new Field("group_id", + STRING, + "The unique group id.")); + + public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16), + new Field("coordinator", + BROKER, + "Host and port information for the coordinator for a consumer group.")); + + public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0}; + public static final Schema[] GROUP_COORDINATOR_RESPONSE = new Schema[] {GROUP_COORDINATOR_RESPONSE_V0}; /* Controlled shutdown api */ public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id", @@ -616,12 +669,13 @@ public class Protocol { REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; - REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST; + REQUESTS[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_REQUEST; REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST; REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST; REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST; REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST; - + REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST; + REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; @@ -633,11 +687,13 @@ public class Protocol { RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; - RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE; + RESPONSES[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_RESPONSE; RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE; RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE; RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE; RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE; + RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE; + RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index ef2525e507c8c..54c3debd0fbc7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -98,6 +98,14 @@ public Struct getStruct(String name) { return (Struct) get(name); } + public Byte getByte(Field field) { + return (Byte) get(field); + } + + public byte getByte(String name) { + return (Byte) get(name); + } + public Short getShort(Field field) { return (Short) get(field); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 03e77a5ea2a45..8dfa3f68e15dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -49,8 +49,8 @@ public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffe return OffsetCommitRequest.parse(buffer, versionId); case OFFSET_FETCH: return OffsetFetchRequest.parse(buffer, versionId); - case GROUP_METADATA: - return GroupMetadataRequest.parse(buffer, versionId); + case GROUP_COORDINATOR: + return GroupCoordinatorRequest.parse(buffer, versionId); case JOIN_GROUP: return JoinGroupRequest.parse(buffer, versionId); case HEARTBEAT: @@ -67,6 +67,10 @@ public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffe return UpdateMetadataRequest.parse(buffer, versionId); case LEADER_AND_ISR: return LeaderAndIsrRequest.parse(buffer, versionId); + case DESCRIBE_GROUPS: + return DescribeGroupsRequest.parse(buffer, versionId); + case LIST_GROUPS: + return ListGroupsRequest.parse(buffer, versionId); default: return null; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java new file mode 100644 index 0000000000000..a545cca861487 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class DescribeGroupsRequest extends AbstractRequest { + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DESCRIBE_GROUPS.id); + private static final String GROUP_IDS_KEY_NAME = "group_ids"; + + private final List groupIds; + + public DescribeGroupsRequest(List groupIds) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray()); + this.groupIds = groupIds; + } + + public DescribeGroupsRequest(Struct struct) { + super(struct); + this.groupIds = new ArrayList<>(); + for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME)) + this.groupIds.add((String) groupId); + } + + public List groupIds() { + return groupIds; + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds); + + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DESCRIBE_GROUPS.id))); + } + } + + public static DescribeGroupsRequest parse(ByteBuffer buffer, int versionId) { + return new DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, versionId, buffer)); + } + + public static DescribeGroupsRequest parse(ByteBuffer buffer) { + return new DescribeGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java new file mode 100644 index 0000000000000..c71e7d225c7d6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DescribeGroupsResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id); + + private static final String GROUPS_KEY_NAME = "groups"; + + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String GROUP_ID_KEY_NAME = "group_id"; + private static final String GROUP_STATE_KEY_NAME = "state"; + private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; + private static final String PROTOCOL_KEY_NAME = "protocol"; + + private static final String MEMBERS_KEY_NAME = "members"; + private static final String MEMBER_ID_KEY_NAME = "member_id"; + private static final String CLIENT_ID_KEY_NAME = "client_id"; + private static final String CLIENT_HOST_KEY_NAME = "client_host"; + private static final String MEMBER_METADATA_KEY_NAME = "member_metadata"; + private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; + + public static final String UNKNOWN_STATE = ""; + public static final String UNKNOWN_PROTOCOL_TYPE = ""; + public static final String UNKNOWN_PROTOCOL = ""; + + /** + * Possible per-group error codes: + * + * GROUP_LOAD_IN_PROGRESS (14) + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_GROUP (16) + * AUTHORIZATION_FAILED (29) + */ + + private final Map groups; + + public DescribeGroupsResponse(Map groups) { + super(new Struct(CURRENT_SCHEMA)); + + List groupStructs = new ArrayList<>(); + for (Map.Entry groupEntry : groups.entrySet()) { + Struct groupStruct = struct.instance(GROUPS_KEY_NAME); + GroupMetadata group = groupEntry.getValue(); + groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey()); + groupStruct.set(ERROR_CODE_KEY_NAME, group.errorCode); + groupStruct.set(GROUP_STATE_KEY_NAME, group.state); + groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); + groupStruct.set(PROTOCOL_KEY_NAME, group.protocol); + List membersList = new ArrayList<>(); + for (GroupMember member : group.members) { + Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME); + memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId); + memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId); + memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost); + memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata); + memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment); + membersList.add(memberStruct); + } + groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray()); + groupStructs.add(groupStruct); + } + struct.set(GROUPS_KEY_NAME, groupStructs.toArray()); + this.groups = groups; + } + + public DescribeGroupsResponse(Struct struct) { + super(struct); + this.groups = new HashMap<>(); + for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { + Struct groupStruct = (Struct) groupObj; + + String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); + short errorCode = groupStruct.getShort(ERROR_CODE_KEY_NAME); + String state = groupStruct.getString(GROUP_STATE_KEY_NAME); + String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); + String protocol = groupStruct.getString(PROTOCOL_KEY_NAME); + + List members = new ArrayList<>(); + for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) { + Struct memberStruct = (Struct) memberObj; + String memberId = memberStruct.getString(MEMBER_ID_KEY_NAME); + String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME); + String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME); + ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME); + ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME); + members.add(new GroupMember(memberId, clientId, clientHost, + memberMetadata, memberAssignment)); + } + this.groups.put(groupId, new GroupMetadata(errorCode, state, protocolType, protocol, members)); + } + } + + public Map groups() { + return groups; + } + + + public static class GroupMetadata { + private final short errorCode; + private final String state; + private final String protocolType; + private final String protocol; + private final List members; + + public GroupMetadata(short errorCode, + String state, + String protocolType, + String protocol, + List members) { + this.errorCode = errorCode; + this.state = state; + this.protocolType = protocolType; + this.protocol = protocol; + this.members = members; + } + + public short errorCode() { + return errorCode; + } + + public String state() { + return state; + } + + public String protocolType() { + return protocolType; + } + + public String protocol() { + return protocol; + } + + public List members() { + return members; + } + + public static GroupMetadata forError(Errors error) { + return new DescribeGroupsResponse.GroupMetadata( + error.code(), + DescribeGroupsResponse.UNKNOWN_STATE, + DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE, + DescribeGroupsResponse.UNKNOWN_PROTOCOL, + Collections.emptyList()); + } + } + + public static class GroupMember { + private final String memberId; + private final String clientId; + private final String clientHost; + private final ByteBuffer memberMetadata; + private final ByteBuffer memberAssignment; + + public GroupMember(String memberId, + String clientId, + String clientHost, + ByteBuffer memberMetadata, + ByteBuffer memberAssignment) { + this.memberId = memberId; + this.clientId = clientId; + this.clientHost = clientHost; + this.memberMetadata = memberMetadata; + this.memberAssignment = memberAssignment; + } + + public String memberId() { + return memberId; + } + + public String clientId() { + return clientId; + } + + public String clientHost() { + return clientHost; + } + + public ByteBuffer memberMetadata() { + return memberMetadata; + } + + public ByteBuffer memberAssignment() { + return memberAssignment; + } + } + + public static DescribeGroupsResponse parse(ByteBuffer buffer) { + return new DescribeGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } + + public static DescribeGroupsResponse fromError(Errors error, List groupIds) { + GroupMetadata errorMetadata = GroupMetadata.forError(error); + Map groups = new HashMap<>(); + for (String groupId : groupIds) + groups.put(groupId, errorMetadata); + return new DescribeGroupsResponse(groups); + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java similarity index 72% rename from clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java index fd54c5adcf278..8c56e7faea832 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java @@ -21,21 +21,21 @@ import java.nio.ByteBuffer; -public class GroupMetadataRequest extends AbstractRequest { +public class GroupCoordinatorRequest extends AbstractRequest { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id); + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_COORDINATOR.id); private static final String GROUP_ID_KEY_NAME = "group_id"; private final String groupId; - public GroupMetadataRequest(String groupId) { + public GroupCoordinatorRequest(String groupId) { super(new Struct(CURRENT_SCHEMA)); struct.set(GROUP_ID_KEY_NAME, groupId); this.groupId = groupId; } - public GroupMetadataRequest(Struct struct) { + public GroupCoordinatorRequest(Struct struct) { super(struct); groupId = struct.getString(GROUP_ID_KEY_NAME); } @@ -44,10 +44,10 @@ public GroupMetadataRequest(Struct struct) { public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { switch (versionId) { case 0: - return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); + return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id))); + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id))); } } @@ -55,11 +55,11 @@ public String groupId() { return groupId; } - public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) { - return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer)); + public static GroupCoordinatorRequest parse(ByteBuffer buffer, int versionId) { + return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer)); } - public static GroupMetadataRequest parse(ByteBuffer buffer) { - return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer)); + public static GroupCoordinatorRequest parse(ByteBuffer buffer) { + return new GroupCoordinatorRequest((Struct) CURRENT_SCHEMA.read(buffer)); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java similarity index 86% rename from clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java index a5ef4785292c1..c28de70bdf482 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java @@ -20,9 +20,9 @@ import java.nio.ByteBuffer; -public class GroupMetadataResponse extends AbstractRequestResponse { +public class GroupCoordinatorResponse extends AbstractRequestResponse { - private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id); + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id); private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String COORDINATOR_KEY_NAME = "coordinator"; @@ -34,7 +34,7 @@ public class GroupMetadataResponse extends AbstractRequestResponse { private final short errorCode; private final Node node; - public GroupMetadataResponse(short errorCode, Node node) { + public GroupCoordinatorResponse(short errorCode, Node node) { super(new Struct(CURRENT_SCHEMA)); struct.set(ERROR_CODE_KEY_NAME, errorCode); Struct coordinator = struct.instance(COORDINATOR_KEY_NAME); @@ -46,7 +46,7 @@ public GroupMetadataResponse(short errorCode, Node node) { this.node = node; } - public GroupMetadataResponse(Struct struct) { + public GroupCoordinatorResponse(Struct struct) { super(struct); errorCode = struct.getShort(ERROR_CODE_KEY_NAME); Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME); @@ -64,7 +64,7 @@ public Node node() { return node; } - public static GroupMetadataResponse parse(ByteBuffer buffer) { - return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer)); + public static GroupCoordinatorResponse parse(ByteBuffer buffer) { + return new GroupCoordinatorResponse((Struct) CURRENT_SCHEMA.read(buffer)); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java new file mode 100644 index 0000000000000..439720fd14cd9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.Collections; + +public class ListGroupsRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_GROUPS.id); + + public ListGroupsRequest() { + super(new Struct(CURRENT_SCHEMA)); + } + + public ListGroupsRequest(Struct struct) { + super(struct); + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + short errorCode = Errors.forException(e).code(); + return new ListGroupsResponse(errorCode, Collections.emptyList()); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id))); + } + } + + public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) { + return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer)); + } + + public static ListGroupsRequest parse(ByteBuffer buffer) { + return new ListGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } + + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java new file mode 100644 index 0000000000000..d07f0d1bc26fc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ListGroupsResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id); + + public static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final String GROUPS_KEY_NAME = "groups"; + public static final String GROUP_ID_KEY_NAME = "group_id"; + public static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type"; + + /** + * Possible error codes: + * + * GROUP_COORDINATOR_NOT_AVAILABLE (15) + * AUTHORIZATION_FAILED (29) + */ + + private final short errorCode; + private final List groups; + + public ListGroupsResponse(short errorCode, List groups) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + List groupList = new ArrayList<>(); + for (Group group : groups) { + Struct groupStruct = struct.instance(GROUPS_KEY_NAME); + groupStruct.set(GROUP_ID_KEY_NAME, group.groupId); + groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType); + groupList.add(groupStruct); + } + struct.set(GROUPS_KEY_NAME, groupList.toArray()); + this.errorCode = errorCode; + this.groups = groups; + } + + public ListGroupsResponse(Struct struct) { + super(struct); + this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + this.groups = new ArrayList<>(); + for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { + Struct groupStruct = (Struct) groupObj; + String groupId = groupStruct.getString(GROUP_ID_KEY_NAME); + String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME); + this.groups.add(new Group(groupId, protocolType)); + } + } + + public List groups() { + return groups; + } + + public short errorCode() { + return errorCode; + } + + public static class Group { + private final String groupId; + private final String protocolType; + + public Group(String groupId, String protocolType) { + this.groupId = groupId; + this.protocolType = protocolType; + } + + public String groupId() { + return groupId; + } + + public String protocolType() { + return protocolType; + } + + } + + public static ListGroupsResponse parse(ByteBuffer buffer) { + return new ListGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } + + public static ListGroupsResponse fromError(Errors error) { + return new ListGroupsResponse(error.code(), Collections.emptyList()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 2029e92a48c3e..8667f2277bb2c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -32,7 +32,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; @@ -729,7 +729,7 @@ public void testRefreshOffsetWithNoFetchableOffsets() { } private Struct consumerMetadataResponse(Node node, short error) { - GroupMetadataResponse response = new GroupMetadataResponse(error, node); + GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); return response.toStruct(); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index fb21802d5ea95..761b9dbb218f4 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -28,6 +28,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,9 +44,9 @@ public void testSerialization() throws Exception { List requestResponseList = Arrays.asList( createRequestHeader(), createResponseHeader(), - createConsumerMetadataRequest(), - createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()), - createConsumerMetadataResponse(), + createGroupCoordinatorRequest(), + createGroupCoordinatorRequest().getErrorResponse(0, new UnknownServerException()), + createGroupCoordinatorResponse(), createControlledShutdownRequest(), createControlledShutdownResponse(), createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()), @@ -61,6 +62,12 @@ public void testSerialization() throws Exception { createLeaveGroupRequest(), createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()), createLeaveGroupResponse(), + createListGroupsRequest(), + createListGroupsRequest().getErrorResponse(0, new UnknownServerException()), + createListGroupsResponse(), + createDescribeGroupRequest(), + createDescribeGroupRequest().getErrorResponse(0, new UnknownServerException()), + createDescribeGroupResponse(), createListOffsetRequest(), createListOffsetRequest().getErrorResponse(0, new UnknownServerException()), createListOffsetResponse(), @@ -150,12 +157,12 @@ private AbstractRequestResponse createResponseHeader() { return new ResponseHeader(10); } - private AbstractRequest createConsumerMetadataRequest() { - return new GroupMetadataRequest("test-group"); + private AbstractRequest createGroupCoordinatorRequest() { + return new GroupCoordinatorRequest("test-group"); } - private AbstractRequestResponse createConsumerMetadataResponse() { - return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014)); + private AbstractRequestResponse createGroupCoordinatorResponse() { + return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014)); } private AbstractRequest createFetchRequest() { @@ -193,6 +200,30 @@ private AbstractRequestResponse createJoinGroupResponse() { return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members); } + private AbstractRequest createListGroupsRequest() { + return new ListGroupsRequest(); + } + + private AbstractRequestResponse createListGroupsResponse() { + List groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer")); + return new ListGroupsResponse(Errors.NONE.code(), groups); + } + + private AbstractRequest createDescribeGroupRequest() { + return new DescribeGroupsRequest(Collections.singletonList("test-group")); + } + + private AbstractRequestResponse createDescribeGroupResponse() { + String clientId = "consumer-1"; + String clientHost = "localhost"; + ByteBuffer empty = ByteBuffer.allocate(0); + DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId", + clientId, clientHost, empty, empty); + DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE.code(), + "STABLE", "consumer", "roundrobin", Arrays.asList(member)); + return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata)); + } + private AbstractRequest createLeaveGroupRequest() { return new LeaveGroupRequest("group1", "consumer1"); } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java index ca5367421fa3b..ac9df446bdf47 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.requests.GroupMetadataResponse; +import org.apache.kafka.common.requests.GroupCoordinatorResponse; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; @@ -386,7 +386,7 @@ public void testLeaderPerformAssignment2() throws Exception { private Struct groupMetadataResponse(Node node, short error) { - GroupMetadataResponse response = new GroupMetadataResponse(error, node); + GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node); return response.toStruct(); } diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala new file mode 100644 index 0000000000000..ddd3114934e00 --- /dev/null +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.admin + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicInteger + +import kafka.common.KafkaException +import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary} +import kafka.utils.Logging +import org.apache.kafka.clients._ +import org.apache.kafka.clients.consumer.internals.{SendFailedException, ConsumerProtocol, ConsumerNetworkClient, RequestFuture} +import org.apache.kafka.common.config.ConfigDef.{Importance, Type} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SaslConfigs, SslConfigs} +import org.apache.kafka.common.errors.DisconnectException +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.Selector +import org.apache.kafka.common.protocol.types.Struct +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.{SystemTime, Time, Utils} +import org.apache.kafka.common.{TopicPartition, Cluster, Node} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +class AdminClient(val time: Time, + val requestTimeoutMs: Int, + val client: ConsumerNetworkClient, + val bootstrapBrokers: List[Node]) extends Logging { + + private def send(target: Node, + api: ApiKeys, + request: AbstractRequest): Struct = { + var now = time.milliseconds() + val deadline = now + requestTimeoutMs + var future: RequestFuture[ClientResponse] = null + + do { + future = client.send(target, api, request) + client.poll(future) + + if (future.succeeded()) + return if (future.value().wasDisconnected()) { + throw new DisconnectException() + } else { + future.value().responseBody() + } + + now = time.milliseconds() + } while (now < deadline && future.exception().isInstanceOf[SendFailedException]) + + throw future.exception() + } + + private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = { + bootstrapBrokers.foreach { + case broker => + try { + return send(broker, api, request) + } catch { + case e: Exception => + debug(s"Request ${api} failed against node ${broker}", e) + } + } + throw new RuntimeException(s"Request ${api} failed on brokers ${bootstrapBrokers}") + } + + private def findCoordinator(groupId: String): Node = { + val request = new GroupCoordinatorRequest(groupId) + val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request) + val response = new GroupCoordinatorResponse(responseBody) + Errors.forCode(response.errorCode()).maybeThrow() + response.node() + } + + def listGroups(node: Node): List[GroupOverview] = { + val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()) + val response = new ListGroupsResponse(responseBody) + Errors.forCode(response.errorCode()).maybeThrow() + response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList + } + + private def findAllBrokers(): List[Node] = { + val request = new MetadataRequest(List[String]()) + val responseBody = sendAnyNode(ApiKeys.METADATA, request) + val response = new MetadataResponse(responseBody) + if (!response.errors().isEmpty) + debug(s"Metadata request contained errors: ${response.errors()}") + response.cluster().nodes().asScala.toList + } + + def listAllGroups(): Map[Node, List[GroupOverview]] = { + findAllBrokers.map { + case broker => + broker -> { + try { + listGroups(broker) + } catch { + case e: Exception => + debug(s"Failed to find groups from broker ${broker}", e) + List[GroupOverview]() + } + } + }.toMap + } + + def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = { + listAllGroups().mapValues { groups => + groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE) + } + } + + def listAllGroupsFlattened(): List[GroupOverview] = { + listAllGroups.values.flatten.toList + } + + def listAllConsumerGroupsFlattened(): List[GroupOverview] = { + listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE) + } + + def describeGroup(groupId: String): GroupSummary = { + val coordinator = findCoordinator(groupId) + val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava)) + val response = new DescribeGroupsResponse(responseBody) + val metadata = response.groups().get(groupId) + if (metadata == null) + throw new KafkaException(s"Response from broker contained no metadata for group ${groupId}") + + Errors.forCode(metadata.errorCode()).maybeThrow() + val members = metadata.members().map { + case member => + val metadata = Utils.readBytes(member.memberMetadata()) + val assignment = Utils.readBytes(member.memberAssignment()) + MemberSummary(member.memberId(), member.clientId(), member.clientHost(), metadata, assignment) + }.toList + GroupSummary(metadata.state(), metadata.protocolType(), metadata.protocol(), members) + } + + def describeConsumerGroup(groupId: String): Map[String, List[TopicPartition]] = { + val group = describeGroup(groupId) + if (group.protocolType != ConsumerProtocol.PROTOCOL_TYPE) + throw new IllegalArgumentException(s"Group ${groupId} is not a consumer group") + + group.members.map { + case member => + val assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment)) + member.memberId -> assignment.partitions().asScala.toList + }.toMap + } + +} + +object AdminClient { + val DefaultConnectionMaxIdleMs = 9 * 60 * 1000 + val DefaultRequestTimeoutMs = 5000 + val DefaultMaxInFlightRequestsPerConnection = 100 + val DefaultReconnectBackoffMs = 50 + val DefaultSendBufferBytes = 128 * 1024 + val DefaultReceiveBufferBytes = 32 * 1024 + val DefaultRetryBackoffMs = 100 + val AdminClientIdSequence = new AtomicInteger(1) + val AdminConfigDef = { + val config = new ConfigDef() + .define( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + Type.LIST, + Importance.HIGH, + CommonClientConfigs.BOOSTRAP_SERVERS_DOC) + .define( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + ConfigDef.Type.STRING, + CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, + ConfigDef.Importance.MEDIUM, + CommonClientConfigs.SECURITY_PROTOCOL_DOC) + .withClientSslSupport() + .withClientSaslSupport() + config + } + + class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals, false) + + def createSimplePlaintext(brokerUrl: String): AdminClient = { + val config = Map(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokerUrl) + create(new AdminConfig(config)) + } + + def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props)) + + def create(config: AdminConfig): AdminClient = { + val time = new SystemTime + val metrics = new Metrics(time) + val metadata = new Metadata + val channelBuilder = ClientUtils.createChannelBuilder(config.values()) + + val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls) + val bootstrapCluster = Cluster.bootstrap(brokerAddresses) + metadata.update(bootstrapCluster, 0) + + val selector = new Selector( + DefaultConnectionMaxIdleMs, + metrics, + time, + "admin", + Map[String, String](), + channelBuilder) + + val networkClient = new NetworkClient( + selector, + metadata, + "admin-" + AdminClientIdSequence.getAndIncrement(), + DefaultMaxInFlightRequestsPerConnection, + DefaultReconnectBackoffMs, + DefaultSendBufferBytes, + DefaultReceiveBufferBytes, + DefaultRequestTimeoutMs, + time) + + val highLevelClient = new ConsumerNetworkClient( + networkClient, + metadata, + time, + DefaultRetryBackoffMs) + + new AdminClient( + time, + DefaultRequestTimeoutMs, + highLevelClient, + bootstrapCluster.nodes().asScala.toList) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala similarity index 80% rename from core/src/main/scala/kafka/api/GroupMetadataRequest.scala rename to core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala index 075ddb5bf2f3c..43e78f54c948d 100644 --- a/core/src/main/scala/kafka/api/GroupMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala @@ -23,7 +23,7 @@ import kafka.common.ErrorMapping import kafka.network.{RequestOrResponseSend, RequestChannel} import kafka.network.RequestChannel.Response -object GroupMetadataRequest { +object GroupCoordinatorRequest { val CurrentVersion = 0.shortValue val DefaultClientId = "" @@ -35,16 +35,16 @@ object GroupMetadataRequest { // request val group = ApiUtils.readShortString(buffer) - GroupMetadataRequest(group, versionId, correlationId, clientId) + GroupCoordinatorRequest(group, versionId, correlationId, clientId) } } -case class GroupMetadataRequest(group: String, - versionId: Short = GroupMetadataRequest.CurrentVersion, - correlationId: Int = 0, - clientId: String = GroupMetadataRequest.DefaultClientId) - extends RequestOrResponse(Some(RequestKeys.GroupMetadataKey)) { +case class GroupCoordinatorRequest(group: String, + versionId: Short = GroupCoordinatorRequest.CurrentVersion, + correlationId: Int = 0, + clientId: String = GroupCoordinatorRequest.DefaultClientId) + extends RequestOrResponse(Some(RequestKeys.GroupCoordinatorKey)) { def sizeInBytes = 2 + /* versionId */ @@ -64,7 +64,7 @@ case class GroupMetadataRequest(group: String, override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = { // return ConsumerCoordinatorNotAvailable for all uncaught errors - val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId) + val errorResponse = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId) requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) } diff --git a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala similarity index 79% rename from core/src/main/scala/kafka/api/GroupMetadataResponse.scala rename to core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala index 2d65917f7fad2..4cd7db895ba86 100644 --- a/core/src/main/scala/kafka/api/GroupMetadataResponse.scala +++ b/core/src/main/scala/kafka/api/GroupCoordinatorResponse.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import kafka.cluster.BrokerEndPoint import kafka.common.ErrorMapping -object GroupMetadataResponse { +object GroupCoordinatorResponse { val CurrentVersion = 0 private val NoBrokerEndpointOpt = Some(BrokerEndPoint(id = -1, host = "", port = -1)) @@ -35,23 +35,23 @@ object GroupMetadataResponse { else None - GroupMetadataResponse(coordinatorOpt, errorCode, correlationId) + GroupCoordinatorResponse(coordinatorOpt, errorCode, correlationId) } } -case class GroupMetadataResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int) +case class GroupCoordinatorResponse (coordinatorOpt: Option[BrokerEndPoint], errorCode: Short, correlationId: Int) extends RequestOrResponse() { def sizeInBytes = 4 + /* correlationId */ 2 + /* error code */ - coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).get.sizeInBytes + coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).get.sizeInBytes def writeTo(buffer: ByteBuffer) { buffer.putInt(correlationId) buffer.putShort(errorCode) - coordinatorOpt.orElse(GroupMetadataResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer)) + coordinatorOpt.orElse(GroupCoordinatorResponse.NoBrokerEndpointOpt).foreach(_.writeTo(buffer)) } def describe(details: Boolean) = toString diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index 669b63a765560..2363099f02ef7 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -33,12 +33,17 @@ object RequestKeys { val ControlledShutdownKey: Short = 7 val OffsetCommitKey: Short = 8 val OffsetFetchKey: Short = 9 - val GroupMetadataKey: Short = 10 + val GroupCoordinatorKey: Short = 10 val JoinGroupKey: Short = 11 val HeartbeatKey: Short = 12 val LeaveGroupKey: Short = 13 val SyncGroupKey: Short = 14 + val DescribeGroupsKey: Short = 15 + val ListGroupsKey: Short = 16 + // NOTE: this map only includes the server-side request/response handlers. Newer + // request types should only use the client-side versions which are parsed with + // o.a.k.common.requests.AbstractRequest.getRequest() val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]= Map(ProduceKey -> ("Produce", ProducerRequest.readFrom), FetchKey -> ("Fetch", FetchRequest.readFrom), @@ -49,8 +54,7 @@ object RequestKeys { UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom), ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom), OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom), - OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom), - GroupMetadataKey -> ("GroupMetadata", GroupMetadataRequest.readFrom) + OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom) ) def nameForKey(key: Short): String = { diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 36b5b3bc21b95..2f836c05d7327 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -151,9 +151,9 @@ object ClientUtils extends Logging{ if (!queryChannel.isConnected) queryChannel = channelToAnyBroker(zkUtils) debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group)) - queryChannel.send(GroupMetadataRequest(group)) + queryChannel.send(GroupCoordinatorRequest(group)) val response = queryChannel.receive() - val consumerMetadataResponse = GroupMetadataResponse.readFrom(response.payload()) + val consumerMetadataResponse = GroupCoordinatorResponse.readFrom(response.payload()) debug("Consumer metadata response: " + consumerMetadataResponse.toString) if (consumerMetadataResponse.errorCode == ErrorMapping.NoError) coordinatorOpt = consumerMetadataResponse.coordinatorOpt diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 81cb51b79e3a0..6f53fac2d8a0a 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -60,6 +60,7 @@ object ErrorMapping { // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED // 28: INVALID_COMMIT_OFFSET_SIZE val AuthorizationCode: Short = 29 + // 30: REBALANCE_IN_PROGRESS private val exceptionToCode = Map[Class[Throwable], Short]( diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 5b1aead973fc4..e15aca4b4bac9 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -112,9 +112,9 @@ class SimpleConsumer(val host: String, TopicMetadataResponse.readFrom(response.payload()) } - def send(request: GroupMetadataRequest): GroupMetadataResponse = { + def send(request: GroupCoordinatorRequest): GroupCoordinatorResponse = { val response = sendRequest(request) - GroupMetadataResponse.readFrom(response.payload()) + GroupCoordinatorResponse.readFrom(response.payload()) } /** diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 97ce22be640e3..2015371a7d6db 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -107,6 +107,7 @@ class GroupCoordinator(val brokerId: Int, def handleJoinGroup(groupId: String, memberId: String, clientId: String, + clientHost: String, sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], @@ -132,10 +133,10 @@ class GroupCoordinator(val brokerId: Int, responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) } else { group = groupManager.addGroup(groupId, protocolType) - doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback) + doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback) } } else { - doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback) + doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback) } } } @@ -143,6 +144,7 @@ class GroupCoordinator(val brokerId: Int, private def doJoinGroup(group: GroupMetadata, memberId: String, clientId: String, + clientHost: String, sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], @@ -166,7 +168,7 @@ class GroupCoordinator(val brokerId: Int, case PreparingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { - addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else { val member = group.get(memberId) updateMemberAndRebalance(group, member, protocols, responseCallback) @@ -174,7 +176,7 @@ class GroupCoordinator(val brokerId: Int, case AwaitingSync => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { - addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else { val member = group.get(memberId) if (member.matches(protocols)) { @@ -201,7 +203,7 @@ class GroupCoordinator(val brokerId: Int, case Stable => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // if the member id is unknown, register the member to the group - addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else { val member = group.get(memberId) if (memberId == group.leaderId || !member.matches(protocols)) { @@ -269,13 +271,30 @@ class GroupCoordinator(val brokerId: Int, group.get(memberId).awaitingSyncCallback = responseCallback completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) - // if this is the leader, then we can transition to stable and - // propagate the assignment to any awaiting members + // if this is the leader, then we can attempt to persist state and transition to stable if (memberId == group.leaderId) { - group.transitionTo(Stable) - // persist the group metadata and upon finish propagate the assignment - groupManager.storeGroup(group, groupAssignment) + // fill any missing members with an empty assignment + val missing = group.allMembers -- groupAssignment.keySet + val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap + + // persist the group metadata and upon finish transition to stable and propagate the assignment + groupManager.storeGroup(group, assignment, (errorCode: Short) => { + group synchronized { + // another member may have joined the group while we were awaiting this callback, + // so we must ensure we are still in the AwaitingSync state when it gets invoked. + // if we have transitioned to another state, then we shouldn't do anything + if (group.is(AwaitingSync)) { + if (errorCode != Errors.NONE.code) { + resetAndPropagateAssignmentError(group, errorCode) + maybePrepareRebalance(group) + } else if (group.is(AwaitingSync)) { + setAndPropagateAssignment(group, assignment) + group.transitionTo(Stable) + } + } + } + }) } case Stable => @@ -413,6 +432,34 @@ class GroupCoordinator(val brokerId: Int, } } + def handleListGroups(): (Errors, List[GroupOverview]) = { + if (!isActive.get) { + (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]()) + } else { + val errorCode = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE + (errorCode, groupManager.currentGroups.map(_.overview).toList) + } + } + + def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = { + if (!isActive.get) { + (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup) + } else if (!isCoordinatorForGroup(groupId)) { + (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup) + } else if (isCoordinatorLoadingInProgress(groupId)) { + (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup) + } else { + val group = groupManager.getGroup(groupId) + if (group == null) { + (Errors.NONE, GroupCoordinator.DeadGroup) + } else { + group synchronized { + (Errors.NONE, group.summary) + } + } + } + } + def handleGroupImmigration(offsetTopicPartitionId: Int) = { groupManager.loadGroupsForPartition(offsetTopicPartitionId) } @@ -421,6 +468,27 @@ class GroupCoordinator(val brokerId: Int, groupManager.removeGroupsForPartition(offsetTopicPartitionId) } + private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) { + assert(group.is(AwaitingSync)) + group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId)) + propagateAssignment(group, Errors.NONE.code) + } + + private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) { + assert(group.is(AwaitingSync)) + group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte]) + propagateAssignment(group, errorCode) + } + + private def propagateAssignment(group: GroupMetadata, errorCode: Short) { + for (member <- group.allMemberMetadata) { + if (member.awaitingSyncCallback != null) { + member.awaitingSyncCallback(member.assignment, errorCode) + member.awaitingSyncCallback = null + } + } + } + private def validGroupId(groupId: String): Boolean = { groupId != null && !groupId.isEmpty } @@ -458,12 +526,13 @@ class GroupCoordinator(val brokerId: Int, private def addMemberAndRebalance(sessionTimeoutMs: Int, clientId: String, + clientHost: String, protocols: List[(String, Array[Byte])], group: GroupMetadata, callback: JoinCallback) = { // use the client-id with a random id suffix as the member-id val memberId = clientId + "-" + group.generateMemberIdSuffix - val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, sessionTimeoutMs, protocols) member.awaitingJoinCallback = callback group.add(member.memberId, member) maybePrepareRebalance(group) @@ -488,11 +557,9 @@ class GroupCoordinator(val brokerId: Int, private def prepareRebalance(group: GroupMetadata) { // if any members are awaiting sync, cancel their request and have them rejoin - if (group.is(AwaitingSync)) { - groupManager.propagateAssignment(group, Errors.REBALANCE_IN_PROGRESS.code) - } + if (group.is(AwaitingSync)) + resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code) - group.allMembers.foreach(_.assignment = null) group.transitionTo(PreparingRebalance) info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId)) @@ -544,7 +611,7 @@ class GroupCoordinator(val brokerId: Int, info("Stabilized group %s generation %s".format(group.groupId, group.generationId)) // trigger the awaiting join group response callback for all the members after rebalancing - for (member <- group.allMembers) { + for (member <- group.allMemberMetadata) { assert(member.awaitingJoinCallback != null) val joinResult = JoinGroupResult( members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty }, @@ -595,6 +662,11 @@ class GroupCoordinator(val brokerId: Int, object GroupCoordinator { + val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers) + val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers) + val NoMembers = List[MemberSummary]() + val NoState = "" + val NoProtocolType = "" val NoProtocol = "" val NoLeader = "" diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala index 652a3a4a8d50a..ece9ce0769a01 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala @@ -92,6 +92,20 @@ private object GroupMetadata { PreparingRebalance -> Set(Stable, AwaitingSync)) } +/** + * Case class used to represent group metadata for the ListGroups API + */ +case class GroupOverview(groupId: String, + protocolType: String) + +/** + * Case class used to represent group metadata for the DescribeGroup API + */ +case class GroupSummary(state: String, + protocolType: String, + protocol: String, + members: List[MemberSummary]) + /** * Group contains the following metadata: * @@ -144,7 +158,9 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList - def allMembers = members.values.toList + def allMembers = members.keySet + + def allMemberMetadata = members.values.toList def rebalanceTimeout = members.values.foldLeft(0) {(timeout, member) => timeout.max(member.sessionTimeoutMs) @@ -168,7 +184,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: val candidates = candidateProtocols // let each member vote for one of the protocols and choose the one with the most votes - val votes: List[(String, Int)] = allMembers + val votes: List[(String, Int)] = allMemberMetadata .map(_.vote(candidates)) .groupBy(identity) .mapValues(_.size) @@ -179,7 +195,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: private def candidateProtocols = { // get the set of protocols that are commonly supported by all members - allMembers + allMemberMetadata .map(_.protocols) .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols) } @@ -201,6 +217,20 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap } + def summary: GroupSummary = { + if (is(Stable)) { + val members = this.members.values.map{ member => member.summary(protocol) }.toList + GroupSummary(state.toString, protocolType, protocol, members) + } else { + val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList + GroupSummary(state.toString, protocolType, GroupCoordinator.NoProtocol, members) + } + } + + def overview: GroupOverview = { + GroupOverview(groupId, protocolType) + } + private def assertValidTransition(targetState: GroupState) { if (!GroupMetadata.validPreviousStates(targetState).contains(state)) throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state" diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index 81ed54850d90f..0052b5d4efc52 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -95,12 +95,16 @@ class GroupMetadataManager(val brokerId: Int, } ) + def currentGroups(): Iterable[GroupMetadata] = groupsCache.values + def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId)) def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId)) + def isLoading(): Boolean = loadingPartitions synchronized !loadingPartitions.isEmpty + /** * Get the group associated with the given groupId, or null if not found */ @@ -158,7 +162,8 @@ class GroupMetadataManager(val brokerId: Int, } def storeGroup(group: GroupMetadata, - groupAssignment: Map[String, Array[Byte]]) { + groupAssignment: Map[String, Array[Byte]], + responseCallback: Short => Unit) { // construct the message to append val message = new Message( key = GroupMetadataManager.groupMetadataKey(group.groupId), @@ -208,12 +213,7 @@ class GroupMetadataManager(val brokerId: Int, } } - for (member <- group.allMembers) { - member.assignment = groupAssignment.getOrElse(member.memberId, Array.empty[Byte]) - } - - // propagate the assignments - propagateAssignment(group, responseCode) + responseCallback(responseCode) } // call replica manager to append the group message @@ -225,16 +225,7 @@ class GroupMetadataManager(val brokerId: Int, putCacheCallback) } - def propagateAssignment(group: GroupMetadata, - errorCode: Short) { - val hasError = errorCode != Errors.NONE.code - for (member <- group.allMembers) { - if (member.awaitingSyncCallback != null) { - member.awaitingSyncCallback(if (hasError) Array.empty else member.assignment, errorCode) - member.awaitingSyncCallback = null - } - } - } + /** * Store offsets by appending it to the replicated log and then inserting to cache @@ -657,10 +648,14 @@ object GroupMetadataManager { private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING), + new Field("client_id", STRING), + new Field("client_host", STRING), new Field("session_timeout", INT32), new Field("subscription", BYTES), new Field("assignment", BYTES)) private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id") + private val MEMBER_METADATA_CLIENT_ID_V0 = MEMBER_METADATA_V0.get("client_id") + private val MEMBER_METADATA_CLIENT_HOST_V0 = MEMBER_METADATA_V0.get("client_host") private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout") private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription") private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment") @@ -787,10 +782,12 @@ object GroupMetadataManager { value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol) value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId) - val memberArray = groupMetadata.allMembers.map { + val memberArray = groupMetadata.allMemberMetadata.map { case memberMetadata => val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0) memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId) + memberStruct.set(MEMBER_METADATA_CLIENT_ID_V0, memberMetadata.clientId) + memberStruct.set(MEMBER_METADATA_CLIENT_HOST_V0, memberMetadata.clientHost) memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs) val metadata = memberMetadata.metadata(groupMetadata.protocol) @@ -901,10 +898,13 @@ object GroupMetadataManager { case memberMetadataObj => val memberMetadata = memberMetadataObj.asInstanceOf[Struct] val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String] + val clientId = memberMetadata.get(MEMBER_METADATA_CLIENT_ID_V0).asInstanceOf[String] + val clientHost = memberMetadata.get(MEMBER_METADATA_CLIENT_HOST_V0).asInstanceOf[String] val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int] val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer]) - val member = new MemberMetadata(memberId, groupId, sessionTimeout, List((group.protocol, subscription))) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout, + List((group.protocol, subscription))) member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer]) diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala index 6a762412796bb..80782c8108ee0 100644 --- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala @@ -23,6 +23,13 @@ import kafka.utils.nonthreadsafe import scala.collection.Map + +case class MemberSummary(memberId: String, + clientId: String, + clientHost: String, + metadata: Array[Byte], + assignment: Array[Byte]) + /** * Member metadata contains the following metadata: * @@ -46,15 +53,14 @@ import scala.collection.Map @nonthreadsafe private[coordinator] class MemberMetadata(val memberId: String, val groupId: String, + val clientId: String, + val clientHost: String, val sessionTimeoutMs: Int, var supportedProtocols: List[(String, Array[Byte])]) { - // NOTE: we need to add memory barrier to assignment and awaitingSyncCallback - // since they can be accessed in the append callback thread that does not - // hold on the group object lock - @volatile var assignment: Array[Byte] = null + var assignment: Array[Byte] = Array.empty[Byte] var awaitingJoinCallback: JoinGroupResult => Unit = null - @volatile var awaitingSyncCallback: (Array[Byte], Short) => Unit = null + var awaitingSyncCallback: (Array[Byte], Short) => Unit = null var latestHeartbeat: Long = -1 var isLeaving: Boolean = false @@ -87,6 +93,14 @@ private[coordinator] class MemberMetadata(val memberId: String, return true } + def summary(protocol: String): MemberSummary = { + MemberSummary(memberId, clientId, clientHost, metadata(protocol), assignment) + } + + def summaryNoMetadata(): MemberSummary = { + MemberSummary(memberId, clientId, clientHost, Array.empty[Byte], Array.empty[Byte]) + } + /** * Vote for one of the potential group protocols. This takes into account the protocol preference as * indicated by the order of supported protocols and returns the first one also contained in the set diff --git a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala similarity index 82% rename from core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala rename to core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala index b94aa01512afb..0e1475860e329 100644 --- a/core/src/main/scala/kafka/javaapi/GroupMetadataResponse.scala +++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala @@ -20,7 +20,7 @@ package kafka.javaapi import java.nio.ByteBuffer import kafka.cluster.BrokerEndPoint -class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataResponse) { +class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinatorResponse) { def errorCode = underlying.errorCode @@ -30,11 +30,11 @@ class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataRespo } override def equals(other: Any) = canEqual(other) && { - val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupMetadataResponse] + val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.GroupCoordinatorResponse] this.underlying.equals(otherConsumerMetadataResponse.underlying) } - def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupMetadataResponse] + def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.GroupCoordinatorResponse] override def hashCode = underlying.hashCode @@ -42,6 +42,6 @@ class GroupMetadataResponse(private val underlying: kafka.api.GroupMetadataRespo } -object GroupMetadataResponse { - def readFrom(buffer: ByteBuffer) = new GroupMetadataResponse(kafka.api.GroupMetadataResponse.readFrom(buffer)) +object GroupCoordinatorResponse { + def readFrom(buffer: ByteBuffer) = new GroupCoordinatorResponse(kafka.api.GroupCoordinatorResponse.readFrom(buffer)) } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 9fce77e5025d4..9ea407916c34e 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -65,7 +65,8 @@ object RequestChannel extends Logging { RequestKeys.deserializerForKey(requestId)(buffer) else null - // for client-side request / response format + // if we failed to find a server-side mapping, then try using the + // client-side request / response format val header: RequestHeader = if (requestObj == null) { buffer.rewind diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35c5956559425..0a2e0b9dd7759 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,6 +18,7 @@ package kafka.server import java.nio.ByteBuffer +import java.util import kafka.admin.AdminUtils import kafka.api._ @@ -31,13 +32,12 @@ import kafka.network.RequestChannel.{Session, Response} import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write} import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse} +import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.requests.{GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.Node import scala.collection._ - - /** * Logic to handle the various Kafka requests */ @@ -74,11 +74,13 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) - case RequestKeys.GroupMetadataKey => handleGroupMetadataRequest(request) + case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request) case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request) case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request) case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request) case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request) + case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request) + case RequestKeys.ListGroupsKey => handleListGroupsRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -676,34 +678,73 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } - /* - * Handle a consumer metadata request - */ - def handleGroupMetadataRequest(request: RequestChannel.Request) { - val groupMetadataRequest = request.requestObj.asInstanceOf[GroupMetadataRequest] + def handleGroupCoordinatorRequest(request: RequestChannel.Request) { + val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest] + val responseHeader = new ResponseHeader(request.header.correlationId) - if (!authorize(request.session, Read, new Resource(Group, groupMetadataRequest.group))) { - val response = GroupMetadataResponse(None, ErrorMapping.AuthorizationCode, groupMetadataRequest.correlationId) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response))) + if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) { + val responseBody = new GroupCoordinatorResponse(Errors.AUTHORIZATION_FAILED.code, Node.noNode) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - val partition = coordinator.partitionFor(groupMetadataRequest.group) + val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId) // get metadata (and create the topic if necessary) val offsetsTopicMetadata = getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), request.securityProtocol).head + val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).flatMap { + partitionMetadata => partitionMetadata.leader + } - val errorResponse = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, groupMetadataRequest.correlationId) - - val response = - offsetsTopicMetadata.partitionsMetadata.find(_.partitionId == partition).map { partitionMetadata => - partitionMetadata.leader.map { leader => - GroupMetadataResponse(Some(leader), ErrorMapping.NoError, groupMetadataRequest.correlationId) - }.getOrElse(errorResponse) - }.getOrElse(errorResponse) + val responseBody = coordinatorEndpoint match { + case None => + new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode()) + case Some(endpoint) => + new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port)) + } trace("Sending consumer metadata %s for correlation id %d to client %s." - .format(response, groupMetadataRequest.correlationId, groupMetadataRequest.clientId)) - requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response))) + .format(responseBody, request.header.correlationId, request.header.clientId)) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) + } + } + + def handleDescribeGroupRequest(request: RequestChannel.Request) { + import JavaConverters._ + + val describeRequest = request.body.asInstanceOf[DescribeGroupsRequest] + val responseHeader = new ResponseHeader(request.header.correlationId) + + val groups = describeRequest.groupIds().asScala.map { + case groupId => + if (!authorize(request.session, Describe, new Resource(Group, groupId))) { + groupId -> DescribeGroupsResponse.GroupMetadata.forError(Errors.AUTHORIZATION_FAILED) + } else { + val (error, summary) = coordinator.handleDescribeGroup(groupId) + val members = summary.members.map { member => + val metadata = ByteBuffer.wrap(member.metadata) + val assignment = ByteBuffer.wrap(member.assignment) + new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment) + } + groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType, + summary.protocol, members.asJava) + } + }.toMap + + val responseBody = new DescribeGroupsResponse(groups.asJava) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) + } + + def handleListGroupsRequest(request: RequestChannel.Request) { + import JavaConverters._ + + val responseHeader = new ResponseHeader(request.header.correlationId) + val responseBody = if (!authorize(request.session, Describe, Resource.ClusterResource)) { + ListGroupsResponse.fromError(Errors.AUTHORIZATION_FAILED) + } else { + val (error, groups) = coordinator.handleListGroups() + val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } + new ListGroupsResponse(error.code, allGroups.asJava) } + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } def handleJoinGroupRequest(request: RequestChannel.Request) { @@ -740,6 +781,7 @@ class KafkaApis(val requestChannel: RequestChannel, joinGroupRequest.groupId(), joinGroupRequest.memberId(), request.header.clientId(), + request.session.host, joinGroupRequest.sessionTimeout(), joinGroupRequest.protocolType(), protocols, diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala new file mode 100644 index 0000000000000..97b49dd9029e4 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.api + +import kafka.admin.AdminClient +import kafka.api.IntegrationTestHarness +import kafka.server.KafkaConfig +import kafka.utils.{TestUtils, Logging} +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.TopicPartition +import org.junit.{Before, Test} +import org.junit.Assert._ +import scala.collection.JavaConversions._ + +class AdminClientTest extends IntegrationTestHarness with Logging { + + val producerCount = 1 + val consumerCount = 2 + val serverCount = 3 + val groupId = "my-test" + val clientId = "consumer-498" + + val topic = "topic" + val part = 0 + val tp = new TopicPartition(topic, part) + val part2 = 1 + val tp2 = new TopicPartition(topic, part2) + + var client: AdminClient = null + + // configure the servers and clients + this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") + this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout + this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId) + this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId) + this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "100") + + @Before + override def setUp() { + super.setUp + client = AdminClient.createSimplePlaintext(this.brokerList) + TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers) + } + + @Test + def testListGroups() { + consumers(0).subscribe(List(topic)) + TestUtils.waitUntilTrue(() => { + consumers(0).poll(0) + !consumers(0).assignment().isEmpty + }, "Expected non-empty assignment") + + val groups = client.listAllGroupsFlattened + assertFalse(groups.isEmpty) + val group = groups(0) + assertEquals(groupId, group.groupId) + assertEquals("consumer", group.protocolType) + } + + @Test + def testDescribeGroup() { + consumers(0).subscribe(List(topic)) + TestUtils.waitUntilTrue(() => { + consumers(0).poll(0) + !consumers(0).assignment().isEmpty + }, "Expected non-empty assignment") + + val group= client.describeGroup(groupId) + assertEquals("consumer", group.protocolType) + assertEquals("range", group.protocol) + assertEquals("Stable", group.state) + assertFalse(group.members.isEmpty) + + val member = group.members(0) + assertEquals(clientId, member.clientId) + assertFalse(member.clientHost.isEmpty) + assertFalse(member.memberId.isEmpty) + } + + @Test + def testDescribeConsumerGroup() { + consumers(0).subscribe(List(topic)) + TestUtils.waitUntilTrue(() => { + consumers(0).poll(0) + !consumers(0).assignment().isEmpty + }, "Expected non-empty assignment") + + val assignment = client.describeConsumerGroup(groupId) + assertEquals(1, assignment.size) + for (partitions <- assignment.values) + assertEquals(Set(tp, tp2), partitions.toSet) + } + +} diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 3d484b84d3cf2..e363e27db1160 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -85,7 +85,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { RequestKeys.OffsetsKey -> classOf[ListOffsetResponse], RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse], RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse], - RequestKeys.GroupMetadataKey -> classOf[GroupMetadataResponse], + RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse], RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse], RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse], RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse], @@ -103,7 +103,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2), RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.GroupMetadataKey -> ((resp: GroupMetadataResponse) => resp.errorCode()), + RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()), RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()), RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()), RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()), @@ -121,7 +121,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { RequestKeys.OffsetsKey -> TopicDescribeAcl, RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl), RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl), - RequestKeys.GroupMetadataKey -> (TopicReadAcl ++ GroupReadAcl), + RequestKeys.GroupCoordinatorKey -> (TopicReadAcl ++ GroupReadAcl), RequestKeys.UpdateMetadataKey -> ClusterAcl, RequestKeys.JoinGroupKey -> GroupReadAcl, RequestKeys.SyncGroupKey -> GroupReadAcl, @@ -174,7 +174,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava), RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava), RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava), - RequestKeys.GroupMetadataKey -> new GroupMetadataRequest(group), + RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group), RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue, Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava), diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index a77979aec87ad..86e68777c7187 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -135,8 +135,8 @@ object TestOffsetManager { val id = random.nextInt().abs % numGroups val group = "group-" + id try { - metadataChannel.send(GroupMetadataRequest(group)) - val coordinatorId = GroupMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1) + metadataChannel.send(GroupCoordinatorRequest(group)) + val coordinatorId = GroupCoordinatorResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1) val channel = if (channels.contains(coordinatorId)) channels(coordinatorId) diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index 09e9ce3fb00dc..90f629aa7c380 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -231,12 +231,12 @@ object SerializationTestUtils { )) } - def createConsumerMetadataRequest: GroupMetadataRequest = { - GroupMetadataRequest("group 1", clientId = "client 1") + def createConsumerMetadataRequest: GroupCoordinatorRequest = { + GroupCoordinatorRequest("group 1", clientId = "client 1") } - def createConsumerMetadataResponse: GroupMetadataResponse = { - GroupMetadataResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0) + def createConsumerMetadataResponse: GroupCoordinatorResponse = { + GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), ErrorMapping.NoError, 0) } def createUpdateMetadataRequest(versionId: Short): UpdateMetadataRequest = { @@ -276,7 +276,7 @@ class RequestResponseSerializationTest extends JUnitSuite { private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse - private val consumerMetadataResponseNoCoordinator = GroupMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) + private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0) private val updateMetadataRequestV0 = SerializationTestUtils.createUpdateMetadataRequest(0) private val updateMetadataRequestV1 = SerializationTestUtils.createUpdateMetadataRequest(1) private val updateMetdataResponse = SerializationTestUtils.createUpdateMetadataResponse diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala index 5e6bd036fad6d..c1278e4005f74 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala @@ -51,10 +51,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite { type LeaveGroupCallbackParams = Short type LeaveGroupCallback = Short => Unit + val ClientId = "consumer-test" + val ClientHost = "localhost" val ConsumerMinSessionTimeout = 10 val ConsumerMaxSessionTimeout = 1000 val DefaultSessionTimeout = 500 - var consumerCoordinator: GroupCoordinator = null + var groupCoordinator: GroupCoordinator = null var replicaManager: ReplicaManager = null var scheduler: KafkaScheduler = null var zkUtils: ZkUtils = null @@ -85,26 +87,25 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret) EasyMock.replay(zkUtils) - consumerCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime())) - consumerCoordinator.startup() + groupCoordinator = GroupCoordinator.create(KafkaConfig.fromProps(props), zkUtils, replicaManager, new MockScheduler(new MockTime())) + groupCoordinator.startup() // add the partition into the owned partition list - groupPartitionId = consumerCoordinator.partitionFor(groupId) - consumerCoordinator.groupManager.addPartitionOwnership(groupPartitionId) + groupPartitionId = groupCoordinator.partitionFor(groupId) + groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId) } @After def tearDown() { EasyMock.reset(replicaManager) - consumerCoordinator.shutdown() + groupCoordinator.shutdown() } @Test def testJoinGroupWrongCoordinator() { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID - val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType, - protocols) + val joinGroupResult = joinGroup(otherGroupId, memberId, DefaultSessionTimeout, protocolType, protocols) val joinGroupErrorCode = joinGroupResult.errorCode assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.code, joinGroupErrorCode) } @@ -139,8 +140,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val groupId = "" val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID - val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, - protocols) + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) assertEquals(Errors.INVALID_GROUP_ID.code, joinGroupResult.errorCode) } @@ -164,8 +164,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE.code, joinGroupResult.errorCode) EasyMock.reset(replicaManager) - val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", - protocols) + val otherJoinGroupResult = joinGroup(groupId, otherMemberId, DefaultSessionTimeout, "copycat", protocols) assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code, otherJoinGroupResult.errorCode) } @@ -284,6 +283,27 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE.code, heartbeatResult) } + @Test + def testSyncGroupEmptyAssignment() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map()) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + assertTrue(syncGroupResult._1.isEmpty) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE.code, heartbeatResult) + } + @Test def testSyncGroupNotCoordinator() { val generation = 1 @@ -668,6 +688,92 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE.code, leaveGroupResult) } + @Test + def testListGroupsIncludesStableGroups() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + val (error, groups) = groupCoordinator.handleListGroups() + assertEquals(Errors.NONE, error) + assertEquals(1, groups.size) + assertEquals(GroupOverview("groupId", "consumer"), groups(0)) + } + + @Test + def testListGroupsIncludesRebalancingGroups() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) + assertEquals(Errors.NONE.code, joinGroupResult.errorCode) + + val (error, groups) = groupCoordinator.handleListGroups() + assertEquals(Errors.NONE, error) + assertEquals(1, groups.size) + assertEquals(GroupOverview("groupId", "consumer"), groups(0)) + } + + @Test + def testDescribeGroupWrongCoordinator() { + EasyMock.reset(replicaManager) + val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId) + assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error) + } + + @Test + def testDescribeGroupInactiveGroup() { + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, error) + assertEquals(GroupCoordinator.DeadGroup, summary) + } + + @Test + def testDescribeGroupStable() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + val syncGroupErrorCode = syncGroupResult._2 + assertEquals(Errors.NONE.code, syncGroupErrorCode) + + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, error) + assertEquals(protocolType, summary.protocolType) + assertEquals("range", summary.protocol) + assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) + } + + @Test + def testDescribeGroupRebalancing() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols) + val joinGroupErrorCode = joinGroupResult.errorCode + assertEquals(Errors.NONE.code, joinGroupErrorCode) + + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, error) + assertEquals(protocolType, summary.protocolType) + assertEquals(GroupCoordinator.NoProtocol, summary.protocol) + assertEquals(AwaitingSync.toString, summary.state) + assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId)) + assertTrue(summary.members.forall(_.metadata.isEmpty)) + assertTrue(summary.members.forall(_.assignment.isEmpty)) + } + private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = { val responsePromise = Promise[JoinGroupResult] val responseFuture = responsePromise.future @@ -706,7 +812,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.replay(replicaManager) - consumerCoordinator.handleJoinGroup(groupId, memberId, "clientId", sessionTimeout, protocolType, protocols, responseCallback) + groupCoordinator.handleJoinGroup(groupId, memberId, "clientId", "clientHost", sessionTimeout, + protocolType, protocols, responseCallback) responseFuture } @@ -731,7 +838,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { )}) EasyMock.replay(replicaManager) - consumerCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback) + groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback) responseFuture } @@ -742,7 +849,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.replay(replicaManager) - consumerCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback) + groupCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], responseCallback) responseFuture } @@ -779,7 +886,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.replay(replicaManager) - consumerCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback) + groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, responseCallback) Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) } @@ -807,7 +914,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { )}) EasyMock.replay(replicaManager) - consumerCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback) + groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback) Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) } @@ -817,7 +924,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None) EasyMock.replay(replicaManager) - consumerCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback) + groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback) Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) } + } diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala index 021aea6af17eb..2846622ae705e 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala @@ -146,18 +146,19 @@ class GroupMetadataTest extends JUnitSuite { @Test def testSelectProtocol() { val groupId = "groupId" - + val clientId = "clientId" + val clientHost = "clientHost" val sessionTimeoutMs = 10000 val memberId = "memberId" - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) group.add(memberId, member) assertEquals("range", group.selectProtocol) val otherMemberId = "otherMemberId" - val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs, + val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) group.add(otherMemberId, otherMember) @@ -165,7 +166,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(Set("range", "roundrobin")(group.selectProtocol)) val lastMemberId = "lastMemberId" - val lastMember = new MemberMetadata(lastMemberId, groupId, sessionTimeoutMs, + val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, sessionTimeoutMs, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) group.add(lastMemberId, lastMember) @@ -182,15 +183,16 @@ class GroupMetadataTest extends JUnitSuite { @Test def testSelectProtocolChoosesCompatibleProtocol() { val groupId = "groupId" - + val clientId = "clientId" + val clientHost = "clientHost" val sessionTimeoutMs = 10000 val memberId = "memberId" - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) val otherMemberId = "otherMemberId" - val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs, + val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) group.add(memberId, member) @@ -201,14 +203,15 @@ class GroupMetadataTest extends JUnitSuite { @Test def testSupportsProtocols() { val groupId = "groupId" - + val clientId = "clientId" + val clientHost = "clientHost" val sessionTimeoutMs = 10000 // by default, the group supports everything assertTrue(group.supportsProtocols(Set("roundrobin", "range"))) val memberId = "memberId" - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) group.add(memberId, member) @@ -217,7 +220,7 @@ class GroupMetadataTest extends JUnitSuite { assertFalse(group.supportsProtocols(Set("foo", "bar"))) val otherMemberId = "otherMemberId" - val otherMember = new MemberMetadata(otherMemberId, groupId, sessionTimeoutMs, + val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) group.add(otherMemberId, otherMember) diff --git a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala index 0a5bb3cd917a2..88eb9aefd2cf3 100644 --- a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala @@ -23,15 +23,18 @@ import org.junit.Test import org.scalatest.junit.JUnitSuite class MemberMetadataTest extends JUnitSuite { + val groupId = "groupId" + val clientId = "clientId" + val clientHost = "clientHost" + val memberId = "memberId" + val sessionTimeoutMs = 10000 + @Test def testMatchesSupportedProtocols { - val groupId = "groupId" - val memberId = "memberId" - val sessionTimeoutMs = 10000 val protocols = List(("range", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) assertTrue(member.matches(protocols)) assertFalse(member.matches(List(("range", Array[Byte](0))))) assertFalse(member.matches(List(("roundrobin", Array.empty[Byte])))) @@ -40,48 +43,36 @@ class MemberMetadataTest extends JUnitSuite { @Test def testVoteForPreferredProtocol { - val groupId = "groupId" - val memberId = "memberId" - val sessionTimeoutMs = 10000 val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) assertEquals("range", member.vote(Set("range", "roundrobin"))) assertEquals("roundrobin", member.vote(Set("blah", "roundrobin"))) } @Test def testMetadata { - val groupId = "groupId" - val memberId = "memberId" - val sessionTimeoutMs = 10000 val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1))) - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) assertTrue(util.Arrays.equals(Array[Byte](0), member.metadata("range"))) assertTrue(util.Arrays.equals(Array[Byte](1), member.metadata("roundrobin"))) } @Test(expected = classOf[IllegalArgumentException]) def testMetadataRaisesOnUnsupportedProtocol { - val groupId = "groupId" - val memberId = "memberId" - val sessionTimeoutMs = 10000 val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) member.metadata("blah") fail() } @Test(expected = classOf[IllegalArgumentException]) def testVoteRaisesOnNoSupportedProtocols { - val groupId = "groupId" - val memberId = "memberId" - val sessionTimeoutMs = 10000 val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) member.vote(Set("blah")) fail() } diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 4e5e776786b7d..31f743bd43dac 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.api.{GroupMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} +import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest} import kafka.consumer.SimpleConsumer import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} import kafka.utils._ @@ -56,7 +56,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness { time = new MockTime() server = TestUtils.createServer(KafkaConfig.fromProps(config), time) simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client") - val consumerMetadataRequest = GroupMetadataRequest(group) + val consumerMetadataRequest = GroupCoordinatorRequest(group) Stream.continually { val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest) consumerMetadataResponse.coordinatorOpt.isDefined