Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ public class CommonClientConfigs {
*/

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
+ "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
+ "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
+ "servers (you may want more than one, though, in case a server is down).";
/**
* @deprecated This will be removed in a future release. Please use {@link #BOOTSTRAP_SERVERS_DOC}
*/
@Deprecated
public static final String BOOSTRAP_SERVERS_DOC = BOOTSTRAP_SERVERS_DOC;

public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
Expand Down
18 changes: 11 additions & 7 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@
*/
package org.apache.kafka.clients;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* A class encapsulating some of the logic around metadata.
* <p>
Expand Down Expand Up @@ -254,7 +255,10 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
unauthorizedTopics.retainAll(this.topics);

for (String topic : this.topics) {
partitionInfos.addAll(cluster.partitionsForTopic(topic));
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
if (partitionInfoList != null) {
partitionInfos.addAll(cluster.partitionsForTopic(topic));
}
}
nodes = cluster.nodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public class ConsumerConfig extends AbstractConfig {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Importance.HIGH,
CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
.define(SESSION_TIMEOUT_MS_CONFIG,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callbac
}

/**
* Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,10 @@ public class ProducerConfig extends AbstractConfig {
@Deprecated
public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. "
+ "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the {@link #MAX_BLOCK_MS_CONFIG} "
+ "value to block, after which it will throw a TimeoutException. Setting this property to true will set the <code>" + MAX_BLOCK_MS_CONFIG + "</code> to Long.MAX_VALUE."
+ "By default this setting is false and the producer will no longer throw a BufferExhaustException but instead will use the <code>" + MAX_BLOCK_MS_CONFIG + "</code> "
+ "value to block, after which it will throw a TimeoutException. Setting this property to true will set the <code>" + MAX_BLOCK_MS_CONFIG + "</code> to Long.MAX_VALUE. "
+ "<em>Also if this property is set to true, parameter <code>" + METADATA_FETCH_TIMEOUT_CONFIG + "</code> is not longer honored.</em>"
+ "<p>"
+ "This parameter is deprecated and will be removed in a future release. "
+ "<p>This parameter is deprecated and will be removed in a future release. "
+ "Parameter <code>" + MAX_BLOCK_MS_CONFIG + "</code> should be used instead.";

/** <code>buffer.memory</code> */
Expand Down Expand Up @@ -218,7 +217,7 @@ public class ProducerConfig extends AbstractConfig {
+ "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";

static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public final class RecordAccumulator {
private final Set<TopicPartition> muted;
private int drainIndex;


/**
* Create a new record accumulator
*
Expand Down Expand Up @@ -104,11 +103,11 @@ public RecordAccumulator(int batchSize,
this.compression = compression;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
this.batches = new CopyOnWriteMap<>();
String metricGrpName = "producer-metrics";
this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
this.incomplete = new IncompleteRecordBatches();
this.muted = new HashSet<TopicPartition>();
this.muted = new HashSet<>();
this.time = time;
registerMetrics(metrics, metricGrpName);
}
Expand Down Expand Up @@ -171,12 +170,9 @@ public RecordAppendResult append(TopicPartition tp,
synchronized (dq) {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null)
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}

// we don't have an in-progress record batch try to allocate a new batch
Expand All @@ -187,14 +183,12 @@ public RecordAppendResult append(TopicPartition tp,
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordBatch last = dq.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
}

RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
free.deallocate(buffer);
return appendResult;
}
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
Expand All @@ -209,12 +203,28 @@ public RecordAppendResult append(TopicPartition tp,
}
}

/**
* If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
* resources (like compression streams buffers).
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
RecordBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
if (future == null)
last.records.close();
else
return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
}
return null;
}

/**
* Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
* due to metadata being unavailable
*/
public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
List<RecordBatch> expiredBatches = new ArrayList<RecordBatch>();
List<RecordBatch> expiredBatches = new ArrayList<>();
int count = 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> dq = entry.getValue();
Expand Down Expand Up @@ -245,7 +255,7 @@ public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
}
}
}
if (expiredBatches.size() > 0)
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", count);

return expiredBatches;
Expand Down Expand Up @@ -287,7 +297,7 @@ public void reenqueue(RecordBatch batch, long now) {
* </ol>
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<Node>();
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
boolean unknownLeadersExist = false;

Expand Down Expand Up @@ -333,7 +343,7 @@ public boolean hasUnsent() {
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> deque = entry.getValue();
synchronized (deque) {
if (deque.size() > 0)
if (!deque.isEmpty())
return true;
}
}
Expand All @@ -357,11 +367,11 @@ public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
if (nodes.isEmpty())
return Collections.emptyMap();

Map<Integer, List<RecordBatch>> batches = new HashMap<Integer, List<RecordBatch>>();
Map<Integer, List<RecordBatch>> batches = new HashMap<>();
for (Node node : nodes) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<RecordBatch> ready = new ArrayList<RecordBatch>();
List<RecordBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();
do {
Expand Down Expand Up @@ -436,6 +446,11 @@ public void deallocate(RecordBatch batch) {
boolean flushInProgress() {
return flushesInProgress.get() > 0;
}

/* Visible for testing */
Map<TopicPartition, Deque<RecordBatch>> batches() {
return Collections.unmodifiableMap(batches);
}

/**
* Initiate the flushing of data from the accumulator...this makes all requests immediately ready
Expand Down Expand Up @@ -569,7 +584,7 @@ public void remove(RecordBatch batch) {

public Iterable<RecordBatch> all() {
synchronized (incomplete) {
return new ArrayList<RecordBatch>(this.incomplete);
return new ArrayList<>(this.incomplete);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public String toString() {
return builder.toString();
}

/** Visible for testing */
public boolean isWritable() {
return writable;
}

public static class RecordsIterator extends AbstractIterator<LogEntry> {
private final ByteBuffer buffer;
private final DataInputStream stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -75,14 +76,27 @@ public void teardown() {
@Test
public void testFull() throws Exception {
long now = time.milliseconds();
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10L, 100L, metrics, time);
int appends = 1024 / msgSize;
int batchSize = 1024;
RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
int appends = batchSize / msgSize;
for (int i = 0; i < appends; i++) {
// append to the first batch
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(1, partitionBatches.size());
assertTrue(partitionBatches.peekFirst().records.isWritable());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
}

// this append doesn't fit in the first batch, so a new batch is created and the first batch is closed
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
Deque<RecordBatch> partitionBatches = accum.batches().get(tp1);
assertEquals(2, partitionBatches.size());
Iterator<RecordBatch> partitionBatchesIterator = partitionBatches.iterator();
assertFalse(partitionBatchesIterator.next().records.isWritable());
assertTrue(partitionBatchesIterator.next().records.isWritable());
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);

List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ public List<SourceRecord> poll() throws InterruptedException {
records = new ArrayList<>();
records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
}
new ArrayList<SourceRecord>();
} while (line != null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {

Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_CONFIG);
if (cacheSizeVal != null)
cacheSize = (int) cacheSizeVal;
cacheSize = Integer.parseInt((String) cacheSizeVal);
fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -36,10 +37,13 @@
import org.junit.Test;
import org.powermock.reflect.Whitebox;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Calendar;
Expand Down Expand Up @@ -607,6 +611,19 @@ public void testCacheSchemaToJsonConversion() {
assertEquals(2, cache.size());
}

@Test
public void testJsonSchemaCacheSizeFromConfigFile() throws URISyntaxException, IOException {
URL url = getClass().getResource("/connect-test.properties");
File propFile = new File(url.toURI());
String workerPropsFile = propFile.getAbsolutePath();
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();

JsonConverter rc = new JsonConverter();
rc.configure(workerProps, false);

}


private JsonNode parse(byte[] json) {
try {
Expand Down
17 changes: 17 additions & 0 deletions connect/json/src/test/resources/connect-test.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

schemas.cache.size=1

Loading