From b341f473b41bd96a8521532c96d9c996f03a501a Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 10 Mar 2016 11:56:47 +0000 Subject: [PATCH 1/7] Fix and suppress number of unchecked warnings Ignored Kafka Streams on this iteration. --- .../org/apache/kafka/clients/consumer/ConsumerRecords.java | 2 ++ .../clients/consumer/internals/ConsumerCoordinator.java | 4 ++-- .../kafka/clients/consumer/internals/RequestFuture.java | 6 +++--- .../org/apache/kafka/common/network/SaslChannelBuilder.java | 1 + .../apache/kafka/common/security/kerberos/LoginManager.java | 2 +- .../org/apache/kafka/common/security/ssl/SslFactory.java | 3 ++- .../src/main/java/org/apache/kafka/connect/data/Struct.java | 2 ++ .../main/java/org/apache/kafka/connect/runtime/Worker.java | 1 + .../apache/kafka/connect/storage/KafkaConfigStorage.java | 1 + .../kafka/connect/storage/KafkaStatusBackingStore.java | 2 ++ .../kafka/connect/storage/OffsetStorageReaderImpl.java | 1 + .../java/org/apache/kafka/connect/storage/OffsetUtils.java | 1 + .../kafka/connect/runtime/WorkerSinkTaskThreadedTest.java | 1 + .../apache/kafka/connect/runtime/WorkerSourceTaskTest.java | 1 + .../runtime/rest/resources/ConnectorsResourceTest.java | 1 + .../connect/runtime/standalone/StandaloneHerderTest.java | 1 + .../kafka/connect/storage/KafkaConfigStorageTest.java | 1 + .../kafka/connect/storage/KafkaOffsetBackingStoreTest.java | 1 + .../kafka/connect/storage/KafkaStatusBackingStoreTest.java | 1 + .../kafka/connect/util/ByteArrayProducerRecordEquals.java | 1 + 20 files changed, 27 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 8ee9be28dc742..3d7ec60438dce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -28,6 +28,8 @@ * partition returned by a {@link Consumer#poll(long)} operation. */ public class ConsumerRecords implements Iterable> { + + @SuppressWarnings("unchecked") public static final ConsumerRecords EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP); private final Map>> records; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index b6b46c135a579..2ae1437336af3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -68,7 +68,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final OffsetCommitCallback defaultOffsetCommitCallback; private final boolean autoCommitEnabled; private final AutoCommitTask autoCommitTask; - private final ConsumerInterceptors interceptors; + private final ConsumerInterceptors interceptors; /** * Initialize the coordination manager. @@ -87,7 +87,7 @@ public ConsumerCoordinator(ConsumerNetworkClient client, OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled, long autoCommitIntervalMs, - ConsumerInterceptors interceptors) { + ConsumerInterceptors interceptors) { super(client, groupId, sessionTimeoutMs, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java index 7be99bd2d7097..71c16faf95275 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java @@ -42,7 +42,7 @@ public class RequestFuture { private boolean isDone = false; private T value; private RuntimeException exception; - private List> listeners = new ArrayList>(); + private List> listeners = new ArrayList<>(); /** @@ -129,12 +129,12 @@ public void raise(Errors error) { } private void fireSuccess() { - for (RequestFutureListener listener: listeners) + for (RequestFutureListener listener : listeners) listener.onSuccess(value); } private void fireFailure() { - for (RequestFutureListener listener: listeners) + for (RequestFutureListener listener : listeners) listener.onFailure(exception); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index b3db4e1bd9a32..0cd5bfe9005de 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -61,6 +61,7 @@ public void configure(Map configs) throws KafkaException { defaultRealm = ""; } + @SuppressWarnings("unchecked") List principalToLocalRules = (List) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES); if (principalToLocalRules != null) kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules); diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java index cf68d2058c0be..e163ba8cf70c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java @@ -31,7 +31,7 @@ public class LoginManager { - private static final EnumMap CACHED_INSTANCES = new EnumMap(LoginType.class); + private static final EnumMap CACHED_INSTANCES = new EnumMap<>(LoginType.class); private final Login login; private final String serviceName; diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index 0d4d2ce362d19..d0fe2e8694fce 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -69,11 +69,12 @@ public void configure(Map configs) throws KafkaException { this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG); this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG); - + @SuppressWarnings("unchecked") List cipherSuitesList = (List) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG); if (cipherSuitesList != null) this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]); + @SuppressWarnings("unchecked") List enabledProtocolsList = (List) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG); if (enabledProtocolsList != null) this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]); diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index 4ca37c3366cd3..a598259128c94 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -172,6 +172,7 @@ public byte[] getBytes(String fieldName) { /** * Equivalent to calling {@link #get(String)} and casting the result to a List. */ + @SuppressWarnings("unchecked") public List getArray(String fieldName) { return (List) getCheckType(fieldName, Schema.Type.ARRAY); } @@ -179,6 +180,7 @@ public List getArray(String fieldName) { /** * Equivalent to calling {@link #get(String)} and casting the result to a Map. */ + @SuppressWarnings("unchecked") public Map getMap(String fieldName) { return (Map) getCheckType(fieldName, Schema.Type.MAP); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 4c0d016e1eab9..aa574935d31af 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -189,6 +189,7 @@ public boolean isSinkConnector(String connName) { return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass()); } + @SuppressWarnings("unchecked") private Class getConnectorClass(String connectorAlias) { // Avoid the classpath scan if the full class name was provided try { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java index 7f2fb830ac5b3..08c528c30c53f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java @@ -375,6 +375,7 @@ private KafkaBasedLog createKafkaBasedLog(String topic, Map(topic, producerProps, consumerProps, consumedCallback, new SystemTime()); } + @SuppressWarnings("unchecked") private final Callback> consumedCallback = new Callback>() { @Override public void onCompletion(Throwable error, ConsumerRecord record) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java index eb9a48c4b1a18..d24645e4b6e8e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java @@ -300,6 +300,7 @@ private ConnectorStatus parseConnectorStatus(String connector, byte[] data) { return null; } + @SuppressWarnings("unchecked") Map statusMap = (Map) schemaAndValue.value(); TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME)); String trace = (String) statusMap.get(TRACE_KEY_NAME); @@ -319,6 +320,7 @@ private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) { log.error("Invalid connector status type {}", schemaAndValue.value().getClass()); return null; } + @SuppressWarnings("unchecked") Map statusMap = (Map) schemaAndValue.value(); TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME)); String trace = (String) statusMap.get(TRACE_KEY_NAME); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java index 23c1019aba553..b404de2205956 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java @@ -55,6 +55,7 @@ public Map offset(Map partition) { } @Override + @SuppressWarnings("unchecked") public Map, Map> offsets(Collection> partitions) { // Serialize keys so backing store can work with them Map> serializedToOriginal = new HashMap<>(partitions.size()); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java index f31715a88d63a..b457b128653b6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java @@ -24,6 +24,7 @@ import java.util.Map; public class OffsetUtils { + @SuppressWarnings("unchecked") public static void validateFormat(Object offsetData) { if (offsetData == null) return; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index ac10d595248d6..1099d7a2740b9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -535,6 +535,7 @@ public ConsumerRecords answer() throws Throwable { return capturedRecords; } + @SuppressWarnings("unchecked") private IExpectationSetters expectOnePoll() { // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of // returning empty data, we return one record. The expectation is that the data will be ignored by the diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 404be0ba68ac1..9b0133a6fb14b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -365,6 +365,7 @@ public List answer() throws Throwable { return latch; } + @SuppressWarnings("unchecked") private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException { expectConvertKeyValue(false); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 1feab0dbbf63e..4659ae8667611 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -56,6 +56,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest(RestServer.class) @PowerMockIgnore("javax.management.*") +@SuppressWarnings("unchecked") public class ConnectorsResourceTest { // Note trailing / and that we do *not* use LEADER_URL to construct our reference values. This checks that we handle // URL construction properly, avoiding //, which will mess up routing in the REST server diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 07d0e3dcb6697..3959ff8f767d2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -63,6 +63,7 @@ import static org.junit.Assert.fail; @RunWith(PowerMockRunner.class) +@SuppressWarnings("unchecked") public class StandaloneHerderTest { private static final String CONNECTOR_NAME = "test"; private static final List TOPICS_LIST = Arrays.asList("topic1", "topic2"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java index f95704c223e7d..5e79a8d3d1b95 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java @@ -62,6 +62,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest(KafkaConfigStorage.class) @PowerMockIgnore("javax.management.*") +@SuppressWarnings("unchecked") public class KafkaConfigStorageTest { private static final String TOPIC = "connect-configs"; private static final Map DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index aa929421283d5..38e0f7b1b9bc7 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -59,6 +59,7 @@ @RunWith(PowerMockRunner.class) @PrepareForTest(KafkaOffsetBackingStore.class) @PowerMockIgnore("javax.management.*") +@SuppressWarnings("unchecked") public class KafkaOffsetBackingStoreTest { private static final String TOPIC = "connect-offsets"; private static final Map DEFAULT_PROPS = new HashMap<>(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java index 8acd31f7d63f5..45ccdd50e9f49 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java @@ -45,6 +45,7 @@ import static org.easymock.EasyMock.newCapture; import static org.junit.Assert.assertEquals; +@SuppressWarnings("unchecked") public class KafkaStatusBackingStoreTest extends EasyMockSupport { private static final String STATUS_TOPIC = "status-topic"; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java index 4d17ac40da3c0..bcfcc23db6716 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java @@ -36,6 +36,7 @@ public ByteArrayProducerRecordEquals(ProducerRecord record) { } @Override + @SuppressWarnings("unchecked") public boolean matches(Object argument) { if (!(argument instanceof ProducerRecord)) return false; From ba7fd01ef53b52b5881849441a5b9de1a73655f6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 10 Mar 2016 11:57:18 +0000 Subject: [PATCH 2/7] Add `@SafeVarargs` annotation to fix warning --- clients/src/main/java/org/apache/kafka/common/utils/Utils.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index daef458d8bfd5..4c4225bdcf89a 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -578,6 +578,7 @@ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength * @param the type of element * @return Set */ + @SafeVarargs public static Set mkSet(T... elems) { return new HashSet<>(Arrays.asList(elems)); } @@ -588,6 +589,7 @@ public static Set mkSet(T... elems) { * @param the type of element * @return List */ + @SafeVarargs public static List mkList(T... elems) { return Arrays.asList(elems); } From 73fd870ff9eaae1ada05d1a50b9dbfa9d44cffd6 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 10 Mar 2016 11:58:02 +0000 Subject: [PATCH 3/7] Suppress unfixable deprecation warnings --- .../java/org/apache/kafka/clients/producer/KafkaProducer.java | 2 +- .../org/apache/kafka/common/requests/RequestResponseTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 85ba9efd3c33e..c87973ad1a2db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -202,7 +202,7 @@ public KafkaProducer(Properties properties, Serializer keySerializer, Seriali keySerializer, valueSerializer); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { try { log.trace("Starting the Kafka producer"); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 30238378f7b3f..7ccf07980ae24 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -370,6 +370,7 @@ private AbstractRequestResponse createLeaderAndIsrResponse() { return new LeaderAndIsrResponse(Errors.NONE.code(), responses); } + @SuppressWarnings("deprecation") private AbstractRequest createUpdateMetadataRequest(int version) { Map partitionStates = new HashMap<>(); List isr = Arrays.asList(1, 2); From f84ff311017c49e681945b2b294889f49bfa5f89 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 10 Mar 2016 11:58:25 +0000 Subject: [PATCH 4/7] Replace deprecated by non-deprecated usage --- .../java/org/apache/kafka/connect/json/JsonConverter.java | 2 +- core/src/test/scala/integration/kafka/api/QuotasTest.scala | 1 - core/src/test/scala/kafka/tools/TestLogCleaning.scala | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index a70caddfa6b40..d9a685953d2d5 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -425,7 +425,7 @@ private ObjectNode asJsonSchema(Schema schema) { ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode(); for (Map.Entry prop : schema.parameters().entrySet()) jsonSchemaParams.put(prop.getKey(), prop.getValue()); - jsonSchema.put(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams); + jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams); } if (schema.defaultValue() != null) jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue())); diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 23be1208af10b..b6a0ae5a6d6fa 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -71,7 +71,6 @@ class QuotasTest extends KafkaServerTestHarness { val producerProps = new Properties() producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, "0") - producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString) producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala index dcbfbe1c1fc14..2e288ecd3c033 100755 --- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala @@ -247,7 +247,7 @@ object TestLogCleaning { dups: Int, percentDeletes: Int): File = { val producerProps = new Properties - producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") + producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString) producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") @@ -316,4 +316,4 @@ case class TestRecord(val topic: String, val key: Int, val value: Long, val dele def this(line: String) = this(line.split("\t")) override def toString() = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u") def topicAndKey = topic + key -} \ No newline at end of file +} From 827bcf9cda15eeb6dda4e5c3976f94f8221e2ebb Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 10 Mar 2016 11:58:57 +0000 Subject: [PATCH 5/7] Avoid reflective calls via structural types in Scala --- .../common/ZkNodeChangeNotificationListenerTest.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala index 50496f0adf655..8d48609bafec0 100644 --- a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala +++ b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala @@ -27,9 +27,9 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { @Test def testProcessNotification() { + @volatile var notification: String = null + @volatile var invocationCount = 0 val notificationHandler = new NotificationHandler { - @volatile var notification: String = _ - @volatile var invocationCount: Integer = 0 override def processNotification(notificationMessage: String): Unit = { notification = notificationMessage invocationCount += 1 @@ -48,7 +48,7 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1) - TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 && notificationHandler.notification == notificationMessage1, "failed to send/process notification message in the timeout period.") + TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, "failed to send/process notification message in the timeout period.") /*There is no easy way to test that purging. Even if we mock kafka time with MockTime, the purging compares kafka time with the time stored in zookeeper stat and the embeded zookeeper server does not provide a way to mock time. so to test purging we will have to use SystemTime.sleep(changeExpirationMs + 1) issue a write and check @@ -56,6 +56,6 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness { depending on how threads get scheduled.*/ zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2) - TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 && notificationHandler.notification == notificationMessage2, "failed to send/process notification message in the timeout period.") + TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2, "failed to send/process notification message in the timeout period.") } } From 90d332204887e30055fa43bed14e99fa7af5cd11 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 10 Mar 2016 11:59:57 +0000 Subject: [PATCH 6/7] Tweak compiler settings for scalac and javac --- build.gradle | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/build.gradle b/build.gradle index c2bd2288504ab..6676ab2fb76d1 100644 --- a/build.gradle +++ b/build.gradle @@ -103,6 +103,12 @@ subprojects { sourceCompatibility = 1.7 + compileJava { + options.encoding = 'UTF-8' + // Add unchecked once we drop support for Java 7 as @SuppressWarnings("unchecked") is too buggy in Java 7 + options.compilerArgs << "-Xlint:deprecation" + } + if (JavaVersion.current().isJava8Compatible()) { tasks.withType(Javadoc) { // disable the crazy super-strict doclint tool in Java 8 @@ -220,6 +226,18 @@ subprojects { tasks.withType(ScalaCompile) { scalaCompileOptions.useAnt = false + scalaCompileOptions.additionalParameters = [ + "-deprecation", + "-unchecked", + "-encoding", "utf8", + "-target:jvm-1.7", + "-Xlog-reflective-calls", + "-feature", + "-language:postfixOps", + "-language:implicitConversions", + "-language:existentials" + ] + configure(scalaCompileOptions.forkOptions) { memoryMaximumSize = '1g' jvmArgs = ['-XX:MaxPermSize=512m', '-Xss2m'] From 225782a425820d3fac41c147a61983be01e4cdbd Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 11 Mar 2016 22:08:03 +0000 Subject: [PATCH 7/7] Use `sourceCompatibility` instead of hardcoding `1.7` --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 6676ab2fb76d1..321fc3f9b8d30 100644 --- a/build.gradle +++ b/build.gradle @@ -230,7 +230,7 @@ subprojects { "-deprecation", "-unchecked", "-encoding", "utf8", - "-target:jvm-1.7", + "-target:jvm-${sourceCompatibility}".toString(), "-Xlog-reflective-calls", "-feature", "-language:postfixOps",