Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ subprojects {

sourceCompatibility = 1.7

compileJava {
options.encoding = 'UTF-8'
// Add "unchecked" to the -Xlint flags below once we drop support for Java 7, as @SuppressWarnings("unchecked") is too buggy in Java 7
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean "remove unchecked"?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant adding unchecked like so:

options.compilerArgs << "-Xlint:deprecation,unchecked"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

options.compilerArgs << "-Xlint:deprecation"
}

if (JavaVersion.current().isJava8Compatible()) {
tasks.withType(Javadoc) {
// disable the crazy super-strict doclint tool in Java 8
Expand Down Expand Up @@ -220,6 +226,18 @@ subprojects {
tasks.withType(ScalaCompile) {
scalaCompileOptions.useAnt = false

scalaCompileOptions.additionalParameters = [
"-deprecation",
"-unchecked",
"-encoding", "utf8",
"-target:jvm-${sourceCompatibility}".toString(),
"-Xlog-reflective-calls",
"-feature",
"-language:postfixOps",
"-language:implicitConversions",
"-language:existentials"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we use existentials?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that, I found it.

]

configure(scalaCompileOptions.forkOptions) {
memoryMaximumSize = '1g'
jvmArgs = ['-XX:MaxPermSize=512m', '-Xss2m']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
* partition returned by a {@link Consumer#poll(long)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {

@SuppressWarnings("unchecked")
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);

private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final boolean autoCommitEnabled;
private final AutoCommitTask autoCommitTask;
private final ConsumerInterceptors interceptors;
private final ConsumerInterceptors<?, ?> interceptors;

/**
* Initialize the coordination manager.
Expand All @@ -87,7 +87,7 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
long autoCommitIntervalMs,
ConsumerInterceptors interceptors) {
ConsumerInterceptors<?, ?> interceptors) {
super(client,
groupId,
sessionTimeoutMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class RequestFuture<T> {
private boolean isDone = false;
private T value;
private RuntimeException exception;
private List<RequestFutureListener<T>> listeners = new ArrayList<RequestFutureListener<T>>();
private List<RequestFutureListener<T>> listeners = new ArrayList<>();


/**
Expand Down Expand Up @@ -129,12 +129,12 @@ public void raise(Errors error) {
}

/**
 * Notify every registered listener that this future completed successfully,
 * passing the stored result value. (The diff artifact had left both the old
 * raw-typed and the new generic loop header in place, which is a
 * duplicate-variable compile error; only the generic form is kept.)
 */
private void fireSuccess() {
    for (RequestFutureListener<T> listener : listeners)
        listener.onSuccess(value);
}

/**
 * Notify every registered listener that this future failed, passing the
 * stored exception. (As with fireSuccess, the stale raw-typed loop header
 * left by the diff is removed; only the generic form is kept.)
 */
private void fireFailure() {
    for (RequestFutureListener<T> listener : listeners)
        listener.onFailure(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
keySerializer, valueSerializer);
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "deprecation"})
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void configure(Map<String, ?> configs) throws KafkaException {
defaultRealm = "";
}

@SuppressWarnings("unchecked")
List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES);
if (principalToLocalRules != null)
kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

public class LoginManager {

private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap(LoginType.class);
private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class);

private final Login login;
private final String serviceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ public void configure(Map<String, ?> configs) throws KafkaException {
this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);


@SuppressWarnings("unchecked")
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
if (cipherSuitesList != null)
this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);

@SuppressWarnings("unchecked")
List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
if (enabledProtocolsList != null)
this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,7 @@ public static ByteBuffer ensureCapacity(ByteBuffer existingBuffer, int newLength
* @param <T> the type of element
* @return Set
*/
@SafeVarargs
public static <T> Set<T> mkSet(T... elems) {
    // Copy the varargs into a fresh mutable HashSet; duplicate
    // arguments collapse per normal Set semantics.
    Set<T> set = new HashSet<>();
    for (T elem : elems)
        set.add(elem);
    return set;
}
Expand All @@ -588,6 +589,7 @@ public static <T> Set<T> mkSet(T... elems) {
* @param <T> the type of element
* @return List
*/
@SafeVarargs
public static <T> List<T> mkList(T... elems) {
    // NOTE: Arrays.asList returns a fixed-size list backed by the varargs
    // array — callers may set() elements but cannot add or remove them.
    return Arrays.asList(elems);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ private AbstractRequestResponse createLeaderAndIsrResponse() {
return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
}

@SuppressWarnings("deprecation")
private AbstractRequest createUpdateMetadataRequest(int version) {
Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,15 @@ public byte[] getBytes(String fieldName) {
/**
 * Equivalent to calling {@link #get(String)} and casting the result to a List.
 */
@SuppressWarnings("unchecked")
public <T> List<T> getArray(String fieldName) {
    // getCheckType validates the field against the ARRAY schema type;
    // the cast to the caller's element type is necessarily unchecked.
    Object value = getCheckType(fieldName, Schema.Type.ARRAY);
    return (List<T>) value;
}

/**
 * Equivalent to calling {@link #get(String)} and casting the result to a Map.
 */
@SuppressWarnings("unchecked")
public <K, V> Map<K, V> getMap(String fieldName) {
    // getCheckType validates the field against the MAP schema type;
    // the cast to the caller's key/value types is necessarily unchecked.
    Object value = getCheckType(fieldName, Schema.Type.MAP);
    return (Map<K, V>) value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private ObjectNode asJsonSchema(Schema schema) {
ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
for (Map.Entry<String, String> prop : schema.parameters().entrySet())
jsonSchemaParams.put(prop.getKey(), prop.getValue());
jsonSchema.put(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
}
if (schema.defaultValue() != null)
jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public boolean isSinkConnector(String connName) {
return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
}

@SuppressWarnings("unchecked")
private Class<? extends Connector> getConnectorClass(String connectorAlias) {
// Avoid the classpath scan if the full class name was provided
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<Stri
return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
}

@SuppressWarnings("unchecked")
private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
@Override
public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
return null;
}

@SuppressWarnings("unchecked")
Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
String trace = (String) statusMap.get(TRACE_KEY_NAME);
Expand All @@ -319,6 +320,7 @@ private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
return null;
}
@SuppressWarnings("unchecked")
Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
String trace = (String) statusMap.get(TRACE_KEY_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public <T> Map<String, Object> offset(Map<String, T> partition) {
}

@Override
@SuppressWarnings("unchecked")
public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> partitions) {
// Serialize keys so backing store can work with them
Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(partitions.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;

public class OffsetUtils {
@SuppressWarnings("unchecked")
public static void validateFormat(Object offsetData) {
if (offsetData == null)
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
return capturedRecords;
}

@SuppressWarnings("unchecked")
private IExpectationSetters<Object> expectOnePoll() {
// Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
// returning empty data, we return one record. The expectation is that the data will be ignored by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ public List<SourceRecord> answer() throws Throwable {
return latch;
}

@SuppressWarnings("unchecked")
private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException {
expectConvertKeyValue(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
@RunWith(PowerMockRunner.class)
@PrepareForTest(RestServer.class)
@PowerMockIgnore("javax.management.*")
@SuppressWarnings("unchecked")
public class ConnectorsResourceTest {
// Note trailing / and that we do *not* use LEADER_URL to construct our reference values. This checks that we handle
// URL construction properly, avoiding //, which will mess up routing in the REST server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static org.junit.Assert.fail;

@RunWith(PowerMockRunner.class)
@SuppressWarnings("unchecked")
public class StandaloneHerderTest {
private static final String CONNECTOR_NAME = "test";
private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
@RunWith(PowerMockRunner.class)
@PrepareForTest(KafkaConfigStorage.class)
@PowerMockIgnore("javax.management.*")
@SuppressWarnings("unchecked")
public class KafkaConfigStorageTest {
private static final String TOPIC = "connect-configs";
private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
@RunWith(PowerMockRunner.class)
@PrepareForTest(KafkaOffsetBackingStore.class)
@PowerMockIgnore("javax.management.*")
@SuppressWarnings("unchecked")
public class KafkaOffsetBackingStoreTest {
private static final String TOPIC = "connect-offsets";
private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;

@SuppressWarnings("unchecked")
public class KafkaStatusBackingStoreTest extends EasyMockSupport {

private static final String STATUS_TOPIC = "status-topic";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
}

@Override
@SuppressWarnings("unchecked")
public boolean matches(Object argument) {
if (!(argument instanceof ProducerRecord))
return false;
Expand Down
1 change: 0 additions & 1 deletion core/src/test/scala/integration/kafka/api/QuotasTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ class QuotasTest extends KafkaServerTestHarness {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString)
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/kafka/tools/TestLogCleaning.scala
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ object TestLogCleaning {
dups: Int,
percentDeletes: Int): File = {
val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
Expand Down Expand Up @@ -316,4 +316,4 @@ case class TestRecord(val topic: String, val key: Int, val value: Long, val dele
def this(line: String) = this(line.split("\t"))
override def toString() = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u")
def topicAndKey = topic + key
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {

@Test
def testProcessNotification() {
@volatile var notification: String = null
@volatile var invocationCount = 0
val notificationHandler = new NotificationHandler {
@volatile var notification: String = _
@volatile var invocationCount: Integer = 0
override def processNotification(notificationMessage: String): Unit = {
notification = notificationMessage
invocationCount += 1
Expand All @@ -48,14 +48,14 @@ class ZkNodeChangeNotificationListenerTest extends KafkaServerTestHarness {

zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage1)

TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 1 && notificationHandler.notification == notificationMessage1, "failed to send/process notification message in the timeout period.")
TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == notificationMessage1, "failed to send/process notification message in the timeout period.")

/* There is no easy way to test the purging. Even if we mock Kafka time with MockTime, the purging compares Kafka time with the time stored in the ZooKeeper stat, and the
embedded ZooKeeper server does not provide a way to mock time. So to test purging we would have to use SystemTime.sleep(changeExpirationMs + 1), issue a write, and check
Assert.assertEquals(1, ZkUtils.getChildren(zkClient, seqNodeRoot).size). However, even after that the assertion can fail, as the second node itself can be deleted
depending on how threads get scheduled. */

zkUtils.createSequentialPersistentPath(seqNodePath, notificationMessage2)
TestUtils.waitUntilTrue(() => notificationHandler.invocationCount == 2 && notificationHandler.notification == notificationMessage2, "failed to send/process notification message in the timeout period.")
TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == notificationMessage2, "failed to send/process notification message in the timeout period.")
}
}