Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException
table.deleteWithBatch(batch, key);
}

/**
 * Queues deletion of all keys in the range from {@code beginKey} to {@code endKey}
 * into the given batch, delegating to the underlying table.
 * NOTE(review): presumably the range is inclusive of beginKey and exclusive of
 * endKey, per RocksDB deleteRange semantics — confirm against the Table contract.
 *
 * @throws CodecException if a key cannot be encoded
 */
@Override
public void deleteRangeWithBatch(BatchOperation batch, KEY beginKey, KEY endKey) throws CodecException {
table.deleteRangeWithBatch(batch, beginKey, endKey);
}

@Override
public final KeyValueIterator<KEY, VALUE> iterator(KEY prefix, KeyValueIterator.Type type) {
throw new UnsupportedOperationException("Iterating tables directly is not" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ public interface DBStoreHAManager {
/**
 * Returns the table holding persisted {@code TransactionInfo}, or {@code null}
 * if the implementation does not maintain one (the default).
 */
default Table<String, TransactionInfo> getTransactionInfoTable() {
return null;
}

/**
 * Returns the table holding persisted {@code FlushedTransactionInfo} keyed by
 * a {@code Long}, or {@code null} if the implementation does not maintain one
 * (the default). NOTE(review): the meaning of the Long key is not visible
 * here — presumably a transaction index; confirm against the implementation.
 */
default Table<Long, FlushedTransactionInfo> getFlushedTransactionsTable() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils;

import java.util.Objects;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;
import org.apache.ratis.server.protocol.TermIndex;

/**
* Represents information about a flushed transaction, including its term and transaction index.
* This class is a lightweight container used to track specific transaction metadata and provides
* methods for serialization and deserialization using a codec.
*/
public class FlushedTransactionInfo {

private static final Codec<FlushedTransactionInfo> CODEC = new DelegatedCodec<>(
Proto2Codec.get(HddsProtos.FlushedTransactionInfo.getDefaultInstance()),
FlushedTransactionInfo::getFromProtobuf,
FlushedTransactionInfo::getProtobuf,
FlushedTransactionInfo.class);

private final long term;
private final long transactionIndex;

FlushedTransactionInfo(TermIndex termIndex) {
this.transactionIndex = termIndex.getIndex();
this.term = termIndex.getTerm();
}

public static FlushedTransactionInfo valueOf(long currentTerm, long transactionIndex) {
return valueOf(TermIndex.valueOf(currentTerm, transactionIndex));
}

public static FlushedTransactionInfo valueOf(TermIndex termIndex) {
return new FlushedTransactionInfo(termIndex);
}

public static Codec<FlushedTransactionInfo> getCodec() {
return CODEC;
}

public long getTerm() {
return term;
}

public long getTransactionIndex() {
return transactionIndex;
}

public static FlushedTransactionInfo getFromProtobuf(HddsProtos.FlushedTransactionInfo transactionInfo) {
return new FlushedTransactionInfo(TermIndex.valueOf(transactionInfo.getTermIndex(),
transactionInfo.getTransactionId()));
}

private HddsProtos.FlushedTransactionInfo getProtobuf() {
return HddsProtos.FlushedTransactionInfo.newBuilder().setTermIndex(this.getTerm())
.setTransactionId(this.getTransactionIndex()).build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FlushedTransactionInfo that = (FlushedTransactionInfo) o;
return this.getTerm() == that.getTerm() && this.getTransactionIndex() == that.getTransactionIndex();
}

@Override
public int hashCode() {
return Objects.hash(getTerm(), getTransactionIndex());
}

@Override
public String toString() {
return "FlushedTransactionInfo{" +
"term=" + term +
", transactionIndex=" + transactionIndex +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* <p>
* This class is immutable.
*/
public final class TransactionInfo implements Comparable<TransactionInfo> {
public class TransactionInfo implements Comparable<TransactionInfo> {
private static final Codec<TransactionInfo> CODEC = new DelegatedCodec<>(
StringCodec.get(),
TransactionInfo::valueOf,
Expand Down Expand Up @@ -99,7 +99,7 @@ public static TermIndex getTermIndex(long transactionIndex) {
return TermIndex.valueOf(NON_RATIS_TERM, transactionIndex);
}

private TransactionInfo(TermIndex termIndex) {
TransactionInfo(TermIndex termIndex) {
this.transactionInfoString = termIndex.getTerm() + TRANSACTION_INFO_SPLIT_KEY + termIndex.getIndex();
this.snapshotInfo = new SnapshotInfo() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.hadoop.hdds.utils.CollectionUtils;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
Expand Down Expand Up @@ -80,6 +81,11 @@ public TypedTable<KEY, VALUE> getTable(DBStore db, CacheType cacheType)
return db.getTable(tableName, keyCodec, valueCodec, cacheType);
}

/**
 * Opens this definition's typed table in the given store, attaching a key
 * validator that is applied to keys before put/delete (per the DBStore
 * contract for this overload).
 * NOTE(review): {@code Function<KEY, Boolean>} forces boxing; {@code Predicate<KEY>}
 * would be the idiomatic type, but changing it must be coordinated with the
 * matching DBStore.getTable declaration.
 *
 * @throws RocksDatabaseException if the underlying RocksDB table cannot be opened
 * @throws CodecException if codec setup fails
 */
public TypedTable<KEY, VALUE> getTable(DBStore db, CacheType cacheType, Function<KEY, Boolean> keyValidator)
throws RocksDatabaseException, CodecException {
return db.getTable(tableName, keyCodec, valueCodec, cacheType, keyValidator);
}

/** Returns the name of the table described by this definition. */
public String getName() {
return tableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.utils.db.cache.TableCache;
import org.apache.hadoop.hdds.utils.db.cache.TableCache.CacheType;
Expand Down Expand Up @@ -64,6 +65,21 @@ <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
String name, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, TableCache.CacheType cacheType)
throws RocksDatabaseException, CodecException;

/**
 * Gets a table store with implicit key/value conversion and a key validator
 * that is applied before put/delete operations.
 *
 * @param name - table name
 * @param keyCodec - key codec
 * @param valueCodec - value codec
 * @param cacheType - cache type
 * @param keyValidatorFunction - function to validate key before put/delete
 * @return - Table Store
 * @throws RocksDatabaseException if the underlying RocksDB table cannot be opened
 * @throws CodecException if codec setup fails
 */
<KEY, VALUE> TypedTable<KEY, VALUE> getTable(
String name, Codec<KEY> keyCodec, Codec<VALUE> valueCodec, TableCache.CacheType cacheType,
Function<KEY, Boolean> keyValidatorFunction) throws RocksDatabaseException, CodecException;


/**
* Lists the Known list of Tables in a DB.
*
Expand Down
Loading