diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 9528ec9974f..699ee319a10 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -520,6 +520,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t */ ConditionalTabletMutator requireFiles(Set files); + /** + * Require that a tablet have less than or equals the specified number of files. + */ + ConditionalTabletMutator requireLessOrEqualsFiles(long limit); + /** *

* Ample provides the following features on top of the conditional writer to help automate diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index da8e050504a..8cee60d96d5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -64,6 +64,7 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.metadata.iterators.ColumnFamilySizeLimitIterator; import org.apache.accumulo.server.metadata.iterators.PresentIterator; import org.apache.accumulo.server.metadata.iterators.SetEncodingIterator; import org.apache.accumulo.server.metadata.iterators.TabletExistsIterator; @@ -344,6 +345,14 @@ public ConditionalTabletMutator requireFiles(Set files) { return this; } + @Override + public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + Condition c = ColumnFamilySizeLimitIterator.createCondition(DataFileColumnFamily.NAME, limit); + mutation.addCondition(c); + return this; + } + @Override public void submit(Ample.RejectionHandler rejectionCheck) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/ColumnFamilySizeLimitIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/ColumnFamilySizeLimitIterator.java new file mode 100644 index 00000000000..b19fde7def2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/ColumnFamilySizeLimitIterator.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metadata.iterators; + +import static org.apache.accumulo.server.metadata.iterators.SetEncodingIterator.getTabletRow; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +/** + * Iterator that checks if a column family size is less than or equal a limit as part of a + * conditional mutation. + */ +public class ColumnFamilySizeLimitIterator extends WrappingIterator { + + private static final String LIMIT_OPT = "limit"; + private static final Text EMPTY = new Text(); + + private Long limit; + + private Key startKey = null; + private Value topValue = null; + + @Override + public void init(SortedKeyValueIterator source, Map options, + IteratorEnvironment env) throws IOException { + super.init(source, options, env); + limit = Long.parseLong(options.get(LIMIT_OPT)); + Preconditions.checkState(limit >= 0); + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { + Text tabletRow = getTabletRow(range); + Text family = range.getStartKey().getColumnFamily(); + + Preconditions.checkArgument( + family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0); + + startKey = new Key(tabletRow, family); + Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM); + + Range r = new Range(startKey, true, endKey, false); + + var source = getSource(); + source.seek(r, Set.of(startKey.getColumnFamilyData()), true); + + long count = 0; + while (source.hasTop()) { + source.next(); + count++; + } + + if (count <= limit) { + topValue = new Value("1"); + } else { + topValue = null; + } + } + + @Override + public boolean hasTop() { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + return topValue != null; + } + + @Override + public void next() throws IOException { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + topValue = null; + } + + @Override + public Key getTopKey() { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + if (topValue == null) { + throw new NoSuchElementException(); + } + + return startKey; + } + + @Override + public Value getTopValue() { + if (startKey == null) { + throw new IllegalStateException("never been seeked"); + } + if (topValue == null) { + throw new NoSuchElementException(); + } + return topValue; + } + + /** + * Create a condition that checks if the specified column family's size is less than or equal to + * the given limit. + */ + public static Condition createCondition(Text family, long limit) { + IteratorSetting is = new IteratorSetting(ConditionalTabletMutatorImpl.INITIAL_ITERATOR_PRIO, + ColumnFamilySizeLimitIterator.class); + is.addOption(LIMIT_OPT, limit + ""); + return new Condition(family, EMPTY).setValue("1").setIterators(is); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java index b0456afd32a..ed7b52e48da 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java @@ -93,11 +93,11 @@ public void seek(Range range, Collection columnFamilies, boolean i family.getLength() > 0 && range.getStartKey().getColumnQualifier().getLength() == 0); startKey = new Key(tabletRow, family); - Key endKey = new Key(tabletRow, family).followingKey(PartialKey.ROW_COLFAM); + Key endKey = startKey.followingKey(PartialKey.ROW_COLFAM); Range r = new Range(startKey, true, endKey, false); - source.seek(r, Set.of(), false); + source.seek(r, Set.of(startKey.getColumnFamilyData()), true); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 62eec1ebe10..7b0f494c19a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -178,9 +178,6 @@ void load(List tablets, Files files) { if (setTime) { rsc.add(TIME); } - if (pauseLimit > 0) { - rsc.add(FILES); - } ColumnType[] requireSameCols = rsc.toArray(new ColumnType[0]); @@ -237,6 +234,10 @@ void load(List tablets, Files files) { var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) .requireAbsentOperation().requireSame(tablet, LOADED, requireSameCols); + if (pauseLimit > 0) { + tabletMutator.requireLessOrEqualsFiles(pauseLimit); + } + filesToLoad.forEach((f, v) -> { tabletMutator.putBulkFile(f, fateId); tabletMutator.putFile(f, v); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index ca0baf7435c..c1bc46921aa 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -1788,7 +1788,7 @@ public void testRequiresFiles() { assertEquals(time2, context.getAmple().readTablet(e1).getTime()); // Test mutation is rejected when a file is given that the tablet does not have - var time3 = MetadataTime.parse("L60"); + var time3 = MetadataTime.parse("L70"); try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { ctmi.mutateTablet(e1).requireAbsentOperation().requireFiles(Set.of(stf1, stf4)).putTime(time3) .submit(tm -> false); @@ -1797,4 +1797,81 @@ public void testRequiresFiles() { // Should be previous time still as the mutation was rejected assertEquals(time2, context.getAmple().readTablet(e1).getTime()); } + + @Test + public void testFilesLimit() { + var context = cluster.getServerContext(); + + var stf1 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf")); + var stf2 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf")); + var stf3 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000072.rf")); + var stf4 = StoredTabletFile + .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/default_tablet/C0000073.rf")); + var dfv = new DataFileValue(100, 100); + + // Add 3 of the files, skip the 4th file + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf1, dfv).putFile(stf2, dfv) + .putFile(stf3, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); + + // Test mutation is accepted when # files in tablet equals limit + var time1 = MetadataTime.parse("L50"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time1) + .submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(time1, context.getAmple().readTablet(e1).getTime()); + + // Test mutation is accepted when # files in tablet is less than limit + var time2 = MetadataTime.parse("L60"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time2) + .submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(time2, context.getAmple().readTablet(e1).getTime()); + + // Test mutation is rejected when # files in tablet is greater than limit + var time3 = MetadataTime.parse("L70"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(2).putTime(time3) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + // Should be previous time still as the mutation was rejected + assertEquals(time2, context.getAmple().readTablet(e1).getTime()); + + // add fourth file + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().putFile(stf4, dfv).submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(Set.of(stf1, stf2, stf3, stf4), context.getAmple().readTablet(e1).getFiles()); + + // Test mutation is rejected when # files in tablet is greater than limit + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(3).putTime(time3) + .submit(tm -> false); + assertEquals(Status.REJECTED, ctmi.process().get(e1).getStatus()); + } + // Should be previous time still as the mutation was rejected + assertEquals(time2, context.getAmple().readTablet(e1).getTime()); + + // Test mutation is accepted when # files in tablet equals limit + var time4 = MetadataTime.parse("L80"); + try (var ctmi = new ConditionalTabletsMutatorImpl(context)) { + ctmi.mutateTablet(e1).requireAbsentOperation().requireLessOrEqualsFiles(4).putTime(time4) + .submit(tm -> false); + assertEquals(Status.ACCEPTED, ctmi.process().get(e1).getStatus()); + } + assertEquals(time4, context.getAmple().readTablet(e1).getTime()); + + } }