Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

public class ConsumerConfigTest {

private final Deserializer keyDeserializer = new ByteArrayDeserializer();
private final Deserializer valueDeserializer = new StringDeserializer();
private final Deserializer<byte[]> keyDeserializer = new ByteArrayDeserializer();
private final Deserializer<String> valueDeserializer = new StringDeserializer();
private final String keyDeserializerClassName = keyDeserializer.getClass().getName();
private final String valueDeserializerClassName = valueDeserializer.getClass().getName();
private final Object keyDeserializerClass = keyDeserializer.getClass();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
Expand Down Expand Up @@ -252,10 +251,6 @@ private KafkaMetric getMetric(final String name) {
return metrics.metrics().get(metrics.metricName(name, "consumer" + groupId + "-coordinator-metrics"));
}

private Sensor getSensor(final String name) {
return metrics.sensor(name);
}

@Test
public void testSelectRebalanceProtcol() {
List<ConsumerPartitionAssignor> assignors = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

@SuppressWarnings("deprecation")
public class FetcherTest {
private static final double EPSILON = 0.0001;

Expand Down Expand Up @@ -2015,7 +2014,7 @@ public void testQuotaMetrics() {
ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
client.send(request, time.milliseconds());
client.poll(1, time.milliseconds());
FetchResponse response = fullFetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs);
FetchResponse<MemoryRecords> response = fullFetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs);
buffer = response.serialize(ApiKeys.FETCH,
ApiKeys.FETCH.latestVersion(),
request.correlationId());
Expand Down Expand Up @@ -3032,7 +3031,7 @@ public void testConsumingViaIncrementalFetchRequests() {
2, 0L, null, this.records));
partitions1.put(tp1, new FetchResponse.PartitionData<>(Errors.NONE, 100L,
FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, emptyRecords));
FetchResponse resp1 = new FetchResponse<>(Errors.NONE, partitions1, 0, 123);
FetchResponse<MemoryRecords> resp1 = new FetchResponse<>(Errors.NONE, partitions1, 0, 123);
client.prepareResponse(resp1);
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
Expand All @@ -3058,7 +3057,7 @@ public void testConsumingViaIncrementalFetchRequests() {

// The second response contains no new records.
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions2 = new LinkedHashMap<>();
FetchResponse resp2 = new FetchResponse<>(Errors.NONE, partitions2, 0, 123);
FetchResponse<MemoryRecords> resp2 = new FetchResponse<>(Errors.NONE, partitions2, 0, 123);
client.prepareResponse(resp2);
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(time.timer(0));
Expand All @@ -3071,7 +3070,7 @@ public void testConsumingViaIncrementalFetchRequests() {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions3 = new LinkedHashMap<>();
partitions3.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100L,
4, 0L, null, this.nextRecords));
FetchResponse resp3 = new FetchResponse<>(Errors.NONE, partitions3, 0, 123);
FetchResponse<MemoryRecords> resp3 = new FetchResponse<>(Errors.NONE, partitions3, 0, 123);
client.prepareResponse(resp3);
assertEquals(1, fetcher.sendFetches());
consumerClient.poll(time.timer(0));
Expand Down Expand Up @@ -3130,7 +3129,7 @@ public Builder newBuilder() {
}

@Override
public boolean handleResponse(FetchResponse response) {
public boolean handleResponse(FetchResponse<?> response) {
verifySessionPartitions();
return handler.handleResponse(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public void shouldThrowKafkaExceptionOnListWithNonAssignorClassType() {
}

@Test
@SuppressWarnings("deprecation")
public void testOnAssignment() {
OldPartitionAssignor oldAssignor = new OldPartitionAssignor();
ConsumerPartitionAssignor adaptedAssignor = new PartitionAssignorAdapter(oldAssignor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public void testOverwriteAcksAndRetriesForIdempotentProducers() {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, new StringSerializer().getClass().getName());
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

ProducerConfig config = new ProducerConfig(props);
assertTrue(config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
Expand Down Expand Up @@ -456,7 +456,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat

// Four request updates where the topic isn't present, at which point the timeout expires and a
// TimeoutException is thrown
Future future = producer.send(record);
Future<RecordMetadata> future = producer.send(record);
verify(metadata, times(4)).requestUpdateForTopic(topic);
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
verify(metadata, times(5)).fetch();
Expand Down Expand Up @@ -532,7 +532,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat

// Four request updates where the requested partition is out of range, at which point the timeout expires
// and a TimeoutException is thrown
Future future = producer.send(record);
Future<RecordMetadata> future = producer.send(record);
verify(metadata, times(4)).requestUpdateForTopic(topic);
verify(metadata, times(4)).awaitUpdate(anyInt(), anyLong());
verify(metadata, times(5)).fetch();
Expand Down Expand Up @@ -643,9 +643,7 @@ public void testHeaders() {
private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
@SuppressWarnings("unchecked") // it is safe to suppress, since this is a mock class
Serializer<String> keySerializer = mock(serializerClassToMock);
@SuppressWarnings("unchecked")
Serializer<String> valueSerializer = mock(serializerClassToMock);

long nowMs = Time.SYSTEM.milliseconds();
Expand Down Expand Up @@ -688,7 +686,7 @@ private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerCla
public void closeShouldBeIdempotent() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
Producer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
producer.close();
producer.close();
}
Expand All @@ -697,12 +695,12 @@ public void closeShouldBeIdempotent() {
public void testMetricConfigRecordingLevel() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel());
}

props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public void testAutoConfigResolutionWithInvalidConfigProviderClass() {
"org.apache.kafka.common.config.provider.InvalidConfigProvider");
props.put("testKey", "${test:/foo/bar/testpath:testKey}");
try {
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
new TestIndirectConfigResolution(props);
fail("Expected a config exception due to invalid props :" + props);
} catch (KafkaException e) {
// this is good
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public void testBooleanFrequencies() {
}

@Test
@SuppressWarnings("deprecation")
public void testUseWithMetrics() {
MetricName name1 = name("1");
MetricName name2 = name("2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ public void testEndpointIdentificationDisabled() throws Exception {
sslServerConfigs = getTrustingConfig(serverCertStores, clientCertStores);
sslClientConfigs = getTrustingConfig(clientCertStores, serverCertStores);

SecurityProtocol securityProtocol = SecurityProtocol.SSL;
server = createEchoServer(SecurityProtocol.SSL);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void testUniqueErrorCodes() {

@Test
public void testUniqueExceptions() {
Set<Class> exceptionSet = new HashSet<>();
Set<Class<?>> exceptionSet = new HashSet<>();
for (Errors error : Errors.values()) {
if (error != Errors.NONE)
exceptionSet.add(error.exception().getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public void testFetchResponseV4() {
6, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), emptyList(), records));

FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
FetchResponse<MemoryRecords> deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
assertEquals(responseData, deserialized.responseData());
}

Expand All @@ -656,7 +656,7 @@ public void verifyFetchResponseFullWrites() throws Exception {
}
}

private void verifyFetchResponseFullWrite(short apiVersion, FetchResponse fetchResponse) throws Exception {
private void verifyFetchResponseFullWrite(short apiVersion, FetchResponse<MemoryRecords> fetchResponse) throws Exception {
int correlationId = 15;

short responseHeaderVersion = FETCH.responseHeaderVersion(apiVersion);
Expand Down Expand Up @@ -1038,14 +1038,6 @@ private SyncGroupRequest createSyncGroupRequest(int version) {
.setProtocolName("range") // Added in v5 but ignorable
.setAssignments(assignments);

JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
Collections.singleton(
new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName("consumer-range")
.setMetadata(new byte[0])).iterator()
);

// v3 and above could set group instance id
if (version >= 3)
data.setGroupInstanceId("groupInstanceId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -152,8 +151,6 @@ private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, Trans
String mechanism, ChannelMetadataRegistry metadataRegistry) throws IOException {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
Map<String, JaasContext> jaasContexts = Collections.singletonMap(mechanism,
new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig, null));
Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
mechanism, new SaslServerCallbackHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,15 +376,6 @@ private SslEngineBuilder.SecurityStore sslKeyStore(Map<String, Object> sslConfig
);
}

private SslEngineBuilder.SecurityStore sslTrustStore(Map<String, Object> sslConfig) {
return new SslEngineBuilder.SecurityStore(
(String) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
(String) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
(Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG),
null
);
}

private TestSslUtils.SslConfigsBuilder sslConfigsBuilder(Mode mode) {
return new TestSslUtils.SslConfigsBuilder(mode).tlsProtocol(tlsProtocol);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public void testMurmur2() {
cases.put("lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8".getBytes(), -58897971);
cases.put(new byte[]{'a', 'b', 'c'}, 479470107);

for (Map.Entry c : cases.entrySet()) {
assertEquals((int) c.getValue(), murmur2((byte[]) c.getKey()));
for (Map.Entry<byte[], Integer> c : cases.entrySet()) {
assertEquals(c.getValue().intValue(), murmur2(c.getKey()));
}
}

Expand Down
16 changes: 8 additions & 8 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ public static File tempDirectory(final Path parent, String prefix) {
}

public static Properties producerConfig(final String bootstrapServers,
final Class keySerializer,
final Class valueSerializer,
final Class<?> keySerializer,
final Class<?> valueSerializer,
final Properties additional) {
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Expand All @@ -289,14 +289,14 @@ public static Properties producerConfig(final String bootstrapServers,
return properties;
}

public static Properties producerConfig(final String bootstrapServers, final Class keySerializer, final Class valueSerializer) {
public static Properties producerConfig(final String bootstrapServers, final Class<?> keySerializer, final Class<?> valueSerializer) {
return producerConfig(bootstrapServers, keySerializer, valueSerializer, new Properties());
}

public static Properties consumerConfig(final String bootstrapServers,
final String groupId,
final Class keyDeserializer,
final Class valueDeserializer,
final Class<?> keyDeserializer,
final Class<?> valueDeserializer,
final Properties additional) {

final Properties consumerConfig = new Properties();
Expand All @@ -311,8 +311,8 @@ public static Properties consumerConfig(final String bootstrapServers,

public static Properties consumerConfig(final String bootstrapServers,
final String groupId,
final Class keyDeserializer,
final Class valueDeserializer) {
final Class<?> keyDeserializer,
final Class<?> valueDeserializer) {
return consumerConfig(bootstrapServers,
groupId,
keyDeserializer,
Expand All @@ -323,7 +323,7 @@ public static Properties consumerConfig(final String bootstrapServers,
/**
* returns consumer config with random UUID for the Group ID
*/
public static Properties consumerConfig(final String bootstrapServers, final Class keyDeserializer, final Class valueDeserializer) {
public static Properties consumerConfig(final String bootstrapServers, final Class<?> keyDeserializer, final Class<?> valueDeserializer) {
return consumerConfig(bootstrapServers,
UUID.randomUUID().toString(),
keyDeserializer,
Expand Down