diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index df97028a0f31..2a835c7c8f69 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -303,6 +303,7 @@ message OMRequest { optional GetObjectTaggingRequest getObjectTaggingRequest = 140; optional PutObjectTaggingRequest putObjectTaggingRequest = 141; optional DeleteObjectTaggingRequest deleteObjectTaggingRequest = 142; + optional ExecutionControlRequest executionControlRequest = 143; } message OMResponse { @@ -2300,6 +2301,10 @@ message DeleteObjectTaggingRequest { message DeleteObjectTaggingResponse { } +message ExecutionControlRequest { + required uint64 index = 1; +} + /** The OM service that takes care of Ozone namespace. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexManager.java new file mode 100644 index 000000000000..b74fe3285a11 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexManager.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.execution; + +import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MANAGED_INDEX; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.utils.TransactionInfo; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.ozone.om.OzoneManager; + +/** + * Manages indexes for request handling and persist. + */ +public final class IndexManager { + public static final String OM_INDEX_KEY = "#OMINDEX"; + + private final AtomicLong index = new AtomicLong(); + private final AtomicLong commitIndex = new AtomicLong(); + private final OzoneManager ozoneManager; + private final AtomicBoolean enabled = new AtomicBoolean(true); + + public IndexManager(OzoneManager ozoneManager) throws IOException { + this.ozoneManager = ozoneManager; + initialize(); + } + + public void initialize() throws IOException { + if (!ozoneManager.getVersionManager().isAllowed(MANAGED_INDEX)) { + enabled.set(false); + return; + } + + // default first time starts with "0" + long initIndex = 0; + // retrieve last saved index + TransactionInfo transactionInfo = ozoneManager.getMetadataManager().getTransactionInfoTable().get(OM_INDEX_KEY); + if (null == transactionInfo) { + // use ratis transaction for first time upgrade + transactionInfo = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager()); + } + if (null != transactionInfo) { + initIndex = transactionInfo.getTransactionIndex(); + } + index.set(initIndex); + commitIndex.set(initIndex); + } + + public void finalizeFeature() throws IOException { + if (enabled.get()) { + return; + } + + // reinit the feature on finalization + long initIndex = 0; + TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager()); + if 
(null != transactionInfo) { + initIndex = transactionInfo.getTransactionIndex(); + } + index.set(initIndex); + enabled.set(true); + + try (BatchOperation batchOperation = ozoneManager.getMetadataManager().getStore() + .initBatchOperation()) { + saveIndex(batchOperation, initIndex); + ozoneManager.getMetadataManager().getStore().commitBatchOperation(batchOperation); + } + } + + public long nextIndex() { + if (!enabled.get()) { + return -1; + } + return index.incrementAndGet(); + } + + /** + * A follower updates the commit index on every transaction via saveIndex(). + * When this node becomes leader, onLeaderChange() raises index to max(commit index, current index). + * The max is applied as one atomic update so a concurrent nextIndex() increment is never overwritten; it also + * keeps a local index that is already higher than the commit index (e.g. not yet synced to other nodes). + */ + public void onLeaderChange() { + index.accumulateAndGet(commitIndex.get(), Math::max); + } + + public synchronized void saveIndex(BatchOperation batchOperation, long idx) throws IOException { + if (!enabled.get()) { + return; + } + if (idx <= commitIndex.get()) { + return; + } + + ozoneManager.getMetadataManager().getTransactionInfoTable().putWithBatch(batchOperation, OM_INDEX_KEY, + TransactionInfo.valueOf(-1, idx)); + commitIndex.set(idx); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java index 4ce714ab3dc3..4a5982b49627 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java @@ -21,13 +21,16 @@ import com.google.protobuf.ServiceException; import java.io.IOException; +import java.util.function.Supplier; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; import org.apache.hadoop.ozone.om.OzoneManager; import
org.apache.hadoop.ozone.om.helpers.OMAuditLogger; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.ratis.protocol.ClientId; /** * entry for execution flow for write request. @@ -36,10 +39,25 @@ public class OMExecutionFlow { private final OzoneManager ozoneManager; private final OMPerformanceMetrics perfMetrics; + private final Supplier indexGenerator; - public OMExecutionFlow(OzoneManager om) { + public OMExecutionFlow(OzoneManager om) throws IOException { this.ozoneManager = om; this.perfMetrics = ozoneManager.getPerfMetrics(); + indexGenerator = om.getOmRatisServer().getOmStateMachine().getIndexManager()::nextIndex; + } + + /** + * Internal request handling with defined clientId and callId. + * + * @param omRequest the request + * @param clientId the clientId + * @param callId the callId + * @return OMResponse the response of execution + * @throws ServiceException the exception on execution + */ + public OMResponse submitInternal(OMRequest omRequest, ClientId clientId, long callId) throws ServiceException { + return ozoneManager.getOmRatisServer().submitRequest(updateControlRequest(omRequest), clientId, callId); } /** @@ -57,7 +75,7 @@ public OMResponse submit(OMRequest omRequest) throws ServiceException { private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceException { // 1. 
create client request and preExecute OMClientRequest omClientRequest = null; - final OMRequest requestToSubmit; + OMRequest requestToSubmit; try { omClientRequest = OzoneManagerRatisUtils.createClientRequest(request, ozoneManager); assert (omClientRequest != null); @@ -73,10 +91,18 @@ private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceExcep } // 2. submit request to ratis + requestToSubmit = updateControlRequest(requestToSubmit); OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit); if (!response.getSuccess()) { omClientRequest.handleRequestFailure(ozoneManager); } return response; } + + private OMRequest updateControlRequest(OMRequest requestToSubmit) { + OzoneManagerProtocolProtos.ExecutionControlRequest controlRequest = + OzoneManagerProtocolProtos.ExecutionControlRequest.newBuilder().setIndex(indexGenerator.get()).build(); + requestToSubmit = requestToSubmit.toBuilder().setExecutionControlRequest(controlRequest).build(); + return requestToSubmit; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java index d0dd65b006a9..fcd13386a0ac 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java @@ -27,10 +27,11 @@ public final class ExecutionContext { private final TermIndex termIndex; private ExecutionContext(long index, TermIndex termIndex) { - this.index = index; if (null == termIndex) { + // termIndex will be null for pre-ratis execution case which is before ratis transaction termIndex = TermIndex.valueOf(-1, index); } + this.index = index; this.termIndex = termIndex; } diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java index 80c20f7af6dc..19cbccb233ee 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java @@ -27,9 +27,9 @@ import org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; -import org.apache.ratis.server.protocol.TermIndex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,12 +115,12 @@ private static OMAction getAction(OzoneManagerProtocolProtos.OMRequest request) return omAction; } - public static void log(OMAuditLogger.Builder builder, TermIndex termIndex) { + public static void log(OMAuditLogger.Builder builder, ExecutionContext context) { if (builder.isLog.get()) { if (null == builder.getAuditMap()) { builder.setAuditMap(new HashMap<>()); } - builder.getAuditMap().put("Transaction", String.valueOf(termIndex.getIndex())); + builder.getAuditMap().put("Transaction", context.getTermIndex().getIndex() + "::" + context.getIndex()); builder.getMessageBuilder().withParams(builder.getAuditMap()); builder.getAuditLogger().logWrite(builder.getMessageBuilder().build()); } @@ -134,7 +134,7 @@ public static void log(OMAuditLogger.Builder builder) { } public static void log(OMAuditLogger.Builder builder, OMClientRequest request, OzoneManager om, - TermIndex termIndex, Throwable th) { + ExecutionContext context, Throwable th) { if (builder.isLog.get()) { builder.getAuditLogger().logWrite(builder.getMessageBuilder().build()); return; @@ 
-150,7 +150,7 @@ public static void log(OMAuditLogger.Builder builder, OMClientRequest request, O } try { builder.getAuditMap().put("Command", request.getOmRequest().getCmdType().name()); - builder.getAuditMap().put("Transaction", String.valueOf(termIndex.getIndex())); + builder.getAuditMap().put("Transaction", context.getTermIndex().getIndex() + "::" + context.getIndex()); request.buildAuditMessage(action, builder.getAuditMap(), th, request.getUserInfo()); builder.setLog(true); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index c7f17e16bb33..60a7552084bd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -54,6 +54,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.function.CheckedBiConsumer; import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,10 +78,12 @@ public final class OzoneManagerDoubleBuffer { private static class Entry { private final TermIndex termIndex; private final OMClientResponse response; + private final long index; - Entry(TermIndex termIndex, OMClientResponse response) { + Entry(TermIndex termIndex, OMClientResponse response, long index) { this.termIndex = termIndex; this.response = response; + this.index = index; } TermIndex getTermIndex() { @@ -90,6 +93,10 @@ TermIndex getTermIndex() { OMClientResponse getResponse() { return response; } + + long getIndex() { + return index; + } } /** @@ -98,6 +105,7 @@ OMClientResponse getResponse() { public static final class Builder { private OMMetadataManager 
omMetadataManager; private Consumer updateLastAppliedIndex = termIndex -> { }; + private CheckedBiConsumer updateOmCommitIndex = (m, n) -> { }; private boolean isTracingEnabled = false; private int maxUnFlushedTransactionCount = 0; private FlushNotifier flushNotifier; @@ -116,6 +124,11 @@ Builder setUpdateLastAppliedIndex(Consumer updateLastAppliedIndex) { return this; } + Builder setUpdateOmCommitIndex(CheckedBiConsumer updateOmCommitIndex) { + this.updateOmCommitIndex = updateOmCommitIndex; + return this; + } + public Builder enableTracing(boolean enableTracing) { this.isTracingEnabled = enableTracing; return this; @@ -177,6 +190,7 @@ static Semaphore newSemaphore(int permits) { private final OMMetadataManager omMetadataManager; private final Consumer updateLastAppliedIndex; + private final CheckedBiConsumer updateOmCommitIndex; private final S3SecretManager s3SecretManager; @@ -196,6 +210,7 @@ private OzoneManagerDoubleBuffer(Builder b) { this.omMetadataManager = b.omMetadataManager; this.s3SecretManager = b.s3SecretManager; this.updateLastAppliedIndex = b.updateLastAppliedIndex; + this.updateOmCommitIndex = b.updateOmCommitIndex; this.flushNotifier = b.flushNotifier; this.unFlushedTransactions = newSemaphore(b.maxUnFlushedTransactionCount); @@ -330,6 +345,7 @@ private void flushBatch(Queue buffer) throws IOException { .map(Entry::getTermIndex) .sorted() .collect(Collectors.toList()); + final long index = buffer.stream().mapToLong(Entry::getIndex).max().orElse(0); final int flushedTransactionsSize = flushedTransactions.size(); final TermIndex lastTransaction = flushedTransactions.get(flushedTransactionsSize - 1); @@ -347,6 +363,8 @@ private void flushBatch(Queue buffer) throws IOException { () -> omMetadataManager.getTransactionInfoTable().putWithBatch( batchOperation, TRANSACTION_INFO_KEY, TransactionInfo.valueOf(lastTransaction))); + updateOmCommitIndex.accept(batchOperation, index); + long startTime = Time.monotonicNow(); flushBatchWithTrace(lastTraceId, 
buffer.size(), () -> omMetadataManager.getStore() @@ -458,7 +476,7 @@ private void addCleanupEntry(Entry entry, Map> cleanupEpochs) } for (String table : cleanupTables) { cleanupEpochs.computeIfAbsent(table, list -> new ArrayList<>()) - .add(entry.getTermIndex().getIndex()); + .add(entry.getIndex()); } } else { // This is to catch early errors, when a new response class missed to @@ -527,7 +545,14 @@ private void terminate(Throwable t, int status, OMResponse omResponse) { * Add OmResponseBufferEntry to buffer. */ public synchronized void add(OMClientResponse response, TermIndex termIndex) { - currentBuffer.add(new Entry(termIndex, response)); + add(response, termIndex, termIndex.getIndex()); + } + + /** + * Add OmResponseBufferEntry to buffer. + */ + public synchronized void add(OMClientResponse response, TermIndex termIndex, long index) { + currentBuffer.add(new Entry(termIndex, response, index)); notify(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 0394ee08c77f..118d8092473f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManagerPrepareState; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.execution.IndexManager; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.lock.OMLockDetails; @@ -100,6 +101,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private volatile TermIndex lastNotifiedTermIndex = TermIndex.valueOf(0, 
RaftLog.INVALID_LOG_INDEX); /** The last index skipped by {@link #notifyTermIndexUpdated(long, long)}. */ private volatile long lastSkippedIndex = RaftLog.INVALID_LOG_INDEX; + private final IndexManager indexManager; private final NettyMetrics nettyMetrics; @@ -107,6 +109,7 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer, boolean isTracingEnabled) throws IOException { this.isTracingEnabled = isTracingEnabled; this.ozoneManager = ratisServer.getOzoneManager(); + this.indexManager = new IndexManager(ozoneManager); loadSnapshotInfoFromDB(); this.threadPrefix = ozoneManager.getThreadNamePrefix(); @@ -147,6 +150,7 @@ public synchronized void reinitialize() throws IOException { unpause(lastApplied.getIndex(), lastApplied.getTerm()); LOG.info("{}: reinitialize {} with {}", getId(), getGroupId(), lastApplied); } + indexManager.initialize(); } @Override @@ -162,6 +166,10 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, // Initialize OMHAMetrics ozoneManager.omHAMetricsInit(newLeaderId.toString()); LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId); + // if the node is leader (can be ready or not ready, need update index) + if (ozoneManager.getOmRatisServer().checkLeaderStatus() != OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER) { + indexManager.onLeaderChange(); + } } /** Notified by Ratis for non-StateMachine term-index update. 
*/ @@ -459,6 +467,7 @@ public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() { return OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(ozoneManager.getMetadataManager()) .setUpdateLastAppliedIndex(this::updateLastAppliedTermIndex) + .setUpdateOmCommitIndex(indexManager::saveIndex) .setMaxUnFlushedTransactionCount(maxUnFlushedTransactionCount) .setThreadPrefix(threadPrefix) .setS3SecretManager(ozoneManager.getS3SecretManager()) @@ -553,8 +562,8 @@ public void close() { * @return response from OM */ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { + ExecutionContext context = ExecutionContext.of(getIndex(request, termIndex), termIndex); try { - ExecutionContext context = ExecutionContext.of(termIndex.getIndex(), termIndex); final OMClientResponse omClientResponse = handler.handleWriteRequest( request, context, ozoneManagerDoubleBuffer); OMLockDetails omLockDetails = omClientResponse.getOmLockDetails(); @@ -567,7 +576,7 @@ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { } } catch (IOException e) { LOG.warn("Failed to write, Exception occurred ", e); - return createErrorResponse(request, e, termIndex); + return createErrorResponse(request, e, context); } catch (Throwable e) { // For any Runtime exceptions, terminate OM. 
String errorMessage = "Request " + request + " failed with exception"; @@ -576,8 +585,19 @@ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { return null; } + private static long getIndex(OMRequest request, TermIndex termIndex) { + long currIndex = termIndex.getIndex(); + if (request.hasExecutionControlRequest()) { + if (request.getExecutionControlRequest().getIndex() != -1) { + // -1 case can happen before upgrade finalize feature for index manager + currIndex = request.getExecutionControlRequest().getIndex(); + } + } + return currIndex; + } + private OMResponse createErrorResponse( - OMRequest omRequest, IOException exception, TermIndex termIndex) { + OMRequest omRequest, IOException exception, ExecutionContext context) { OMResponse.Builder omResponseBuilder = OMResponse.newBuilder() .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) .setCmdType(omRequest.getCmdType()) @@ -588,7 +608,7 @@ private OMResponse createErrorResponse( } OMResponse omResponse = omResponseBuilder.build(); OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse); - ozoneManagerDoubleBuffer.add(omClientResponse, termIndex); + ozoneManagerDoubleBuffer.add(omClientResponse, context.getTermIndex(), context.getIndex()); return omResponse; } @@ -657,4 +677,12 @@ public void awaitDoubleBufferFlush() throws InterruptedException { public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() { return ozoneManagerDoubleBuffer; } + + public void finalizeIndexFeature() throws IOException { + indexManager.finalizeFeature(); + } + + public IndexManager getIndexManager() { + return indexManager; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 5548be7bd8ba..c8e81760a9bf 100644 --- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -515,7 +515,7 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf, public static OzoneManagerProtocolProtos.OMResponse submitRequest( OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException { - return om.getOmRatisServer().submitRequest(omRequest, clientId, callId); + return om.getOmExecutionFlow().submitInternal(omRequest, clientId, callId); } public static OzoneManagerProtocolProtos.OMResponse createErrorResponse( diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java index 05bb957f22cc..864fcc02b09e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java @@ -65,7 +65,7 @@ public OMPrepareRequest(OMRequest omRequest) { @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { - final long transactionLogIndex = context.getIndex(); + final long transactionLogIndex = context.getTermIndex().getIndex(); LOG.info("OM {} Received prepare request with log {}", ozoneManager.getOMNodeId(), context.getTermIndex()); @@ -102,7 +102,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut // the snapshot index in the prepared state. 
OzoneManagerDoubleBuffer doubleBuffer = ozoneManager.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer(); - doubleBuffer.add(response, context.getTermIndex()); + doubleBuffer.add(response, context.getTermIndex(), context.getIndex()); OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer(); final RaftServer.Division division = omRatisServer.getServerDivision(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java index 7deeef51161c..e71fce15a5e4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java @@ -45,7 +45,8 @@ public enum OMLayoutFeature implements LayoutFeature { QUOTA(6, "Ozone quota re-calculate"), HBASE_SUPPORT(7, "Full support of hsync, lease recovery and listOpenFiles APIs for HBase"), - DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"); + DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"), + MANAGED_INDEX(9, "Make use of OM managed index"); /////////////////////////////// ///////////////////////////// // Example OM Layout Feature with Actions diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java new file mode 100644 index 000000000000..dd6f1767bd51 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.upgrade; + +import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MANAGED_INDEX; +import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE; + +import org.apache.hadoop.ozone.om.OzoneManager; + +/** + * initialize om managed index manager to provide index for further request handling. 
+ */ +@UpgradeActionOm(type = ON_FINALIZE, feature = MANAGED_INDEX) +public class OmManagedIndexUpgradeAction implements OmUpgradeAction { + @Override + public void execute(OzoneManager arg) throws Exception { + // Prepare ensures the DB and Ratis have reached a checkpoint where all changes are flushed + // and no further operation is allowed. + // At this point, IndexManager finalization re-initializes the index from the Ratis index; this is the starting + // point for index generation for subsequent requests, as used by object and other operations. + arg.getOmRatisServer().getOmStateMachine().finalizeIndexFeature(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 67190972ce23..291d93ea0390 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -418,11 +418,10 @@ public OMClientResponse handleWriteRequestImpl(OMRequest omRequest, ExecutionCon impl.getPerfMetrics().getValidateAndUpdateCacheLatencyNs(), () -> Objects.requireNonNull(omClientRequest.validateAndUpdateCache(getOzoneManager(), context), "omClientResponse returned by validateAndUpdateCache cannot be null")); - OMAuditLogger.log(omClientRequest.getAuditBuilder(), context.getTermIndex()); + OMAuditLogger.log(omClientRequest.getAuditBuilder(), context); return omClientResponse; } catch (Throwable th) { - OMAuditLogger.log(omClientRequest.getAuditBuilder(), omClientRequest, getOzoneManager(), context.getTermIndex(), - th); + OMAuditLogger.log(omClientRequest.getAuditBuilder(), omClientRequest, getOzoneManager(), context, th); throw th; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java index 07c154c878e1..45a4b8da2748 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java @@ -61,7 +61,7 @@ default OMClientResponse handleWriteRequest(OMRequest omRequest, ExecutionContex OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) throws IOException { final OMClientResponse response = handleWriteRequestImpl(omRequest, context); if (omRequest.getCmdType() != Type.Prepare) { - ozoneManagerDoubleBuffer.add(response, context.getTermIndex()); + ozoneManagerDoubleBuffer.add(response, context.getTermIndex(), context.getIndex()); } return response; } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexManager.java new file mode 100644 index 000000000000..974ee07ec051 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexManager.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.execution;
+
+import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MANAGED_INDEX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests {@link IndexManager} index handling when MANAGED_INDEX is allowed at startup and when finalized later.
+ */
+public class TestIndexManager {
+  @Test
+  public void testIndexManager() throws Exception {
+    OzoneManager om = mock(OzoneManager.class);
+    OMMetadataManager metaManager = mock(OMMetadataManager.class);
+    when(om.getMetadataManager()).thenReturn(metaManager);
+    OMLayoutVersionManager verManager = mock(OMLayoutVersionManager.class);
+    when(verManager.isAllowed(Mockito.eq(MANAGED_INDEX))).thenReturn(true);
+    when(om.getVersionManager()).thenReturn(verManager);
+    Table txInfoTable = mock(Table.class);
+    when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable);
+    TransactionInfo txInfo = TransactionInfo.valueOf(-1, 100);
+    when(txInfoTable.get(anyString())).thenReturn(txInfo);
+    IndexManager indexManager = new IndexManager(om);
+    assertEquals(101, indexManager.nextIndex());
+
+    // persist index 102, then simulate a leader change: numbering must resume after the saved index
+    BatchOperation batchOpr = mock(BatchOperation.class);
+    indexManager.saveIndex(batchOpr, 102);
+    indexManager.onLeaderChange();
+    assertEquals(103, indexManager.nextIndex());
+  }
+
+  @Test
+  public void testUpgradeIndexManager() throws Exception {
+    OzoneManager om = mock(OzoneManager.class);
+    OMMetadataManager metaManager = mock(OMMetadataManager.class);
+    when(om.getMetadataManager()).thenReturn(metaManager);
+    OMLayoutVersionManager verManager = mock(OMLayoutVersionManager.class);
+    when(om.getVersionManager()).thenReturn(verManager);
+    when(verManager.needsFinalization()).thenReturn(true);
+    Table txInfoTable = mock(Table.class);
+    when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable);
+    DBStore dbstore = mock(DBStore.class);
+    when(metaManager.getStore()).thenReturn(dbstore);
+    BatchOperation batchOpr = mock(BatchOperation.class);
+    when(dbstore.initBatchOperation()).thenReturn(batchOpr);
+    TransactionInfo txInfo = TransactionInfo.valueOf(-1, 110);
+    when(txInfoTable.get(anyString())).thenReturn(null).thenReturn(txInfo);
+    when(txInfoTable.getSkipCache(anyString())).thenReturn(txInfo);
+    IndexManager indexManager = new IndexManager(om);
+    // MANAGED_INDEX is not stubbed as allowed (Mockito default: false), so the manager hands out -1
+    assertEquals(-1, indexManager.nextIndex());
+
+    indexManager.saveIndex(batchOpr, 114);
+    indexManager.onLeaderChange();
+    assertEquals(-1, indexManager.nextIndex());
+
+    // while disabled (-1), the context's index is expected to equal the index of the given term index
+    ExecutionContext executionContext = ExecutionContext.of(indexManager.nextIndex(), txInfo.getTermIndex());
+    assertEquals(txInfo.getTermIndex().getIndex(), executionContext.getIndex());
+
+    // after finalization the test expects numbering to continue from the stubbed transaction info (110)
+    indexManager.finalizeFeature();
+    assertEquals(111, indexManager.nextIndex());
+
+    // persist index 114, then simulate a leader change: numbering must resume after the saved index
+    indexManager.saveIndex(batchOpr, 114);
+    indexManager.onLeaderChange();
+    assertEquals(115, indexManager.nextIndex());
+
+    executionContext = ExecutionContext.of(indexManager.nextIndex(), txInfo.getTermIndex());
+    assertEquals(116, executionContext.getIndex());
+  }
+}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java index 3fa1e3a1a0af..a4947ef958bd 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java @@ -59,6 +59,7 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse { private OzoneManagerDoubleBuffer doubleBuffer; private final AtomicLong trxId = new AtomicLong(0); private long term = 1L; + private AtomicLong currentCommitIndex = new AtomicLong(); @TempDir private Path folder; @@ -72,6 +73,7 @@ public void setup() throws IOException { doubleBuffer = OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(omMetadataManager) .setMaxUnFlushedTransactionCount(10000) + .setUpdateOmCommitIndex((x, y) -> currentCommitIndex.set(y)) .build() .start(); } @@ -127,6 +129,7 @@ public void testDoubleBufferWithDummyResponse() throws Exception { assertNotNull(transactionInfo); assertEquals(bucketCount, transactionInfo.getTransactionIndex()); assertEquals(term, transactionInfo.getTerm()); + assertEquals(currentCommitIndex.get(), transactionInfo.getTermIndex().getIndex()); } /** diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java index 7c2bd2aa62c7..13307102a444 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java @@ -45,6 +45,7 @@ import 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OMNodeDetails; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.security.OMCertificateClient; @@ -118,6 +119,8 @@ public void init(@TempDir Path metaDirPath) throws Exception { when(ozoneManager.getConfiguration()).thenReturn(conf); final OmConfig omConfig = conf.getObject(OmConfig.class); when(ozoneManager.getConfig()).thenReturn(omConfig); + OMLayoutVersionManager versionManger = mock(OMLayoutVersionManager.class); + when(ozoneManager.getVersionManager()).thenReturn(versionManger); secConfig = new SecurityConfig(conf); HddsProtos.OzoneManagerDetailsProto omInfo = OzoneManager.getOmDetailsProto(conf, omID); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java index 797f6aa2c3e3..32dc4f400bda 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.om.OzoneManagerPrepareState; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; @@ -97,6 +98,8 @@ public void 
setup() throws Exception { when(ozoneManager.getAuditLogger()).thenReturn(auditLogger); prepareState = new OzoneManagerPrepareState(conf); when(ozoneManager.getPrepareState()).thenReturn(prepareState); + OMLayoutVersionManager versionManager = mock(OMLayoutVersionManager.class); + when(ozoneManager.getVersionManager()).thenReturn(versionManager); when(ozoneManagerRatisServer.getOzoneManager()).thenReturn(ozoneManager); when(ozoneManager.getTransactionInfo()).thenReturn(mock(TransactionInfo.class));