Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ jmh-benchmarks/src/main/generated
raft/.jqwik-database
**/src/generated
**/src/generated-test

storage/kafka-tiered-storage/
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@ project(':core') {

implementation project(':server-common')
implementation project(':metadata')
implementation project(':storage:api')
implementation project(':raft')
implementation project(':storage')

Expand Down Expand Up @@ -852,6 +853,8 @@ project(':core') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':metadata').sourceSets.test.output
testImplementation project(':raft').sourceSets.test.output
testImplementation project(':storage:api').sourceSets.test.output

testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation libs.easymock
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
 * Indicates that the requested offset is no longer present in the broker's local log because
 * it has been moved to tiered (remote) storage. Clients receiving this error are expected to
 * fetch the data through the remote storage read path instead.
 */
public class OffsetMovedToTieredStorageException extends ApiException {
    private static final long serialVersionUID = 1L;

    /**
     * @param message detail message describing the moved offset
     * @param cause   underlying cause, preserved for diagnostics
     */
    public OffsetMovedToTieredStorageException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message detail message describing the moved offset
     */
    public OffsetMovedToTieredStorageException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OffsetMovedToTieredStorageException;
import org.apache.kafka.common.errors.OperationNotAttemptedException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
Expand Down Expand Up @@ -364,7 +365,8 @@ public enum Errors {
INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new),
INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new),
TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new);
FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
OFFSET_MOVED_TO_TIERED_STORAGE(107, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.record;

import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.Utils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
import static org.apache.kafka.common.record.Records.SIZE_OFFSET;

/**
 * A {@link LogInputStream} that reads record batches sequentially from an {@link InputStream},
 * e.g. a log segment fetched from remote (tiered) storage.
 *
 * <p>Not thread-safe: the header buffer is reused across {@link #nextBatch()} calls.
 */
public class RemoteLogInputStream implements LogInputStream<RecordBatch> {
    private final InputStream is;
    // Reusable buffer covering the batch header up to (and including) the magic byte.
    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);

    /**
     * @param is stream positioned at the start of a record batch; this class never closes it
     */
    public RemoteLogInputStream(InputStream is) {
        this.is = is;
    }

    /**
     * Reads the next record batch from the underlying stream.
     *
     * @return the next batch, or null when the stream is exhausted or ends with a truncated batch
     * @throws CorruptRecordException if the encoded batch size is smaller than the minimum
     *                                legacy (V0) record overhead
     * @throws IOException if reading from the underlying stream fails
     */
    @Override
    public RecordBatch nextBatch() throws IOException {
        logHeaderBuffer.rewind();
        Utils.readFully(is, logHeaderBuffer);

        // A short read means EOF (or a truncated header); signal end of the stream.
        if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
            return null;

        logHeaderBuffer.rewind();
        int size = logHeaderBuffer.getInt(SIZE_OFFSET);

        // V0 has the smallest overhead, stricter checking is done later
        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
                    "overhead (%d).", size, LegacyRecord.RECORD_OVERHEAD_V0));

        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
        // The full batch occupies LOG_OVERHEAD (offset + size fields) plus 'size' bytes.
        ByteBuffer buffer = ByteBuffer.allocate(size + LOG_OVERHEAD);
        // Copy the header already read, then read the remainder of the batch after it.
        buffer.put(logHeaderBuffer);
        buffer.position(logHeaderBuffer.limit());

        Utils.readFully(is, buffer);
        // Truncated batch at the end of the stream; treat it as end of input.
        if (buffer.position() != size + LOG_OVERHEAD)
            return null;
        buffer.rewind();

        // Magic > V1 selects the modern (V2) batch format; otherwise a legacy batch.
        MutableRecordBatch batch;
        if (magic > RecordBatch.MAGIC_VALUE_V1)
            batch = new DefaultRecordBatch(buffer);
        else
            batch = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(buffer);

        return batch;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public class ListOffsetsRequest extends AbstractRequest {
public static final long LATEST_TIMESTAMP = -1L;
public static final long MAX_TIMESTAMP = -3L;

/**
* It is used to represent the earliest message stored in the local log which is also called the local-log-start-offset
*/
public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;

public static final int CONSUMER_REPLICA_ID = -1;
public static final int DEBUGGING_REPLICA_ID = -2;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;

import org.apache.kafka.common.KafkaException;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Locale;
import java.util.NoSuchElementException;

/**
* A class loader that looks for classes and resources in a specified class path first, before delegating to its parent
* class loader.
*/
/**
 * A class loader that looks for classes and resources in a specified class path first, before delegating to its parent
 * class loader.
 */
public class ChildFirstClassLoader extends URLClassLoader {
    static {
        ClassLoader.registerAsParallelCapable();
    }

    /**
     * @param classPath Class path string
     * @param parent The parent classloader. If the required class / resource cannot be found in the given classPath,
     *               this classloader will be used to find the class / resource.
     */
    public ChildFirstClassLoader(String classPath, ClassLoader parent) {
        super(classpathToURLs(classPath), parent);
    }

    /**
     * Converts a platform class path string into URLs. An entry ending in "/*" expands to every
     * jar/zip file directly inside that directory (mirroring the JVM's own wildcard handling);
     * blank entries are skipped.
     *
     * @param classPath entries separated by {@link File#pathSeparator}
     * @return URLs for all resolvable entries, possibly empty
     * @throws KafkaException if canonicalizing a path or building a URL fails
     */
    private static URL[] classpathToURLs(String classPath) {
        ArrayList<URL> urls = new ArrayList<>();
        for (String path : classPath.split(File.pathSeparator)) {
            // Skip blank entries, e.g. from a leading/trailing/doubled separator.
            if (path.trim().isEmpty())
                continue;
            File file = new File(path);

            try {
                if (path.endsWith("/*")) {
                    // Expand the wildcard: canonicalize, then list jars/zips in the parent directory.
                    File parent = new File(new File(file.getCanonicalPath()).getParent());
                    if (parent.isDirectory()) {
                        File[] files = parent.listFiles((dir, name) -> {
                            String lower = name.toLowerCase(Locale.ROOT);
                            return lower.endsWith(".jar") || lower.endsWith(".zip");
                        });
                        if (files != null) {
                            for (File jarFile : files) {
                                urls.add(jarFile.getCanonicalFile().toURI().toURL());
                            }
                        }
                    }
                } else if (file.exists()) {
                    urls.add(file.getCanonicalFile().toURI().toURL());
                }
            } catch (IOException e) {
                throw new KafkaException(e);
            }
        }
        return urls.toArray(new URL[0]);
    }

    /**
     * Child-first class loading: try this loader's own URLs before delegating to the parent,
     * inverting the standard parent-first delegation model.
     */
    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);

            if (c == null) {
                try {
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // Not on our class path; fall back to the parent's delegation chain.
                    c = super.loadClass(name, false);
                }
            }

            if (resolve)
                resolveClass(c);

            return c;
        }
    }

    /**
     * Child-first resource lookup: a resource found on this loader's class path wins
     * over the parent's.
     */
    @Override
    public URL getResource(String name) {
        URL url = findResource(name);
        if (url == null) {
            // try parent
            url = super.getResource(name);
        }
        return url;
    }

    /**
     * Returns this loader's matching resources first, followed by the parent's.
     */
    @Override
    public Enumeration<URL> getResources(String name) throws IOException {
        Enumeration<URL> urls1 = findResources(name);
        Enumeration<URL> urls2 = getParent() != null ? getParent().getResources(name) : null;

        return new Enumeration<URL>() {
            @Override
            public boolean hasMoreElements() {
                return (urls1 != null && urls1.hasMoreElements()) || (urls2 != null && urls2.hasMoreElements());
            }

            @Override
            public URL nextElement() {
                // Drain the child's resources before the parent's.
                if (urls1 != null && urls1.hasMoreElements())
                    return urls1.nextElement();
                if (urls2 != null && urls2.hasMoreElements())
                    return urls2.nextElement();
                throw new NoSuchElementException();
            }
        };
    }
}
4 changes: 3 additions & 1 deletion clients/src/main/resources/common/message/FetchRequest.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
// the `LastFetchedEpoch` field
//
// Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
"validVersions": "0-13",
//
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
"validVersions": "0-14",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
Expand Down
4 changes: 3 additions & 1 deletion clients/src/main/resources/common/message/FetchResponse.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
// and leader discovery through the `CurrentLeader` field
//
// Version 13 replaces the topic name field with topic ID (KIP-516).
"validVersions": "0-13",
//
// Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
"validVersions": "0-14",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
// Version 6 enables flexible versions.
//
// Version 7 enables listing offsets by max timestamp (KIP-734).
"validVersions": "0-7",
//
// Version 8 enables listing offsets by local log start offset (KIP-405).
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
// Version 6 enables flexible versions.
//
// Version 7 is the same as version 6 (KIP-734).
"validVersions": "0-7",
//
// Version 8 enables listing offsets by local timestamp.
// This is the earliest log start offset in the local log. (KIP-405).
"validVersions": "0-8",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.server.builders;

import kafka.log.LogManager;
import kafka.log.remote.RemoteLogManager;
import kafka.server.AlterIsrManager;
import kafka.server.BrokerTopicStats;
import kafka.server.DelayedDeleteRecords;
Expand Down Expand Up @@ -53,6 +54,7 @@ public class ReplicaManagerBuilder {
private AlterIsrManager alterIsrManager = null;
private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
private Optional<KafkaZkClient> zkClient = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty();
private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
Expand Down Expand Up @@ -85,6 +87,11 @@ public ReplicaManagerBuilder setLogManager(LogManager logManager) {
return this;
}

public ReplicaManagerBuilder setRemoteLogManager(RemoteLogManager remoteLogManager) {
this.remoteLogManager = Optional.ofNullable(remoteLogManager);
return this;
}

public ReplicaManagerBuilder setQuotaManagers(QuotaManagers quotaManagers) {
this.quotaManagers = quotaManagers;
return this;
Expand Down Expand Up @@ -157,6 +164,7 @@ public ReplicaManager build() {
time,
scheduler,
logManager,
OptionConverters.toScala(remoteLogManager),
quotaManagers,
metadataCache,
logDirFailureChannel,
Expand Down
Loading