From 68e94eab7df35cda0496ca6a4709752e497d90e1 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Sun, 19 Jan 2025 19:06:47 +0530 Subject: [PATCH 01/11] HDDS-11901. use om managed index for handling request --- .../ozone/om/execution/IndexGenerator.java | 74 +++++++++++++++++++ .../om/execution/common/BiConsumerX.java | 32 ++++++++ .../om/execution/common/package-info.java | 22 ++++++ .../ozone/om/helpers/OMAuditLogger.java | 10 +-- .../om/ratis/OzoneManagerDoubleBuffer.java | 35 ++++++++- .../om/ratis/OzoneManagerStateMachine.java | 18 ++++- .../om/request/upgrade/OMPrepareRequest.java | 2 +- .../OzoneManagerRequestHandler.java | 5 +- .../ozone/protocolPB/RequestHandler.java | 2 +- 9 files changed, 184 insertions(+), 16 deletions(-) create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java new file mode 100644 index 000000000000..0cfbd22c9143 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.execution; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.utils.TransactionInfo; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.ozone.om.OzoneManager; + +/** + * Context required for execution of a request. + */ +public final class IndexGenerator { + public static final String OM_INDEX_KEY = "#OMINDEX"; + + private final AtomicLong index = new AtomicLong(); + private final AtomicLong commitIndex = new AtomicLong(); + private final OzoneManager ozoneManager; + + public IndexGenerator(OzoneManager ozoneManager) throws IOException { + this.ozoneManager = ozoneManager; + initialize(); + } + + public void initialize() throws IOException { + // default first time starts with "0" + long initIndex = 0; + // retrieve last saved index + TransactionInfo transactionInfo = ozoneManager.getMetadataManager().getTransactionInfoTable().get(OM_INDEX_KEY); + if (null == transactionInfo) { + // use ratis transaction for first time upgrade + transactionInfo = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager()); + } + if (null != transactionInfo) { + initIndex = transactionInfo.getTransactionIndex(); + } + index.set(initIndex); + commitIndex.set(initIndex); + } + + public long nextIndex() { + return index.incrementAndGet(); + } + + public void changeLeader() { + index.set(Math.max(commitIndex.get(), index.get())); + } + + public synchronized void saveIndex(BatchOperation batchOperation, long idx) throws IOException { + if (idx <= commitIndex.get()) { + return; + } + + ozoneManager.getMetadataManager().getTransactionInfoTable().putWithBatch(batchOperation, OM_INDEX_KEY, + TransactionInfo.valueOf(-1, idx)); + commitIndex.set(idx); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java new file mode 100644 index 000000000000..3a4a085e5035 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.execution.common; + +import java.io.IOException; + +@FunctionalInterface +public interface BiConsumerX { + /** + * Performs this operation on the given arguments. + * + * @param t the first input argument. + * @param u the second input argument. + * @throws java.io.IOException if handling have IOException + */ + void accept(T t, U u) throws IOException; +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java new file mode 100644 index 000000000000..9b7457df15d2 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.execution.common; + +/** + * This package contains classes for the common code related to execution. + */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java index 80c20f7af6dc..19cbccb233ee 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/helpers/OMAuditLogger.java @@ -27,9 +27,9 @@ import org.apache.hadoop.ozone.audit.AuditMessage; import org.apache.hadoop.ozone.audit.OMAction; import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; -import org.apache.ratis.server.protocol.TermIndex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,12 +115,12 @@ private static OMAction getAction(OzoneManagerProtocolProtos.OMRequest request) return omAction; } - public static void log(OMAuditLogger.Builder builder, TermIndex termIndex) { + public static void log(OMAuditLogger.Builder builder, ExecutionContext context) { if (builder.isLog.get()) { if (null == builder.getAuditMap()) { builder.setAuditMap(new HashMap<>()); } - builder.getAuditMap().put("Transaction", String.valueOf(termIndex.getIndex())); + builder.getAuditMap().put("Transaction", context.getTermIndex().getIndex() + "::" + context.getIndex()); builder.getMessageBuilder().withParams(builder.getAuditMap()); builder.getAuditLogger().logWrite(builder.getMessageBuilder().build()); } @@ -134,7 +134,7 @@ public static void log(OMAuditLogger.Builder builder) { } public static void log(OMAuditLogger.Builder builder, OMClientRequest request, OzoneManager om, - TermIndex termIndex, Throwable th) { + ExecutionContext context, Throwable th) { if (builder.isLog.get()) { builder.getAuditLogger().logWrite(builder.getMessageBuilder().build()); return; @@ -150,7 +150,7 @@ public static void log(OMAuditLogger.Builder builder, OMClientRequest request, O } try { builder.getAuditMap().put("Command", request.getOmRequest().getCmdType().name()); - builder.getAuditMap().put("Transaction", String.valueOf(termIndex.getIndex())); + builder.getAuditMap().put("Transaction", context.getTermIndex().getIndex() + "::" + context.getIndex()); request.buildAuditMessage(action, builder.getAuditMap(), th, request.getUserInfo()); builder.setLog(true); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index c7f17e16bb33..c67f235e6e4b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -45,6 +46,7 @@ import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.S3SecretManager; import org.apache.hadoop.ozone.om.codec.OMDBDefinition; +import org.apache.hadoop.ozone.om.execution.common.BiConsumerX; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; @@ -77,10 +79,12 @@ public final class OzoneManagerDoubleBuffer { private static class Entry { private final TermIndex termIndex; private final OMClientResponse response; + private final long index; - Entry(TermIndex termIndex, OMClientResponse response) { + Entry(TermIndex termIndex, OMClientResponse response, long index) { this.termIndex = termIndex; this.response = response; + this.index = index; } TermIndex getTermIndex() { @@ -90,6 +94,10 @@ TermIndex getTermIndex() { OMClientResponse getResponse() { return response; } + + long getIndex() { + return index; + } } /** @@ -98,6 +106,7 @@ OMClientResponse getResponse() { public static final class Builder { private OMMetadataManager omMetadataManager; private Consumer updateLastAppliedIndex = termIndex -> { }; + private BiConsumerX updateOmCommitIndex = (m, n) -> { }; private boolean isTracingEnabled = false; private int maxUnFlushedTransactionCount = 0; private FlushNotifier flushNotifier; @@ -116,6 +125,11 @@ Builder setUpdateLastAppliedIndex(Consumer updateLastAppliedIndex) { return this; } + Builder setUpdateOmCommitIndex(BiConsumerX updateOmCommitIndex) { + this.updateOmCommitIndex = updateOmCommitIndex; + return this; + } + public Builder enableTracing(boolean enableTracing) { this.isTracingEnabled = enableTracing; return this; @@ -177,6 +191,7 @@ static Semaphore newSemaphore(int permits) { private final OMMetadataManager omMetadataManager; private final Consumer updateLastAppliedIndex; + private final BiConsumerX updateOmCommitIndex; private final S3SecretManager s3SecretManager; @@ -196,6 +211,7 @@ private OzoneManagerDoubleBuffer(Builder b) { this.omMetadataManager = b.omMetadataManager; this.s3SecretManager = b.s3SecretManager; this.updateLastAppliedIndex = b.updateLastAppliedIndex; + this.updateOmCommitIndex = b.updateOmCommitIndex; this.flushNotifier = b.flushNotifier; this.unFlushedTransactions = newSemaphore(b.maxUnFlushedTransactionCount); @@ -330,6 +346,8 @@ private void flushBatch(Queue buffer) throws IOException { .map(Entry::getTermIndex) .sorted() .collect(Collectors.toList()); + final long index = buffer.stream().max(Comparator.comparing(Entry::getIndex)).orElse(new Entry(null, null, 0)) + .getIndex(); final int flushedTransactionsSize = flushedTransactions.size(); final TermIndex lastTransaction = flushedTransactions.get(flushedTransactionsSize - 1); @@ -347,6 +365,12 @@ private void flushBatch(Queue buffer) throws IOException { () -> omMetadataManager.getTransactionInfoTable().putWithBatch( batchOperation, TRANSACTION_INFO_KEY, TransactionInfo.valueOf(lastTransaction))); + updateOmCommitIndex.accept(batchOperation, index); + //addToBatchTransactionInfoWithTrace(lastTraceId, + // lastTransaction.getIndex(), + // () -> omMetadataManager.getTransactionInfoTable().putWithBatch( + // batchOperation, IndexGenerator.OM_INDEX_KEY, TransactionInfo.valueOf(-1, index))); + long startTime = Time.monotonicNow(); flushBatchWithTrace(lastTraceId, buffer.size(), () -> omMetadataManager.getStore() @@ -527,7 +551,14 @@ private void terminate(Throwable t, int status, OMResponse omResponse) { * Add OmResponseBufferEntry to buffer. */ public synchronized void add(OMClientResponse response, TermIndex termIndex) { - currentBuffer.add(new Entry(termIndex, response)); + add(response, termIndex, termIndex.getIndex()); + } + + /** + * Add OmResponseBufferEntry to buffer. + */ + public synchronized void add(OMClientResponse response, TermIndex termIndex, long index) { + currentBuffer.add(new Entry(termIndex, response, index)); notify(); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 0394ee08c77f..ef178ea0ba3a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManagerPrepareState; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.execution.IndexGenerator; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.lock.OMLockDetails; @@ -100,6 +101,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private volatile TermIndex lastNotifiedTermIndex = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX); /** The last index skipped by {@link #notifyTermIndexUpdated(long, long)}. */ private volatile long lastSkippedIndex = RaftLog.INVALID_LOG_INDEX; + private final IndexGenerator indexGenerator; private final NettyMetrics nettyMetrics; @@ -107,6 +109,7 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer, boolean isTracingEnabled) throws IOException { this.isTracingEnabled = isTracingEnabled; this.ozoneManager = ratisServer.getOzoneManager(); + this.indexGenerator = new IndexGenerator(ozoneManager); loadSnapshotInfoFromDB(); this.threadPrefix = ozoneManager.getThreadNamePrefix(); @@ -137,6 +140,7 @@ public void initialize(RaftServer server, RaftGroupId id, storage.init(raftStorage); LOG.info("{}: initialize {} with {}", getId(), id, getLastAppliedTermIndex()); }); + indexGenerator.initialize(); } @Override @@ -147,6 +151,7 @@ public synchronized void reinitialize() throws IOException { unpause(lastApplied.getIndex(), lastApplied.getTerm()); LOG.info("{}: reinitialize {} with {}", getId(), getGroupId(), lastApplied); } + indexGenerator.initialize(); } @Override @@ -162,6 +167,10 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, // Initialize OMHAMetrics ozoneManager.omHAMetricsInit(newLeaderId.toString()); LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId); + // if the node is leader (can be ready or not ready, need update index) + if (ozoneManager.getOmRatisServer().checkLeaderStatus() != OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER) { + indexGenerator.changeLeader(); + } } /** Notified by Ratis for non-StateMachine term-index update. */ @@ -459,6 +468,7 @@ public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() { return OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(ozoneManager.getMetadataManager()) .setUpdateLastAppliedIndex(this::updateLastAppliedTermIndex) + .setUpdateOmCommitIndex(indexGenerator::saveIndex) .setMaxUnFlushedTransactionCount(maxUnFlushedTransactionCount) .setThreadPrefix(threadPrefix) .setS3SecretManager(ozoneManager.getS3SecretManager()) @@ -553,8 +563,8 @@ public void close() { * @return response from OM */ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { + ExecutionContext context = ExecutionContext.of(indexGenerator.nextIndex(), termIndex); try { - ExecutionContext context = ExecutionContext.of(termIndex.getIndex(), termIndex); final OMClientResponse omClientResponse = handler.handleWriteRequest( request, context, ozoneManagerDoubleBuffer); OMLockDetails omLockDetails = omClientResponse.getOmLockDetails(); @@ -567,7 +577,7 @@ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { } } catch (IOException e) { LOG.warn("Failed to write, Exception occurred ", e); - return createErrorResponse(request, e, termIndex); + return createErrorResponse(request, e, context); } catch (Throwable e) { // For any Runtime exceptions, terminate OM. String errorMessage = "Request " + request + " failed with exception"; @@ -577,7 +587,7 @@ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { } private OMResponse createErrorResponse( - OMRequest omRequest, IOException exception, TermIndex termIndex) { + OMRequest omRequest, IOException exception, ExecutionContext context) { OMResponse.Builder omResponseBuilder = OMResponse.newBuilder() .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) .setCmdType(omRequest.getCmdType()) @@ -588,7 +598,7 @@ private OMResponse createErrorResponse( } OMResponse omResponse = omResponseBuilder.build(); OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse); - ozoneManagerDoubleBuffer.add(omClientResponse, termIndex); + ozoneManagerDoubleBuffer.add(omClientResponse, context.getTermIndex(), context.getIndex()); return omResponse; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java index 05bb957f22cc..45f87ecbdc10 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java @@ -102,7 +102,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut // the snapshot index in the prepared state. OzoneManagerDoubleBuffer doubleBuffer = ozoneManager.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer(); - doubleBuffer.add(response, context.getTermIndex()); + doubleBuffer.add(response, context.getTermIndex(), context.getIndex()); OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer(); final RaftServer.Division division = omRatisServer.getServerDivision(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 67190972ce23..291d93ea0390 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -418,11 +418,10 @@ public OMClientResponse handleWriteRequestImpl(OMRequest omRequest, ExecutionCon impl.getPerfMetrics().getValidateAndUpdateCacheLatencyNs(), () -> Objects.requireNonNull(omClientRequest.validateAndUpdateCache(getOzoneManager(), context), "omClientResponse returned by validateAndUpdateCache cannot be null")); - OMAuditLogger.log(omClientRequest.getAuditBuilder(), context.getTermIndex()); + OMAuditLogger.log(omClientRequest.getAuditBuilder(), context); return omClientResponse; } catch (Throwable th) { - OMAuditLogger.log(omClientRequest.getAuditBuilder(), omClientRequest, getOzoneManager(), context.getTermIndex(), - th); + OMAuditLogger.log(omClientRequest.getAuditBuilder(), omClientRequest, getOzoneManager(), context, th); throw th; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java index 07c154c878e1..45a4b8da2748 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java @@ -61,7 +61,7 @@ default OMClientResponse handleWriteRequest(OMRequest omRequest, ExecutionContex OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) throws IOException { final OMClientResponse response = handleWriteRequestImpl(omRequest, context); if (omRequest.getCmdType() != Type.Prepare) { - ozoneManagerDoubleBuffer.add(response, context.getTermIndex()); + ozoneManagerDoubleBuffer.add(response, context.getTermIndex(), context.getIndex()); } return response; } From 540f0c6f1777d10c83f1b40786be7b50683afde9 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 22 Jan 2025 12:43:09 +0530 Subject: [PATCH 02/11] support upgrade handling for index change --- .../ozone/om/execution/IndexGenerator.java | 22 ++++++++++ .../flowcontrol/ExecutionContext.java | 7 +++- .../om/ratis/OzoneManagerStateMachine.java | 5 +++ .../ozone/om/upgrade/OMLayoutFeature.java | 3 +- .../upgrade/OmManagedIndexUpgradeAction.java | 40 +++++++++++++++++++ 5 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java index 0cfbd22c9143..9da9fe469d71 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.execution; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.BatchOperation; @@ -32,6 +33,7 @@ public final class IndexGenerator { private final AtomicLong index = new AtomicLong(); private final AtomicLong commitIndex = new AtomicLong(); private final OzoneManager ozoneManager; + private final AtomicBoolean enabled = new AtomicBoolean(true); public IndexGenerator(OzoneManager ozoneManager) throws IOException { this.ozoneManager = ozoneManager; @@ -52,9 +54,26 @@ public void initialize() throws IOException { } index.set(initIndex); commitIndex.set(initIndex); + if (ozoneManager.getVersionManager().needsFinalization()) { + enabled.set(false); + } + } + + public void finalizeIndexGeneratorFeature() throws IOException { + enabled.set(true); + long initIndex = 0; + TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager()); + if (null != transactionInfo) { + initIndex = transactionInfo.getTransactionIndex(); + } + index.set(initIndex); + commitIndex.set(initIndex); } public long nextIndex() { + if (!enabled.get()) { + return -1; + } return index.incrementAndGet(); } @@ -63,6 +82,9 @@ public void changeLeader() { } public synchronized void saveIndex(BatchOperation batchOperation, long idx) throws IOException { + if (!enabled.get()) { + return; + } if (idx <= commitIndex.get()) { return; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java index d0dd65b006a9..38d53c811604 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java @@ -27,10 +27,15 @@ public final class ExecutionContext { private final TermIndex termIndex; private ExecutionContext(long index, TermIndex termIndex) { - this.index = index; if (null == termIndex) { termIndex = TermIndex.valueOf(-1, index); } + if (index == -1) { + // during upgrade case before finalization, make use of termIndex's index + // till finalization, index generator will return -1 + index = termIndex.getIndex(); + } + this.index = index; this.termIndex = termIndex; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index ef178ea0ba3a..14531308e008 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.response.DummyOMClientResponse; import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -667,4 +668,8 @@ public void awaitDoubleBufferFlush() throws InterruptedException { public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() { return ozoneManagerDoubleBuffer; } + + public void finalizeIndexGenerator() throws IOException { + indexGenerator.finalizeIndexGeneratorFeature(); + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java index 7deeef51161c..e71fce15a5e4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java @@ -45,7 +45,8 @@ public enum OMLayoutFeature implements LayoutFeature { QUOTA(6, "Ozone quota re-calculate"), HBASE_SUPPORT(7, "Full support of hsync, lease recovery and listOpenFiles APIs for HBase"), - DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"); + DELEGATION_TOKEN_SYMMETRIC_SIGN(8, "Delegation token signed by symmetric key"), + MANAGED_INDEX(9, "Make use of OM managed index"); /////////////////////////////// ///////////////////////////// // Example OM Layout Feature with Actions diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java new file mode 100644 index 000000000000..c5561808d5be --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.ozone.om.upgrade; + +import org.apache.hadoop.ozone.om.OzoneManager; + +import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MANAGED_INDEX; +import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE; + +/** + * Quota repair for usages action to be triggered after upgrade. + */ +@UpgradeActionOm(type = ON_FINALIZE, feature = MANAGED_INDEX) +public class OmManagedIndexUpgradeAction implements OmUpgradeAction { + @Override + public void execute(OzoneManager arg) throws Exception { + // Prepare ensures the db and ratis at reached to a checkpoint where all changes are flushed + // And no further operation is allowed + // At this point, IndexGenerator finialize will re-init index from ratis index and this will be starting point + // for index generation for further request as used by object and other operation + arg.getOmRatisServer().getOmStateMachine().finalizeIndexGenerator(); + } +} From 91651e5d537b00679db13fa4619b8fb82c457fda Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 22 Jan 2025 15:22:15 +0530 Subject: [PATCH 03/11] fix checkstyle and add test case --- .../om/execution/common/BiConsumerX.java | 3 + .../om/ratis/OzoneManagerStateMachine.java | 1 - .../upgrade/OmManagedIndexUpgradeAction.java | 2 +- .../om/execution/TestIndexGenerator.java | 94 +++++++++++++++++++ ...eManagerDoubleBufferWithDummyResponse.java | 3 + 5 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java index 3a4a085e5035..79b02866a884 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java @@ -19,6 +19,9 @@ import java.io.IOException; +/** + * interface for BiConsumer with IOException being thrown. + */ @FunctionalInterface public interface BiConsumerX { /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 14531308e008..9b9bbb1f3df0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -45,7 +45,6 @@ import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.response.DummyOMClientResponse; import org.apache.hadoop.ozone.om.response.OMClientResponse; -import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java index c5561808d5be..e647f4826275 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java @@ -25,7 +25,7 @@ import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE; /** - * Quota repair for usages action to be triggered after upgrade. + * initialize om managed index generator to provide index for further request handling. */ @UpgradeActionOm(type = ON_FINALIZE, feature = MANAGED_INDEX) public class OmManagedIndexUpgradeAction implements OmUpgradeAction { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java new file mode 100644 index 000000000000..5633ef54f62a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.execution; + +import org.apache.hadoop.hdds.utils.TransactionInfo; +import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +/** + * Tests Index generator changes. + */ +public class TestIndexGenerator { + @Test + public void testIndexGenerator() throws Exception { + OzoneManager om = Mockito.mock(OzoneManager.class); + OMMetadataManager metaManager = Mockito.mock(OMMetadataManager.class); + Mockito.when(om.getMetadataManager()).thenReturn(metaManager); + OMLayoutVersionManager verManager = Mockito.mock(OMLayoutVersionManager.class); + Mockito.when(om.getVersionManager()).thenReturn(verManager); + Table txInfoTable = Mockito.mock(Table.class); + Mockito.when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); + TransactionInfo txInfo = TransactionInfo.valueOf(-1, 100); + Mockito.when(txInfoTable.get(Mockito.anyString())).thenReturn(txInfo); + IndexGenerator indexGenerator = new IndexGenerator(om); + Assertions.assertEquals(indexGenerator.nextIndex(), 101); + + // save index and verify after change leader, for next index + BatchOperation batchOpr = Mockito.mock(BatchOperation.class); + indexGenerator.saveIndex(batchOpr, 102); + indexGenerator.changeLeader(); + Assertions.assertEquals(indexGenerator.nextIndex(), 103); + } + + @Test + public void testUpgradeIndexGenerator() throws Exception { + OzoneManager om = Mockito.mock(OzoneManager.class); + OMMetadataManager metaManager = Mockito.mock(OMMetadataManager.class); + Mockito.when(om.getMetadataManager()).thenReturn(metaManager); + OMLayoutVersionManager verManager = Mockito.mock(OMLayoutVersionManager.class); + Mockito.when(om.getVersionManager()).thenReturn(verManager); + Mockito.when(verManager.needsFinalization()).thenReturn(true); + Table txInfoTable = Mockito.mock(Table.class); + Mockito.when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); + TransactionInfo txInfo = TransactionInfo.valueOf(-1, 110); + Mockito.when(txInfoTable.get(Mockito.anyString())).thenReturn(null).thenReturn(txInfo); + Mockito.when(txInfoTable.getSkipCache(Mockito.anyString())).thenReturn(txInfo); + IndexGenerator indexGenerator = new IndexGenerator(om); + Assertions.assertEquals(indexGenerator.nextIndex(), -1); + + BatchOperation batchOpr = Mockito.mock(BatchOperation.class); + indexGenerator.saveIndex(batchOpr, 114); + indexGenerator.changeLeader(); + Assertions.assertEquals(indexGenerator.nextIndex(), -1); + + // check ExecutionContext behavior + ExecutionContext executionContext = ExecutionContext.of(indexGenerator.nextIndex(), txInfo.getTermIndex()); + Assertions.assertEquals(executionContext.getIndex(), txInfo.getTermIndex().getIndex()); + + + // save index and verify after change leader, for next index + indexGenerator.finalizeIndexGeneratorFeature(); + Assertions.assertEquals(indexGenerator.nextIndex(), 111); + + // save index and verify after change leader, for next index + indexGenerator.saveIndex(batchOpr, 114); + indexGenerator.changeLeader(); + Assertions.assertEquals(indexGenerator.nextIndex(), 115); + + executionContext = ExecutionContext.of(indexGenerator.nextIndex(), txInfo.getTermIndex()); + Assertions.assertEquals(executionContext.getIndex(), 116); + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java index 3fa1e3a1a0af..a4947ef958bd 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java @@ -59,6 +59,7 @@ public class TestOzoneManagerDoubleBufferWithDummyResponse { private OzoneManagerDoubleBuffer doubleBuffer; private final AtomicLong trxId = new AtomicLong(0); private long term = 1L; + private AtomicLong currentCommitIndex = new AtomicLong(); @TempDir private Path folder; @@ -72,6 +73,7 @@ public void setup() throws IOException { doubleBuffer = OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(omMetadataManager) .setMaxUnFlushedTransactionCount(10000) + .setUpdateOmCommitIndex((x, y) -> currentCommitIndex.set(y)) .build() .start(); } @@ -127,6 +129,7 @@ public void testDoubleBufferWithDummyResponse() throws Exception { assertNotNull(transactionInfo); assertEquals(bucketCount, transactionInfo.getTransactionIndex()); assertEquals(term, transactionInfo.getTerm()); + assertEquals(currentCommitIndex.get(), transactionInfo.getTermIndex().getIndex()); } /** From 2eb709debdeb79a3497e60558eb3778cd9d96d0f Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 17 Feb 2025 15:52:57 +0530 Subject: [PATCH 04/11] fix test failure --- .../hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index c67f235e6e4b..9c2834d6c26b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -366,10 +366,6 @@ private void flushBatch(Queue buffer) throws IOException { batchOperation, TRANSACTION_INFO_KEY, TransactionInfo.valueOf(lastTransaction))); updateOmCommitIndex.accept(batchOperation, index); - //addToBatchTransactionInfoWithTrace(lastTraceId, - // lastTransaction.getIndex(), - // () -> omMetadataManager.getTransactionInfoTable().putWithBatch( - // batchOperation, IndexGenerator.OM_INDEX_KEY, TransactionInfo.valueOf(-1, index))); long startTime = Time.monotonicNow(); flushBatchWithTrace(lastTraceId, buffer.size(), @@ -482,7 +478,7 @@ private void addCleanupEntry(Entry entry, Map> cleanupEpochs) } for (String table : cleanupTables) { cleanupEpochs.computeIfAbsent(table, list -> new ArrayList<>()) - .add(entry.getTermIndex().getIndex()); + .add(entry.getIndex()); } } else { // This is to catch early errors, when a new response class missed to From b2d986534bb47257950fe33941b8fb11dad4d895 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 17 Feb 2025 16:18:18 +0530 Subject: [PATCH 05/11] test case failure fix --- .../hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java index 797f6aa2c3e3..32dc4f400bda 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.om.OzoneManagerPrepareState; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; @@ -97,6 +98,8 @@ public void setup() throws Exception { when(ozoneManager.getAuditLogger()).thenReturn(auditLogger); prepareState = new OzoneManagerPrepareState(conf); when(ozoneManager.getPrepareState()).thenReturn(prepareState); + OMLayoutVersionManager versionManager = mock(OMLayoutVersionManager.class); + when(ozoneManager.getVersionManager()).thenReturn(versionManager); when(ozoneManagerRatisServer.getOzoneManager()).thenReturn(ozoneManager); when(ozoneManager.getTransactionInfo()).thenReturn(mock(TransactionInfo.class)); From fb54f578614f817f14dbdd8062c22fc8a3a25164 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Mon, 17 Feb 2025 17:33:56 +0530 Subject: [PATCH 06/11] fix test case --- .../hadoop/ozone/om/request/upgrade/OMPrepareRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java index 45f87ecbdc10..864fcc02b09e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java @@ -65,7 +65,7 @@ public OMPrepareRequest(OMRequest omRequest) { @Override public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { - final long transactionLogIndex = context.getIndex(); + final long transactionLogIndex = context.getTermIndex().getIndex(); LOG.info("OM {} Received prepare request with log {}", ozoneManager.getOMNodeId(), context.getTermIndex()); From bcee077280ab91c4a6ae473d4f24eecf358c1b59 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 19 Feb 2025 11:32:42 +0530 Subject: [PATCH 07/11] fix test case --- .../hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java index 7c2bd2aa62c7..13307102a444 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java @@ -45,6 +45,7 @@ import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OMNodeDetails; +import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.security.OMCertificateClient; @@ -118,6 +119,8 @@ public void init(@TempDir Path metaDirPath) throws Exception { when(ozoneManager.getConfiguration()).thenReturn(conf); final OmConfig omConfig = conf.getObject(OmConfig.class); when(ozoneManager.getConfig()).thenReturn(omConfig); + OMLayoutVersionManager versionManger = mock(OMLayoutVersionManager.class); + when(ozoneManager.getVersionManager()).thenReturn(versionManger); secConfig = new SecurityConfig(conf); HddsProtos.OzoneManagerDetailsProto omInfo = OzoneManager.getOmDetailsProto(conf, omID); From 4b2916d66b3d88769ee89ee46676e7255464b201 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 19 Feb 2025 12:56:34 +0530 Subject: [PATCH 08/11] review comment fix --- .../ozone/om/execution/IndexGenerator.java | 28 +++--- .../om/execution/common/BiConsumerX.java | 35 -------- .../om/execution/common/package-info.java | 22 ----- .../om/ratis/OzoneManagerDoubleBuffer.java | 12 ++- .../om/ratis/OzoneManagerStateMachine.java | 2 +- .../upgrade/OmManagedIndexUpgradeAction.java | 26 +++--- .../om/execution/TestIndexGenerator.java | 87 ++++++++++--------- 7 files changed, 77 insertions(+), 135 deletions(-) delete mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java delete mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java index 9da9fe469d71..0df96ceaa21a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java @@ -1,20 +1,20 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.ozone.om.execution; import java.io.IOException; @@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; /** - * Context required for execution of a request. + * Manages indexes for request handling and persist. */ public final class IndexGenerator { public static final String OM_INDEX_KEY = "#OMINDEX"; @@ -60,7 +60,6 @@ public void initialize() throws IOException { } public void finalizeIndexGeneratorFeature() throws IOException { - enabled.set(true); long initIndex = 0; TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager()); if (null != transactionInfo) { @@ -68,6 +67,7 @@ public void finalizeIndexGeneratorFeature() throws IOException { } index.set(initIndex); commitIndex.set(initIndex); + enabled.set(true); } public long nextIndex() { @@ -77,7 +77,7 @@ public long nextIndex() { return index.incrementAndGet(); } - public void changeLeader() { + public void onLeaderChange() { index.set(Math.max(commitIndex.get(), index.get())); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java deleted file mode 100644 index 79b02866a884..000000000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/BiConsumerX.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.om.execution.common; - -import java.io.IOException; - -/** - * interface for BiConsumer with IOException being thrown. - */ -@FunctionalInterface -public interface BiConsumerX { - /** - * Performs this operation on the given arguments. - * - * @param t the first input argument. - * @param u the second input argument. - * @throws java.io.IOException if handling have IOException - */ - void accept(T t, U u) throws IOException; -} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java deleted file mode 100644 index 9b7457df15d2..000000000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/common/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.om.execution.common; - -/** - * This package contains classes for the common code related to execution. - */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 9c2834d6c26b..60a7552084bd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -46,7 +45,6 @@ import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.S3SecretManager; import org.apache.hadoop.ozone.om.codec.OMDBDefinition; -import org.apache.hadoop.ozone.om.execution.common.BiConsumerX; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; @@ -56,6 +54,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.ExitUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.function.CheckedBiConsumer; import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +105,7 @@ long getIndex() { public static final class Builder { private OMMetadataManager omMetadataManager; private Consumer updateLastAppliedIndex = termIndex -> { }; - private BiConsumerX updateOmCommitIndex = (m, n) -> { }; + private CheckedBiConsumer updateOmCommitIndex = (m, n) -> { }; private boolean isTracingEnabled = false; private int maxUnFlushedTransactionCount = 0; private FlushNotifier flushNotifier; @@ -125,7 +124,7 @@ Builder setUpdateLastAppliedIndex(Consumer updateLastAppliedIndex) { return this; } - Builder setUpdateOmCommitIndex(BiConsumerX updateOmCommitIndex) { + Builder setUpdateOmCommitIndex(CheckedBiConsumer updateOmCommitIndex) { this.updateOmCommitIndex = updateOmCommitIndex; return this; } @@ -191,7 +190,7 @@ static Semaphore newSemaphore(int permits) { private final OMMetadataManager omMetadataManager; private final Consumer updateLastAppliedIndex; - private final BiConsumerX updateOmCommitIndex; + private final CheckedBiConsumer updateOmCommitIndex; private final S3SecretManager s3SecretManager; @@ -346,8 +345,7 @@ private void flushBatch(Queue buffer) throws IOException { .map(Entry::getTermIndex) .sorted() .collect(Collectors.toList()); - final long index = buffer.stream().max(Comparator.comparing(Entry::getIndex)).orElse(new Entry(null, null, 0)) - .getIndex(); + final long index = buffer.stream().mapToLong(Entry::getIndex).max().orElse(0); final int flushedTransactionsSize = flushedTransactions.size(); final TermIndex lastTransaction = flushedTransactions.get(flushedTransactionsSize - 1); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 9b9bbb1f3df0..31096f607063 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -169,7 +169,7 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId); // if the node is leader (can be ready or not ready, need update index) if (ozoneManager.getOmRatisServer().checkLeaderStatus() != OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER) { - indexGenerator.changeLeader(); + indexGenerator.onLeaderChange(); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java index e647f4826275..cfad1bbfd276 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java @@ -1,20 +1,18 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.ozone.om.upgrade; diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java index 5633ef54f62a..085cd5b998cd 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java @@ -1,13 +1,12 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,8 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.ozone.om.execution; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.Table; @@ -24,9 +29,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; /** * Tests Index generator changes. @@ -34,61 +37,61 @@ public class TestIndexGenerator { @Test public void testIndexGenerator() throws Exception { - OzoneManager om = Mockito.mock(OzoneManager.class); - OMMetadataManager metaManager = Mockito.mock(OMMetadataManager.class); - Mockito.when(om.getMetadataManager()).thenReturn(metaManager); - OMLayoutVersionManager verManager = Mockito.mock(OMLayoutVersionManager.class); - Mockito.when(om.getVersionManager()).thenReturn(verManager); - Table txInfoTable = Mockito.mock(Table.class); - Mockito.when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); + OzoneManager om = mock(OzoneManager.class); + OMMetadataManager metaManager = mock(OMMetadataManager.class); + when(om.getMetadataManager()).thenReturn(metaManager); + OMLayoutVersionManager verManager = mock(OMLayoutVersionManager.class); + when(om.getVersionManager()).thenReturn(verManager); + Table txInfoTable = mock(Table.class); + when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); TransactionInfo txInfo = TransactionInfo.valueOf(-1, 100); - Mockito.when(txInfoTable.get(Mockito.anyString())).thenReturn(txInfo); + when(txInfoTable.get(anyString())).thenReturn(txInfo); IndexGenerator indexGenerator = new IndexGenerator(om); - Assertions.assertEquals(indexGenerator.nextIndex(), 101); + assertEquals(indexGenerator.nextIndex(), 101); // save index and verify after change leader, for next index - BatchOperation batchOpr = Mockito.mock(BatchOperation.class); + BatchOperation batchOpr = mock(BatchOperation.class); indexGenerator.saveIndex(batchOpr, 102); - indexGenerator.changeLeader(); - Assertions.assertEquals(indexGenerator.nextIndex(), 103); + indexGenerator.onLeaderChange(); + assertEquals(indexGenerator.nextIndex(), 103); } @Test public void testUpgradeIndexGenerator() throws Exception { - OzoneManager om = Mockito.mock(OzoneManager.class); - OMMetadataManager metaManager = Mockito.mock(OMMetadataManager.class); - Mockito.when(om.getMetadataManager()).thenReturn(metaManager); - OMLayoutVersionManager verManager = Mockito.mock(OMLayoutVersionManager.class); - Mockito.when(om.getVersionManager()).thenReturn(verManager); - Mockito.when(verManager.needsFinalization()).thenReturn(true); - Table txInfoTable = Mockito.mock(Table.class); - Mockito.when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); + OzoneManager om = mock(OzoneManager.class); + OMMetadataManager metaManager = mock(OMMetadataManager.class); + when(om.getMetadataManager()).thenReturn(metaManager); + OMLayoutVersionManager verManager = mock(OMLayoutVersionManager.class); + when(om.getVersionManager()).thenReturn(verManager); + when(verManager.needsFinalization()).thenReturn(true); + Table txInfoTable = mock(Table.class); + when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); TransactionInfo txInfo = TransactionInfo.valueOf(-1, 110); - Mockito.when(txInfoTable.get(Mockito.anyString())).thenReturn(null).thenReturn(txInfo); - Mockito.when(txInfoTable.getSkipCache(Mockito.anyString())).thenReturn(txInfo); + when(txInfoTable.get(anyString())).thenReturn(null).thenReturn(txInfo); + when(txInfoTable.getSkipCache(anyString())).thenReturn(txInfo); IndexGenerator indexGenerator = new IndexGenerator(om); - Assertions.assertEquals(indexGenerator.nextIndex(), -1); + assertEquals(indexGenerator.nextIndex(), -1); - BatchOperation batchOpr = Mockito.mock(BatchOperation.class); + BatchOperation batchOpr = mock(BatchOperation.class); indexGenerator.saveIndex(batchOpr, 114); - indexGenerator.changeLeader(); - Assertions.assertEquals(indexGenerator.nextIndex(), -1); + indexGenerator.onLeaderChange(); + assertEquals(indexGenerator.nextIndex(), -1); // check ExecutionContext behavior ExecutionContext executionContext = ExecutionContext.of(indexGenerator.nextIndex(), txInfo.getTermIndex()); - Assertions.assertEquals(executionContext.getIndex(), txInfo.getTermIndex().getIndex()); + assertEquals(executionContext.getIndex(), txInfo.getTermIndex().getIndex()); // save index and verify after change leader, for next index indexGenerator.finalizeIndexGeneratorFeature(); - Assertions.assertEquals(indexGenerator.nextIndex(), 111); + assertEquals(indexGenerator.nextIndex(), 111); // save index and verify after change leader, for next index indexGenerator.saveIndex(batchOpr, 114); - indexGenerator.changeLeader(); - Assertions.assertEquals(indexGenerator.nextIndex(), 115); + indexGenerator.onLeaderChange(); + assertEquals(indexGenerator.nextIndex(), 115); executionContext = ExecutionContext.of(indexGenerator.nextIndex(), txInfo.getTermIndex()); - Assertions.assertEquals(executionContext.getIndex(), 116); + assertEquals(executionContext.getIndex(), 116); } } From a101418d68db66e0ffdd96923a3d11d4ab6b282e Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Wed, 19 Feb 2025 18:31:53 +0530 Subject: [PATCH 09/11] fix checkstyle --- .../hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java index cfad1bbfd276..2611a0cc1889 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java @@ -17,11 +17,11 @@ package org.apache.hadoop.ozone.om.upgrade; -import org.apache.hadoop.ozone.om.OzoneManager; - import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MANAGED_INDEX; import static org.apache.hadoop.ozone.upgrade.LayoutFeature.UpgradeActionType.ON_FINALIZE; +import org.apache.hadoop.ozone.om.OzoneManager; + /** * initialize om managed index generator to provide index for further request handling. */ From 9a65251bca1f0ece583895dd7ff2ce490d9339e7 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Tue, 25 Feb 2025 14:54:11 +0530 Subject: [PATCH 10/11] fix review comments --- .../ozone/om/execution/IndexGenerator.java | 28 ++++++++++++++++--- .../flowcontrol/ExecutionContext.java | 6 ++-- .../om/ratis/OzoneManagerStateMachine.java | 1 - .../om/execution/TestIndexGenerator.java | 9 +++++- 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java index 0df96ceaa21a..68cf2d221f65 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.om.execution; +import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MANAGED_INDEX; + import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -41,6 +43,11 @@ public IndexGenerator(OzoneManager ozoneManager) throws IOException { } public void initialize() throws IOException { + if (!ozoneManager.getVersionManager().isAllowed(MANAGED_INDEX)) { + enabled.set(false); + return; + } + // default first time starts with "0" long initIndex = 0; // retrieve last saved index @@ -54,20 +61,27 @@ public void initialize() throws IOException { } index.set(initIndex); commitIndex.set(initIndex); - if (ozoneManager.getVersionManager().needsFinalization()) { - enabled.set(false); - } } public void finalizeIndexGeneratorFeature() throws IOException { + if (enabled.get()) { + return; + } + + // reinit the feature on finalization long initIndex = 0; TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager()); if (null != transactionInfo) { initIndex = transactionInfo.getTransactionIndex(); } index.set(initIndex); - commitIndex.set(initIndex); enabled.set(true); + + try (BatchOperation batchOperation = ozoneManager.getMetadataManager().getStore() + .initBatchOperation()) { + saveIndex(batchOperation, initIndex); + ozoneManager.getMetadataManager().getStore().commitBatchOperation(batchOperation); + } } public long nextIndex() { @@ -77,6 +91,12 @@ public long nextIndex() { return index.incrementAndGet(); } + /** + * Follower on every transaction update the commit index via saveIndex(). + * When the follower becomes leader onLeaderChange(), it updates index with Max (commit index, current index). + * Max is done for purpose where follower can have higher index (not yet sync to other nodes) and being discarded + * for continuation on that node. + */ public void onLeaderChange() { index.set(Math.max(commitIndex.get(), index.get())); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java index 38d53c811604..7585bab5b09a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java @@ -28,11 +28,13 @@ public final class ExecutionContext { private ExecutionContext(long index, TermIndex termIndex) { if (null == termIndex) { + // termIndex will be null for pre-ratis execution case which is before ratis transaction termIndex = TermIndex.valueOf(-1, index); } if (index == -1) { - // during upgrade case before finalization, make use of termIndex's index - // till finalization, index generator will return -1 + // during upgrade case before finalization, index will be -1 as returned by indexGenerator + // for this, it will make use of termIndex's index for execution similar to previous behavior + // and after finalization of feature, index generator will return initialized value > 0 index = termIndex.getIndex(); } this.index = index; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index 31096f607063..ce55d4e05141 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -140,7 +140,6 @@ public void initialize(RaftServer server, RaftGroupId id, storage.init(raftStorage); LOG.info("{}: initialize {} with {}", getId(), id, getLastAppliedTermIndex()); }); - indexGenerator.initialize(); } @Override diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java index 085cd5b998cd..40bdc121ea30 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.om.execution; +import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MANAGED_INDEX; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -24,12 +25,14 @@ import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; /** * Tests Index generator changes. @@ -41,6 +44,7 @@ public void testIndexGenerator() throws Exception { OMMetadataManager metaManager = mock(OMMetadataManager.class); when(om.getMetadataManager()).thenReturn(metaManager); OMLayoutVersionManager verManager = mock(OMLayoutVersionManager.class); + when(verManager.isAllowed(Mockito.eq(MANAGED_INDEX))).thenReturn(true); when(om.getVersionManager()).thenReturn(verManager); Table txInfoTable = mock(Table.class); when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); @@ -66,13 +70,16 @@ public void testUpgradeIndexGenerator() throws Exception { when(verManager.needsFinalization()).thenReturn(true); Table txInfoTable = mock(Table.class); when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); + DBStore dbstore = mock(DBStore.class); + when(metaManager.getStore()).thenReturn(dbstore); + BatchOperation batchOpr = mock(BatchOperation.class); + when(dbstore.initBatchOperation()).thenReturn(batchOpr); TransactionInfo txInfo = TransactionInfo.valueOf(-1, 110); when(txInfoTable.get(anyString())).thenReturn(null).thenReturn(txInfo); when(txInfoTable.getSkipCache(anyString())).thenReturn(txInfo); IndexGenerator indexGenerator = new IndexGenerator(om); assertEquals(indexGenerator.nextIndex(), -1); - BatchOperation batchOpr = mock(BatchOperation.class); indexGenerator.saveIndex(batchOpr, 114); indexGenerator.onLeaderChange(); assertEquals(indexGenerator.nextIndex(), -1); From 8e6e8e9ef7b044e16c987b4b4cda84d50cb1a224 Mon Sep 17 00:00:00 2001 From: Sumit Agrawal Date: Thu, 20 Mar 2025 18:35:55 +0530 Subject: [PATCH 11/11] fix index generation at leader --- .../src/main/proto/OmClientProtocol.proto | 5 +++ ...{IndexGenerator.java => IndexManager.java} | 6 +-- .../ozone/om/execution/OMExecutionFlow.java | 30 ++++++++++++- .../flowcontrol/ExecutionContext.java | 6 --- .../om/ratis/OzoneManagerStateMachine.java | 33 +++++++++++---- .../ratis/utils/OzoneManagerRatisUtils.java | 2 +- .../upgrade/OmManagedIndexUpgradeAction.java | 6 +-- ...exGenerator.java => TestIndexManager.java} | 42 +++++++++---------- 8 files changed, 85 insertions(+), 45 deletions(-) rename hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/{IndexGenerator.java => IndexManager.java} (95%) rename hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/{TestIndexGenerator.java => TestIndexManager.java} (78%) diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index df97028a0f31..2a835c7c8f69 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -303,6 +303,7 @@ message OMRequest { optional GetObjectTaggingRequest getObjectTaggingRequest = 140; optional PutObjectTaggingRequest putObjectTaggingRequest = 141; optional DeleteObjectTaggingRequest deleteObjectTaggingRequest = 142; + optional ExecutionControlRequest executionControlRequest = 143; } message OMResponse { @@ -2300,6 +2301,10 @@ message DeleteObjectTaggingRequest { message DeleteObjectTaggingResponse { } +message ExecutionControlRequest { + required uint64 index = 1; +} + /** The OM service that takes care of Ozone namespace. */ diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexManager.java similarity index 95% rename from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java rename to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexManager.java index 68cf2d221f65..b74fe3285a11 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexGenerator.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/IndexManager.java @@ -29,7 +29,7 @@ /** * Manages indexes for request handling and persist. */ -public final class IndexGenerator { +public final class IndexManager { public static final String OM_INDEX_KEY = "#OMINDEX"; private final AtomicLong index = new AtomicLong(); @@ -37,7 +37,7 @@ public final class IndexGenerator { private final OzoneManager ozoneManager; private final AtomicBoolean enabled = new AtomicBoolean(true); - public IndexGenerator(OzoneManager ozoneManager) throws IOException { + public IndexManager(OzoneManager ozoneManager) throws IOException { this.ozoneManager = ozoneManager; initialize(); } @@ -63,7 +63,7 @@ public void initialize() throws IOException { commitIndex.set(initIndex); } - public void finalizeIndexGeneratorFeature() throws IOException { + public void finalizeFeature() throws IOException { if (enabled.get()) { return; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java index 4ce714ab3dc3..4a5982b49627 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/OMExecutionFlow.java @@ -21,13 +21,16 @@ import com.google.protobuf.ServiceException; import java.io.IOException; +import java.util.function.Supplier; import org.apache.hadoop.ozone.om.OMPerformanceMetrics; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OMAuditLogger; import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.om.request.OMClientRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; +import org.apache.ratis.protocol.ClientId; /** * entry for execution flow for write request. @@ -36,10 +39,25 @@ public class OMExecutionFlow { private final OzoneManager ozoneManager; private final OMPerformanceMetrics perfMetrics; + private final Supplier indexGenerator; - public OMExecutionFlow(OzoneManager om) { + public OMExecutionFlow(OzoneManager om) throws IOException { this.ozoneManager = om; this.perfMetrics = ozoneManager.getPerfMetrics(); + indexGenerator = om.getOmRatisServer().getOmStateMachine().getIndexManager()::nextIndex; + } + + /** + * Internal request handling with defined clientId and callId. + * + * @param omRequest the request + * @param clientId the clientId + * @param callId the callId + * @return OMResponse the response of execution + * @throws ServiceException the exception on execution + */ + public OMResponse submitInternal(OMRequest omRequest, ClientId clientId, long callId) throws ServiceException { + return ozoneManager.getOmRatisServer().submitRequest(updateControlRequest(omRequest), clientId, callId); } /** @@ -57,7 +75,7 @@ public OMResponse submit(OMRequest omRequest) throws ServiceException { private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceException { // 1. create client request and preExecute OMClientRequest omClientRequest = null; - final OMRequest requestToSubmit; + OMRequest requestToSubmit; try { omClientRequest = OzoneManagerRatisUtils.createClientRequest(request, ozoneManager); assert (omClientRequest != null); @@ -73,10 +91,18 @@ private OMResponse submitExecutionToRatis(OMRequest request) throws ServiceExcep } // 2. submit request to ratis + requestToSubmit = updateControlRequest(requestToSubmit); OMResponse response = ozoneManager.getOmRatisServer().submitRequest(requestToSubmit); if (!response.getSuccess()) { omClientRequest.handleRequestFailure(ozoneManager); } return response; } + + private OMRequest updateControlRequest(OMRequest requestToSubmit) { + OzoneManagerProtocolProtos.ExecutionControlRequest controlRequest = + OzoneManagerProtocolProtos.ExecutionControlRequest.newBuilder().setIndex(indexGenerator.get()).build(); + requestToSubmit = requestToSubmit.toBuilder().setExecutionControlRequest(controlRequest).build(); + return requestToSubmit; + } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java index 7585bab5b09a..fcd13386a0ac 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/execution/flowcontrol/ExecutionContext.java @@ -31,12 +31,6 @@ private ExecutionContext(long index, TermIndex termIndex) { // termIndex will be null for pre-ratis execution case which is before ratis transaction termIndex = TermIndex.valueOf(-1, index); } - if (index == -1) { - // during upgrade case before finalization, index will be -1 as returned by indexGenerator - // for this, it will make use of termIndex's index for execution similar to previous behavior - // and after finalization of feature, index generator will return initialized value > 0 - index = termIndex.getIndex(); - } this.index = index; this.termIndex = termIndex; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index ce55d4e05141..118d8092473f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -38,7 +38,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManagerPrepareState; import org.apache.hadoop.ozone.om.exceptions.OMException; -import org.apache.hadoop.ozone.om.execution.IndexGenerator; +import org.apache.hadoop.ozone.om.execution.IndexManager; import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.helpers.OMRatisHelper; import org.apache.hadoop.ozone.om.lock.OMLockDetails; @@ -101,7 +101,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine { private volatile TermIndex lastNotifiedTermIndex = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX); /** The last index skipped by {@link #notifyTermIndexUpdated(long, long)}. */ private volatile long lastSkippedIndex = RaftLog.INVALID_LOG_INDEX; - private final IndexGenerator indexGenerator; + private final IndexManager indexManager; private final NettyMetrics nettyMetrics; @@ -109,7 +109,7 @@ public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer, boolean isTracingEnabled) throws IOException { this.isTracingEnabled = isTracingEnabled; this.ozoneManager = ratisServer.getOzoneManager(); - this.indexGenerator = new IndexGenerator(ozoneManager); + this.indexManager = new IndexManager(ozoneManager); loadSnapshotInfoFromDB(); this.threadPrefix = ozoneManager.getThreadNamePrefix(); @@ -150,7 +150,7 @@ public synchronized void reinitialize() throws IOException { unpause(lastApplied.getIndex(), lastApplied.getTerm()); LOG.info("{}: reinitialize {} with {}", getId(), getGroupId(), lastApplied); } - indexGenerator.initialize(); + indexManager.initialize(); } @Override @@ -168,7 +168,7 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId); // if the node is leader (can be ready or not ready, need update index) if (ozoneManager.getOmRatisServer().checkLeaderStatus() != OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER) { - indexGenerator.onLeaderChange(); + indexManager.onLeaderChange(); } } @@ -467,7 +467,7 @@ public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() { return OzoneManagerDoubleBuffer.newBuilder() .setOmMetadataManager(ozoneManager.getMetadataManager()) .setUpdateLastAppliedIndex(this::updateLastAppliedTermIndex) - .setUpdateOmCommitIndex(indexGenerator::saveIndex) + .setUpdateOmCommitIndex(indexManager::saveIndex) .setMaxUnFlushedTransactionCount(maxUnFlushedTransactionCount) .setThreadPrefix(threadPrefix) .setS3SecretManager(ozoneManager.getS3SecretManager()) @@ -562,7 +562,7 @@ public void close() { * @return response from OM */ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { - ExecutionContext context = ExecutionContext.of(indexGenerator.nextIndex(), termIndex); + ExecutionContext context = ExecutionContext.of(getIndex(request, termIndex), termIndex); try { final OMClientResponse omClientResponse = handler.handleWriteRequest( request, context, ozoneManagerDoubleBuffer); @@ -585,6 +585,17 @@ private OMResponse runCommand(OMRequest request, TermIndex termIndex) { return null; } + private static long getIndex(OMRequest request, TermIndex termIndex) { + long currIndex = termIndex.getIndex(); + if (request.hasExecutionControlRequest()) { + if (request.getExecutionControlRequest().getIndex() != -1) { + // -1 case can happen before upgrade finalize feature for index manager + currIndex = request.getExecutionControlRequest().getIndex(); + } + } + return currIndex; + } + private OMResponse createErrorResponse( OMRequest omRequest, IOException exception, ExecutionContext context) { OMResponse.Builder omResponseBuilder = OMResponse.newBuilder() @@ -667,7 +678,11 @@ public OzoneManagerDoubleBuffer getOzoneManagerDoubleBuffer() { return ozoneManagerDoubleBuffer; } - public void finalizeIndexGenerator() throws IOException { - indexGenerator.finalizeIndexGeneratorFeature(); + public void finalizeIndexFeature() throws IOException { + indexManager.finalizeFeature(); + } + + public IndexManager getIndexManager() { + return indexManager; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java index 5548be7bd8ba..c8e81760a9bf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java @@ -515,7 +515,7 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf, public static OzoneManagerProtocolProtos.OMResponse submitRequest( OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException { - return om.getOmRatisServer().submitRequest(omRequest, clientId, callId); + return om.getOmExecutionFlow().submitInternal(omRequest, clientId, callId); } public static OzoneManagerProtocolProtos.OMResponse createErrorResponse( diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java index 2611a0cc1889..dd6f1767bd51 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OmManagedIndexUpgradeAction.java @@ -23,7 +23,7 @@ import org.apache.hadoop.ozone.om.OzoneManager; /** - * initialize om managed index generator to provide index for further request handling. + * initialize om managed index manager to provide index for further request handling. */ @UpgradeActionOm(type = ON_FINALIZE, feature = MANAGED_INDEX) public class OmManagedIndexUpgradeAction implements OmUpgradeAction { @@ -31,8 +31,8 @@ public class OmManagedIndexUpgradeAction implements OmUpgradeAction { public void execute(OzoneManager arg) throws Exception { // Prepare ensures the db and ratis at reached to a checkpoint where all changes are flushed // And no further operation is allowed - // At this point, IndexGenerator finialize will re-init index from ratis index and this will be starting point + // At this point, IndexManager finialize will re-init index from ratis index and this will be starting point // for index generation for further request as used by object and other operation - arg.getOmRatisServer().getOmStateMachine().finalizeIndexGenerator(); + arg.getOmRatisServer().getOmStateMachine().finalizeIndexFeature(); } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexManager.java similarity index 78% rename from hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java rename to hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexManager.java index 40bdc121ea30..974ee07ec051 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexGenerator.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/execution/TestIndexManager.java @@ -35,11 +35,11 @@ import org.mockito.Mockito; /** - * Tests Index generator changes. + * Tests Index manager changes. */ -public class TestIndexGenerator { +public class TestIndexManager { @Test - public void testIndexGenerator() throws Exception { + public void testIndexManager() throws Exception { OzoneManager om = mock(OzoneManager.class); OMMetadataManager metaManager = mock(OMMetadataManager.class); when(om.getMetadataManager()).thenReturn(metaManager); @@ -50,18 +50,18 @@ public void testIndexGenerator() throws Exception { when(metaManager.getTransactionInfoTable()).thenReturn(txInfoTable); TransactionInfo txInfo = TransactionInfo.valueOf(-1, 100); when(txInfoTable.get(anyString())).thenReturn(txInfo); - IndexGenerator indexGenerator = new IndexGenerator(om); - assertEquals(indexGenerator.nextIndex(), 101); + IndexManager indexManager = new IndexManager(om); + assertEquals(indexManager.nextIndex(), 101); // save index and verify after change leader, for next index BatchOperation batchOpr = mock(BatchOperation.class); - indexGenerator.saveIndex(batchOpr, 102); - indexGenerator.onLeaderChange(); - assertEquals(indexGenerator.nextIndex(), 103); + indexManager.saveIndex(batchOpr, 102); + indexManager.onLeaderChange(); + assertEquals(indexManager.nextIndex(), 103); } @Test - public void testUpgradeIndexGenerator() throws Exception { + public void testUpgradeIndexManager() throws Exception { OzoneManager om = mock(OzoneManager.class); OMMetadataManager metaManager = mock(OMMetadataManager.class); when(om.getMetadataManager()).thenReturn(metaManager); @@ -77,28 +77,28 @@ public void testUpgradeIndexGenerator() throws Exception { TransactionInfo txInfo = TransactionInfo.valueOf(-1, 110); when(txInfoTable.get(anyString())).thenReturn(null).thenReturn(txInfo); when(txInfoTable.getSkipCache(anyString())).thenReturn(txInfo); - IndexGenerator indexGenerator = new IndexGenerator(om); - assertEquals(indexGenerator.nextIndex(), -1); + IndexManager indexManager = new IndexManager(om); + assertEquals(indexManager.nextIndex(), -1); - indexGenerator.saveIndex(batchOpr, 114); - indexGenerator.onLeaderChange(); - assertEquals(indexGenerator.nextIndex(), -1); + indexManager.saveIndex(batchOpr, 114); + indexManager.onLeaderChange(); + assertEquals(indexManager.nextIndex(), -1); // check ExecutionContext behavior - ExecutionContext executionContext = ExecutionContext.of(indexGenerator.nextIndex(), txInfo.getTermIndex()); + ExecutionContext executionContext = ExecutionContext.of(indexManager.nextIndex(), txInfo.getTermIndex()); assertEquals(executionContext.getIndex(), txInfo.getTermIndex().getIndex()); // save index and verify after change leader, for next index - indexGenerator.finalizeIndexGeneratorFeature(); - assertEquals(indexGenerator.nextIndex(), 111); + indexManager.finalizeFeature(); + assertEquals(indexManager.nextIndex(), 111); // save index and verify after change leader, for next index - indexGenerator.saveIndex(batchOpr, 114); - indexGenerator.onLeaderChange(); - assertEquals(indexGenerator.nextIndex(), 115); + indexManager.saveIndex(batchOpr, 114); + indexManager.onLeaderChange(); + assertEquals(indexManager.nextIndex(), 115); - executionContext = ExecutionContext.of(indexGenerator.nextIndex(), txInfo.getTermIndex()); + executionContext = ExecutionContext.of(indexManager.nextIndex(), txInfo.getTermIndex()); assertEquals(executionContext.getIndex(), 116); } }