Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hbase.mapreduce;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
Expand All @@ -33,6 +35,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
Expand Down Expand Up @@ -65,22 +68,41 @@ public class RowCounter extends AbstractHBaseTool {
private final static String OPT_END_TIME = "endtime";
private final static String OPT_RANGE = "range";
private final static String OPT_EXPECTED_COUNT = "expectedCount";
private final static String OPT_COUNT_DELETE_MARKERS = "countDeleteMarkers";

private String tableName;
private List<MultiRowRangeFilter.RowRange> rowRangeList;
private long startTime;
private long endTime;
private long expectedCount;
private boolean countDeleteMarkers;
private List<String> columns = new ArrayList<>();

private Job job;

/**
* Mapper that runs the count.
*/
static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result> {

/** Counter enumeration to count the actual rows. */
/** Counter enumeration to count the actual rows, cells and delete markers. */
public static enum Counters {
ROWS
ROWS,
DELETE,
DELETE_COLUMN,
DELETE_FAMILY,
DELETE_FAMILY_VERSION,
ROWS_WITH_DELETE_MARKER
}

private boolean countDeleteMarkers;

@Override
protected void
setup(Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Result>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
countDeleteMarkers = conf.getBoolean(OPT_COUNT_DELETE_MARKERS, false);
}

/**
Expand All @@ -95,6 +117,37 @@ public static enum Counters {
public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
// Count every row containing data, whether it's in qualifiers or values
context.getCounter(Counters.ROWS).increment(1);

if (countDeleteMarkers) {
boolean rowContainsDeleteMarker = false;
for (Cell cell : values.rawCells()) {
Cell.Type type = cell.getType();
switch (type) {
case Delete:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE).increment(1);
break;
case DeleteColumn:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE_COLUMN).increment(1);
break;
case DeleteFamily:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE_FAMILY).increment(1);
break;
case DeleteFamilyVersion:
rowContainsDeleteMarker = true;
context.getCounter(Counters.DELETE_FAMILY_VERSION).increment(1);
break;
default:
break;
}
}

if (rowContainsDeleteMarker) {
context.getCounter(Counters.ROWS_WITH_DELETE_MARKER).increment(1);
}
}
}
}

Expand All @@ -105,11 +158,14 @@ public void map(ImmutableBytesWritable row, Result values, Context context) thro
* @throws IOException When setting up the job fails.
*/
public Job createSubmittableJob(Configuration conf) throws IOException {
conf.setBoolean(OPT_COUNT_DELETE_MARKERS, this.countDeleteMarkers);
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(RowCounter.class);
Scan scan = new Scan();
// raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
scan.setRaw(this.countDeleteMarkers);
scan.setCacheBlocks(false);
setScanFilter(scan, rowRangeList);
setScanFilter(scan, rowRangeList, this.countDeleteMarkers);

for (String columnName : this.columns) {
String family = StringUtils.substringBefore(columnName, ":");
Expand Down Expand Up @@ -147,13 +203,15 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
List<MultiRowRangeFilter.RowRange> rowRangeList = null;
long startTime = 0;
long endTime = 0;
boolean countDeleteMarkers = false;

StringBuilder sb = new StringBuilder();

final String rangeSwitch = "--range=";
final String startTimeArgKey = "--starttime=";
final String endTimeArgKey = "--endtime=";
final String expectedCountArg = "--expected-count=";
final String countDeleteMarkersArg = "--countDeleteMarkers";

// First argument is table name, starting from second
for (int i = 1; i < args.length; i++) {
Expand All @@ -179,10 +237,15 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
Long.parseLong(args[i].substring(expectedCountArg.length())));
continue;
}
if (args[i].startsWith(countDeleteMarkersArg)) {
countDeleteMarkers = true;
continue;
}
// if no switch, assume column names
sb.append(args[i]);
sb.append(" ");
}
conf.setBoolean(OPT_COUNT_DELETE_MARKERS, countDeleteMarkers);
if (endTime < startTime) {
printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime);
return null;
Expand All @@ -192,7 +255,9 @@ public static Job createSubmittableJob(Configuration conf, String[] args) throws
job.setJarByClass(RowCounter.class);
Scan scan = new Scan();
scan.setCacheBlocks(false);
setScanFilter(scan, rowRangeList);
// raw scan will be needed to account for delete markers when --countDeleteMarkers flag is set
scan.setRaw(countDeleteMarkers);
setScanFilter(scan, rowRangeList, countDeleteMarkers);
if (sb.length() > 0) {
for (String columnName : sb.toString().trim().split(" ")) {
String family = StringUtils.substringBefore(columnName, ":");
Expand Down Expand Up @@ -250,9 +315,11 @@ private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter(String
* Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. If rowRangeList
* contains exactly one element, startRow and stopRow are set to the scan.
*/
private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList,
boolean countDeleteMarkers) {
final int size = rowRangeList == null ? 0 : rowRangeList.size();
if (size <= 1) {
// all cells will be needed if --countDeleteMarkers flag is set, hence, skipping filter
if (size <= 1 && !countDeleteMarkers) {
scan.setFilter(new FirstKeyOnlyFilter());
}
if (size == 1) {
Expand Down Expand Up @@ -295,10 +362,15 @@ protected void addOptions() {
.desc("[startKey],[endKey][;[startKey],[endKey]...]]").longOpt(OPT_RANGE).build();
Option expectedOption = Option.builder(null).valueSeparator('=').hasArg(true)
.desc("expected number of rows to be count.").longOpt(OPT_EXPECTED_COUNT).build();
Option countDeleteMarkersOption = Option.builder(null).hasArg(false)
.desc("counts the number of Delete Markers of all types, i.e. "
+ "(DELETE, DELETE_COLUMN, DELETE_FAMILY, DELETE_FAMILY_VERSION)")
.longOpt(OPT_COUNT_DELETE_MARKERS).build();
addOption(startTimeOption);
addOption(endTimeOption);
addOption(rangeOption);
addOption(expectedOption);
addOption(countDeleteMarkersOption);
}

@Override
Expand All @@ -316,6 +388,7 @@ protected void processOptions(CommandLine cmd) throws IllegalArgumentException {
this.startTime = cmd.getOptionValue(OPT_START_TIME) == null
? 0
: Long.parseLong(cmd.getOptionValue(OPT_START_TIME));
this.countDeleteMarkers = cmd.hasOption(OPT_COUNT_DELETE_MARKERS);

for (int i = 1; i < cmd.getArgList().size(); i++) {
String argument = cmd.getArgList().get(i);
Expand Down Expand Up @@ -347,7 +420,7 @@ protected void processOldArgs(List<String> args) {

@Override
protected int doWork() throws Exception {
Job job = createSubmittableJob(getConf());
job = createSubmittableJob(getConf());
if (job == null) {
return -1;
}
Expand Down Expand Up @@ -388,4 +461,10 @@ protected CommandLineParser newParser() {
return new RowCounterCommandLineParser();
}

@RestrictedApi(explanation = "Only visible for testing", link = "",
allowedOnPath = ".*/src/test/.*")
Job getMapReduceJob() {
return job;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
Expand All @@ -37,6 +39,7 @@
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -524,6 +527,137 @@ public void testInvalidTable() throws Exception {
}
}

/**
* Step 1: Add 10 rows(row1, row2, row3, row4, row5, row6, row7, row8, row9, row10) to a table.
* Each row contains 1 column family and 4 columns and values for two different timestamps - 5 &
* 10.
* <p>
* Step 2: Delete the latest version of column A for row1. --> 1 X Delete
* <p>
* Step 3: Delete the cell for timestamp 5 of column B for row1. --> 1 X Delete
* <p>
* Step 4: Delete a column family for row2 and row4. --> 2 X DeleteFamily
* <p>
* Step 5: Delete all versions of a specific column for row3, row5 and row6. --> 3 X DeleteColumn
* <p>
* Step 6: Delete all columns for timestamp 5 for row 7. --> 1 X DeleteFamilyVersion
* <p>
* Case 1: Run row counter without countDeleteMarkers and validate counter values.
* <p>
* Case 2: Run row counter with countDeleteMarkers flag and validate counter values.
* <p>
* Case 3: Run row counter with countDeleteMarkers flag for a row range and validate counter
* values.
*/
@Test
public void testRowCounterWithCountDeleteMarkersOption() throws Exception {
// Test Setup

final TableName tableName =
TableName.valueOf(TABLE_NAME + "_" + "withCountDeleteMarkersOption");
// Row keys are represented in this way because of HBASE-15287
final byte[][] rowKeys = { Bytes.toBytesBinary("\\x00row1"), Bytes.toBytesBinary("\\x00row2"),
Bytes.toBytesBinary("\\x00row3"), Bytes.toBytesBinary("\\x00row4"),
Bytes.toBytesBinary("\\x00row5"), Bytes.toBytesBinary("\\x00row6"),
Bytes.toBytesBinary("\\x00row7"), Bytes.toBytesBinary("\\x00row8"),
Bytes.toBytesBinary("\\x00row9"), Bytes.toBytesBinary("\\x00row10") };
final byte[] columnFamily = Bytes.toBytes("cf");
final byte[][] columns =
{ Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C"), Bytes.toBytes("D") };
final byte[][] values = { Bytes.toBytes("a"), Bytes.toBytes("b") };

try (Table table = TEST_UTIL.createTable(tableName, columnFamily)) {
// Step 1: Insert rows with columns
for (byte[] rowKey : rowKeys) {
Put put = new Put(rowKey);
for (byte[] col : columns) {
long timestamp = 5L;
for (byte[] value : values) {
put.addColumn(columnFamily, col, timestamp, value);
timestamp += 5L;
}
}
table.put(put);
}
TEST_UTIL.getAdmin().flush(tableName);

// Steps 2-6
Delete deleteA = new Delete(rowKeys[0]).addColumn(columnFamily, columns[0]);
Delete deleteB = new Delete(rowKeys[0]).addColumn(columnFamily, columns[1], 5L);
Delete deleteC = new Delete(rowKeys[1]).addFamily(columnFamily);
Delete deleteD = new Delete(rowKeys[2]).addColumns(columnFamily, columns[0]);
Delete deleteE = new Delete(rowKeys[3]).addFamily(columnFamily);
Delete deleteF = new Delete(rowKeys[4]).addColumns(columnFamily, columns[0]);
Delete deleteG = new Delete(rowKeys[5]).addColumns(columnFamily, columns[0]);
Delete deleteH = new Delete(rowKeys[6]).addFamilyVersion(columnFamily, 5L);

table.delete(deleteA);
table.delete(deleteB);
table.delete(deleteC);
table.delete(deleteD);
table.delete(deleteE);
table.delete(deleteF);
table.delete(deleteG);
table.delete(deleteH);
TEST_UTIL.getAdmin().flush(tableName);
}

RowCounter rowCounterWithoutCountDeleteMarkers = new RowCounter();
RowCounter rowCounterWithCountDeleteMarkers = new RowCounter();
RowCounter rowCounterForRangeWithCountDeleteMarkers = new RowCounter();
rowCounterWithoutCountDeleteMarkers.setConf(new Configuration(TEST_UTIL.getConfiguration()));
rowCounterWithCountDeleteMarkers.setConf(new Configuration(TEST_UTIL.getConfiguration()));
rowCounterForRangeWithCountDeleteMarkers
.setConf(new Configuration(TEST_UTIL.getConfiguration()));

// Invocation

rowCounterWithoutCountDeleteMarkers.run(new String[] { tableName.getNameAsString() });
rowCounterWithCountDeleteMarkers
.run(new String[] { tableName.getNameAsString(), "--countDeleteMarkers" });
rowCounterForRangeWithCountDeleteMarkers.run(new String[] { tableName.getNameAsString(),
"--countDeleteMarkers", "--range=\\x00row8,\\x00row9" });

// Validation

// Case 1:
validateCounterCounts(rowCounterWithoutCountDeleteMarkers.getMapReduceJob().getCounters(), 8, 0,
0, 0, 0, 0);

// Case 2:
validateCounterCounts(rowCounterWithCountDeleteMarkers.getMapReduceJob().getCounters(), 10, 7,
2, 3, 2, 1);

// Case 3:
validateCounterCounts(rowCounterForRangeWithCountDeleteMarkers.getMapReduceJob().getCounters(),
1, 0, 0, 0, 0, 0);
}

private void validateCounterCounts(Counters counters, long rowCount,
long rowsWithDeleteMarkersCount, long deleteCount, long deleteColumnCount,
long deleteFamilyCount, long deleteFamilyVersionCount) {

long actualRowCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue();
long actualRowsWithDeleteMarkersCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.ROWS_WITH_DELETE_MARKER).getValue();
long actualDeleteCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE).getValue();
long actualDeleteColumnCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_COLUMN).getValue();
long actualDeleteFamilyCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_FAMILY).getValue();
long actualDeleteFamilyVersionCount =
counters.findCounter(RowCounter.RowCounterMapper.Counters.DELETE_FAMILY_VERSION).getValue();

assertEquals(rowCount, actualRowCount);
assertEquals(rowsWithDeleteMarkersCount, actualRowsWithDeleteMarkersCount);
assertEquals(deleteCount, actualDeleteCount);
assertEquals(deleteColumnCount, actualDeleteColumnCount);
assertEquals(deleteFamilyCount, actualDeleteFamilyCount);
assertEquals(deleteFamilyVersionCount, actualDeleteFamilyVersionCount);
}

private void assertUsageContent(String usage) {
assertTrue(usage
.contains("usage: hbase rowcounter " + "<tablename> [options] [<column1> <column2>...]"));
Expand Down