Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";

/** {@code default timestamp.extractor} */
/** {@code default.timestamp.extractor} */
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";

/** {@code default value.serde} */
/** {@code default.value.serde} */
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,76 +39,123 @@
public class AssignmentInfo {

private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
/**
* A new field was added, partitionsByHost. CURRENT_VERSION
* is required so we can decode the previous version. For example, this may occur
* during a rolling upgrade
*/
private static final int CURRENT_VERSION = 2;
public final int version;
public final List<TaskId> activeTasks; // each element corresponds to a partition
public final Map<TaskId, Set<TopicPartition>> standbyTasks;
public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;

public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
public static final int LATEST_SUPPORTED_VERSION = 2;

private final int usedVersion;
private List<TaskId> activeTasks;
private Map<TaskId, Set<TopicPartition>> standbyTasks;
private Map<HostInfo, Set<TopicPartition>> partitionsByHost;

private AssignmentInfo(final int version) {
this.usedVersion = version;
}

public AssignmentInfo(final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState) {
this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
}

protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
Map<HostInfo, Set<TopicPartition>> hostState) {
this.version = version;
public AssignmentInfo(final int version,
final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState) {
this.usedVersion = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
this.partitionsByHost = hostState;
}

public int version() {
return usedVersion;
}

public List<TaskId> activeTasks() {
return activeTasks;
}

public Map<TaskId, Set<TopicPartition>> standbyTasks() {
return standbyTasks;
}

public Map<HostInfo, Set<TopicPartition>> partitionsByHost() {
return partitionsByHost;
}

/**
* @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an
* IO exception during encoding
*/
public ByteBuffer encode() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);

try {
// Encode version
out.writeInt(version);
// Encode active tasks
out.writeInt(activeTasks.size());
for (TaskId id : activeTasks) {
id.writeTo(out);
}
// Encode standby tasks
out.writeInt(standbyTasks.size());
for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
TaskId id = entry.getKey();
id.writeTo(out);

Set<TopicPartition> partitions = entry.getValue();
writeTopicPartitions(out, partitions);
}
out.writeInt(partitionsByHost.size());
for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
.entrySet()) {
final HostInfo hostInfo = entry.getKey();
out.writeUTF(hostInfo.host());
out.writeInt(hostInfo.port());
writeTopicPartitions(out, entry.getValue());
final ByteArrayOutputStream baos = new ByteArrayOutputStream();

try (final DataOutputStream out = new DataOutputStream(baos)) {
switch (usedVersion) {
case 1:
encodeVersionOne(out);
break;
case 2:
encodeVersionTwo(out);
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
}

out.flush();
out.close();

return ByteBuffer.wrap(baos.toByteArray());
} catch (IOException ex) {
} catch (final IOException ex) {
throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
}
}

private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException {
private void encodeVersionOne(final DataOutputStream out) throws IOException {
out.writeInt(1); // version
encodeActiveAndStandbyTaskAssignment(out);
}

private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out) throws IOException {
// encode active tasks
out.writeInt(activeTasks.size());
for (final TaskId id : activeTasks) {
id.writeTo(out);
}

// encode standby tasks
out.writeInt(standbyTasks.size());
for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
final TaskId id = entry.getKey();
id.writeTo(out);

final Set<TopicPartition> partitions = entry.getValue();
writeTopicPartitions(out, partitions);
}
}

private void encodeVersionTwo(final DataOutputStream out) throws IOException {
out.writeInt(2); // version
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHost(out);
}

private void encodePartitionsByHost(final DataOutputStream out) throws IOException {
// encode partitions by host
out.writeInt(partitionsByHost.size());
for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
final HostInfo hostInfo = entry.getKey();
out.writeUTF(hostInfo.host());
out.writeInt(hostInfo.port());
writeTopicPartitions(out, entry.getValue());
}
}

private void writeTopicPartitions(final DataOutputStream out,
final Set<TopicPartition> partitions) throws IOException {
out.writeInt(partitions.size());
for (TopicPartition partition : partitions) {
for (final TopicPartition partition : partitions) {
out.writeUTF(partition.topic());
out.writeInt(partition.partition());
}
Expand All @@ -117,52 +164,69 @@ private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> part
/**
* @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
*/
public static AssignmentInfo decode(ByteBuffer data) {
public static AssignmentInfo decode(final ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();

try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
// Decode version
int version = in.readInt();
if (version < 0 || version > CURRENT_VERSION) {
TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
log.error(ex.getMessage(), ex);
throw ex;
}
try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
// decode used version
final int usedVersion = in.readInt();
final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion);

// Decode active tasks
int count = in.readInt();
List<TaskId> activeTasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
activeTasks.add(TaskId.readFrom(in));
}
// Decode standby tasks
count = in.readInt();
Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
for (int i = 0; i < count; i++) {
TaskId id = TaskId.readFrom(in);
standbyTasks.put(id, readTopicPartitions(in));
switch (usedVersion) {
case 1:
decodeVersionOneData(assignmentInfo, in);
break;
case 2:
decodeVersionTwoData(assignmentInfo, in);
break;
default:
TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
"used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}

Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>();
if (version == CURRENT_VERSION) {
int numEntries = in.readInt();
for (int i = 0; i < numEntries; i++) {
HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in));
}
}
return assignmentInfo;
} catch (final IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
}
}

private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
// decode active tasks
int count = in.readInt();
assignmentInfo.activeTasks = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
assignmentInfo.activeTasks.add(TaskId.readFrom(in));
}

return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
// decode standby tasks
count = in.readInt();
assignmentInfo.standbyTasks = new HashMap<>(count);
for (int i = 0; i < count; i++) {
TaskId id = TaskId.readFrom(in);
assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
}
}

} catch (IOException ex) {
throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
decodeVersionOneData(assignmentInfo, in);

// decode partitions by host
assignmentInfo.partitionsByHost = new HashMap<>();
final int numEntries = in.readInt();
for (int i = 0; i < numEntries; i++) {
final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in));
}
}

private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException {
int numPartitions = in.readInt();
Set<TopicPartition> partitions = new HashSet<>(numPartitions);
private static Set<TopicPartition> readTopicPartitions(final DataInputStream in) throws IOException {
final int numPartitions = in.readInt();
final Set<TopicPartition> partitions = new HashSet<>(numPartitions);
for (int j = 0; j < numPartitions; j++) {
partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
}
Expand All @@ -171,14 +235,14 @@ private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throw

@Override
public int hashCode() {
return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
}

@Override
public boolean equals(Object o) {
public boolean equals(final Object o) {
if (o instanceof AssignmentInfo) {
AssignmentInfo other = (AssignmentInfo) o;
return this.version == other.version &&
final AssignmentInfo other = (AssignmentInfo) o;
return this.usedVersion == other.usedVersion &&
this.activeTasks.equals(other.activeTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
this.partitionsByHost.equals(other.partitionsByHost);
Expand All @@ -189,7 +253,7 @@ public boolean equals(Object o) {

@Override
public String toString() {
return "[version=" + version + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
}

}
Loading