From 868e90f4a907e731c11f470d3714350b156369e9 Mon Sep 17 00:00:00 2001 From: Evie Boland Date: Mon, 30 Sep 2024 11:11:26 -0400 Subject: [PATCH 01/18] Add row statistics coprocessor example --- .../example/row/stats/RowStatistics.java | 42 ++ .../RowStatisticsCompactionObserver.java | 359 ++++++++++++++++++ .../example/row/stats/RowStatisticsImpl.java | 314 +++++++++++++++ .../example/row/stats/SizeBucket.java | 38 ++ .../example/row/stats/SizeBucketTracker.java | 89 +++++ .../row/stats/recorder/CombinedRecorder.java | 34 ++ .../stats/recorder/RowStatisticsRecorder.java | 9 + .../row/stats/recorder/TableRecorder.java | 218 +++++++++++ .../ringbuffer/DisruptorExceptionHandler.java | 37 ++ .../stats/ringbuffer/RingBufferEnvelope.java | 17 + .../stats/ringbuffer/RingBufferPayload.java | 33 ++ .../ringbuffer/RowStatisticsEventHandler.java | 51 +++ .../row/stats/utils/ConfigurationUtil.java | 25 ++ .../row/stats/utils/RowStatisticsUtil.java | 47 +++ .../example/row/stats/utils/TableUtil.java | 36 ++ pom.xml | 24 ++ 16 files changed, 1373 insertions(+) create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/CombinedRecorder.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/TableRecorder.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/DisruptorExceptionHandler.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferEnvelope.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferPayload.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/ConfigurationUtil.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/TableUtil.java diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java new file mode 100644 index 000000000000..db0a54c81cbb --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java @@ -0,0 +1,42 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats; + +import java.util.Map; + 
+@InterfaceAudience.Private +public interface RowStatistics { + String getTable(); + + String getRegion(); + + String getColumnFamily(); + + long getLargestRowBytes(); + + int getLargestRowCells(); + + long getLargestCellBytes(); + + int getCellsLargerThanOneBlock(); + + int getRowsLargerThanOneBlock(); + + int getCellsLargerThanMaxCacheSize(); + + int getTotalDeletes(); + + int getTotalCells(); + + int getTotalRows(); + + long getTotalBytes(); + + String getLargestRowAsString(); + + String getLargestCellAsString(); + + Map getRowSizeBuckets(); + + Map getValueSizeBuckets(); + + String getJsonString(); +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java new file mode 100644 index 000000000000..e182e6bf36d8 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java @@ -0,0 +1,359 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats; + +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.TableUtil.CF; +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.TableUtil.NAMESPACE; +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.TableUtil.NAMESPACED_TABLE_NAME; +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.TableUtil.TABLE_RECORDER_KEY; +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.ExtendedCell; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.RawCellBuilder; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; +import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.TableRecorder; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil; +import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; +import org.apache.hadoop.hbase.metrics.Counter; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Shipper; 
+import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RowStatisticsCompactionObserver + implements RegionCoprocessor, RegionObserver, MasterCoprocessor, MasterObserver { + + private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsCompactionObserver.class); + + // From private field BucketAllocator.DEFAULT_BUCKET_SIZES + private static final long DEFAULT_MAX_BUCKET_SIZE = 512 * 1024 + 1024; + private static final ConcurrentMap TABLE_COUNTERS = + new ConcurrentHashMap(); + private static final String ROW_STATISTICS_DROPPED = "rowStatisticsDropped"; + private static final String ROW_STATISTICS_PUT_FAILED = "rowStatisticsPutFailures"; + private Counter rowStatisticsDropped; + private Counter rowStatisticsPutFailed; + private long maxCacheSize; + private final RowStatisticsRecorder recorder; + private TableRecorder tableRecorder; + + @InterfaceAudience.Private + public RowStatisticsCompactionObserver(RowStatisticsRecorder recorder) { + this.recorder = recorder; + this.tableRecorder = null; + } + + public RowStatisticsCompactionObserver() { + this(null); + } + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public Optional getMasterObserver() { + return Optional.of(this); + } + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + if (e instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e; + if (RowStatisticsUtil.isInternalTable(regionEnv)) { + return; + } + + String[] configuredBuckets = regionEnv + .getConfiguration() + .getStrings(BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY); + maxCacheSize = DEFAULT_MAX_BUCKET_SIZE; + if (configuredBuckets != null && configuredBuckets.length > 0) { + String lastBucket = configuredBuckets[configuredBuckets.length - 1]; + try { + maxCacheSize = Integer.parseInt(lastBucket.trim()); + } catch (NumberFormatException ex) { + LOG.warn( + "Failed to parse {} value {} as int", + BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY, + lastBucket, + ex + ); + } + } + + rowStatisticsDropped = + regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED); + rowStatisticsPutFailed = + regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED); + + TableName tableName = regionEnv.getRegionInfo().getTable(); + TABLE_COUNTERS.merge(tableName.getNameAsString(), 1L, Long::sum); + } + } + + @Override + public void stop(CoprocessorEnvironment e) throws IOException { + if (e instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e; + if (RowStatisticsUtil.isInternalTable(regionEnv)) { + return; + } + TableName tableName = regionEnv.getRegionInfo().getTable(); + long tableCount = TABLE_COUNTERS.merge(tableName.getNameAsString(), -1L, Long::sum); + if (tableCount == 0) { + long regionCount = 0; + for (long count : TABLE_COUNTERS.values()) { + regionCount += count; + } + if (regionCount == 0) { + regionEnv + .getMetricRegistryForRegionServer() + .remove(ROW_STATISTICS_DROPPED, rowStatisticsDropped); + regionEnv + .getMetricRegistryForRegionServer() + .remove(ROW_STATISTICS_PUT_FAILED, rowStatisticsPutFailed); + boolean removed = regionEnv + .getSharedData() + .remove(TABLE_RECORDER_KEY, tableRecorder); + if 
(removed) { + tableRecorder.close(); + } + } + } + } + } + + @Override + public void postStartMaster(ObserverContext ctx) + throws IOException { + try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) { + if (admin.tableExists(NAMESPACED_TABLE_NAME)) { + LOG.info( + "Table {} already exists. Skipping table creation process.", + NAMESPACED_TABLE_NAME + ); + } else { + boolean shouldCreateNamespace = Arrays + .stream(admin.listNamespaces()) + .filter(namespace -> namespace.equals(NAMESPACE)) + .collect(Collectors.toUnmodifiableSet()) + .isEmpty(); + if (shouldCreateNamespace) { + NamespaceDescriptor nd = NamespaceDescriptor.create(NAMESPACE).build(); + try { + admin.createNamespace(nd); + } catch (IOException e) { + LOG.error("Failed to create namespace {}", NAMESPACE, e); + } + } + ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder + .newBuilder(CF) + .setMaxVersions(25) + .setTimeToLive((int) Duration.ofDays(7).toSeconds()) + .build(); + TableDescriptor td = TableDescriptorBuilder + .newBuilder(NAMESPACED_TABLE_NAME) + .setColumnFamily(cfd) + .build(); + LOG.info("Creating table {}", NAMESPACED_TABLE_NAME); + try { + admin.createTable(td); + } catch (IOException e) { + LOG.error("Failed to create table {}", NAMESPACED_TABLE_NAME, e); + } + } + } catch (IOException e) { + LOG.error( + "Failed to get Connection or Admin. Cannot determine if table {} exists.", + NAMESPACED_TABLE_NAME, + e + ); + } + } + + @Override + public InternalScanner preCompact( + ObserverContext context, + Store store, + InternalScanner scanner, + ScanType scanType, + CompactionLifeCycleTracker tracker, + CompactionRequest request + ) { + if (RowStatisticsUtil.isInternalTable(store.getTableName())) { + LOG.debug( + "Region {} belongs to an internal table {}, so no row statistics will be recorded", + store.getRegionInfo().getRegionNameAsString(), + store.getTableName().getNameAsString() + ); + return scanner; + } + int blocksize = store.getColumnFamilyDescriptor().getBlocksize(); + boolean isMajor = request.isMajor(); + RowStatisticsImpl stats; + if (isMajor) { + stats = + new RowStatisticsImpl( + store.getTableName().getNameAsString(), + store.getRegionInfo().getEncodedName(), + store.getColumnFamilyName(), + blocksize, + maxCacheSize, + true + ); + } else { + stats = + new RowStatisticsImpl( + store.getTableName().getNameAsString(), + store.getRegionInfo().getEncodedName(), + store.getColumnFamilyName(), + blocksize, + maxCacheSize, + false + ); + } + return new RowStatisticsScanner(scanner, isMajor, stats, context.getEnvironment()); + } + + private class RowStatisticsScanner implements InternalScanner, Shipper { + + private final InternalScanner scanner; + private final Shipper shipper; + private final boolean isMajor; + private final RowStatisticsImpl rowStatistics; + private final RegionCoprocessorEnvironment regionEnv; + private RawCellBuilder cellBuilder; + private Cell lastCell; + + public RowStatisticsScanner( + InternalScanner scanner, + boolean isMajor, + RowStatisticsImpl rowStatistics, + RegionCoprocessorEnvironment regionEnv + ) { + this.scanner = scanner; + if (scanner instanceof Shipper) { + this.shipper = (Shipper) scanner; + } else { + this.shipper = null; + } + this.isMajor = isMajor; + this.rowStatistics = rowStatistics; + this.regionEnv = regionEnv; + this.cellBuilder = regionEnv.getCellBuilder(); + } + + @Override + public boolean next(List list, ScannerContext scannerContext) + throws IOException { + boolean ret = scanner.next(list, scannerContext); + + if (list.isEmpty()) { 
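+      // nothing came back in this batch, so there are no cells to account for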
+ return ret; + } + + // each next() call returns at most 1 row (maybe less for large rows) + // so we just need to check if the first cell has changed rows + ExtendedCell first = (ExtendedCell) list.get(0); + if (rowChanged(first)) { + rowStatistics.handleRowChanged(lastCell); + } + + for (int i = 0; i < list.size(); i++) { + ExtendedCell cell = (ExtendedCell) list.get(i); + rowStatistics.consumeCell(cell); + lastCell = cell; + } + + return ret; + } + + @Override public boolean next(List result) throws IOException { + return InternalScanner.super.next(result); + } + + @Override + public void close() throws IOException { + rowStatistics.handleRowChanged(lastCell); + rowStatistics.shipped(cellBuilder); + record(); + scanner.close(); + } + + @Override + public void shipped() throws IOException { + if (shipper != null) { + lastCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, lastCell); + rowStatistics.shipped(cellBuilder); + shipper.shipped(); + } + } + + private boolean rowChanged(Cell cell) { + if (lastCell == null) { + return false; + } + return !CellUtil.matchingRows(lastCell, cell); + } + + private void record() { + tableRecorder = + (TableRecorder) regionEnv + .getSharedData() + .computeIfAbsent( + TABLE_RECORDER_KEY, + k -> + TableRecorder.forClusterConnection( + regionEnv.getConnection(), + rowStatisticsDropped, + rowStatisticsPutFailed + ) + ); + if (tableRecorder != null) { + tableRecorder.record( + this.rowStatistics, + this.isMajor, + Optional.of(regionEnv.getRegion().getRegionInfo().getRegionName()) + ); + } else { + LOG.error( + "Failed to initialize a TableRecorder. Will not record row statistics for region={}", + rowStatistics.getRegion() + ); + rowStatisticsDropped.increment(); + } + if (recorder != null) { + recorder.record(this.rowStatistics, this.isMajor, Optional.empty()); + } + } + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java new file mode 100644 index 000000000000..edbe44a225aa --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java @@ -0,0 +1,314 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats; + +import java.util.Map; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.RawCellBuilder; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil; +import org.apache.hadoop.hbase.regionserver.Shipper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.GsonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.gson.Gson; +import org.apache.hbase.thirdparty.com.google.gson.JsonObject; + +/** + * Holder for accumulating row statistics in {@link RowStatisticsCompactionObserver} + * Creates various cell, row, and total stats. 
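+ * Statistics are accumulated per column family for each compaction and can be serialized to JSON via {@link #getJsonString()}.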
+ */ +@InterfaceAudience.Private +public class RowStatisticsImpl implements RowStatistics { + + private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsImpl.class); + private static final Gson GSON = GsonUtil.createGson().create(); + + // + // Transient fields which are not included in gson serialization + // + private final transient long blockSize; + private final transient long maxCacheSize; + private transient int rowCells; + private transient long rowBytes; + private transient byte[] largestRow; + private transient Cell largestCell; + private final transient boolean isMajor; + private final transient SizeBucketTracker rowSizeBuckets; + private final transient SizeBucketTracker valueSizeBuckets; + + // We don't need to clone anything until shipped() is called on scanner. + // To avoid allocations, we keep a reference until that point + private transient Cell largestRowRef; + private transient Cell largestCellRef; + // + // Non-transient fields which are included in gson + // + private final String table; + private final String region; + private final String columnFamily; + private long largestRowBytes; + private int largestRowCells; + private long largestCellBytes; + private int cellsLargerThanOneBlock; + private int rowsLargerThanOneBlock; + private int cellsLargerThanMaxCacheSize; + private int totalDeletes; + private int totalCells; + private int totalRows; + private long totalBytes; + + RowStatisticsImpl( + String table, + String encodedRegion, + String columnFamily, + long blockSize, + long maxCacheSize, + boolean isMajor + ) { + this.table = table; + this.region = encodedRegion; + this.columnFamily = columnFamily; + this.blockSize = blockSize; + this.maxCacheSize = maxCacheSize; + this.isMajor = isMajor; + this.rowSizeBuckets = new SizeBucketTracker(); + this.valueSizeBuckets = new SizeBucketTracker(); + } + + public void handleRowChanged(Cell lastCell) { + if (rowBytes > largestRowBytes) { + largestRowRef = lastCell; + largestRowBytes = rowBytes; + largestRowCells = rowCells; + } + if (rowBytes > blockSize) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "RowTooLarge: rowBytes={}, blockSize={}, table={}, rowKey={}", + rowBytes, + blockSize, + table, + Bytes.toStringBinary( + lastCell.getRowArray(), + lastCell.getRowOffset(), + lastCell.getRowLength() + ) + ); + } + rowsLargerThanOneBlock++; + } + rowSizeBuckets.add(rowBytes); + rowBytes = 0; + rowCells = 0; + totalRows++; + } + + public void consumeCell(Cell cell) { + int cellSize = cell.getSerializedSize(); + + rowBytes += cellSize; + rowCells++; + + boolean tooLarge = false; + if (cellSize > maxCacheSize) { + cellsLargerThanMaxCacheSize++; + tooLarge = true; + } + if (cellSize > blockSize) { + cellsLargerThanOneBlock++; + tooLarge = true; + } + + if (tooLarge && LOG.isDebugEnabled()) { + LOG.debug( + "CellTooLarge: size={}, blockSize={}, maxCacheSize={}, table={}, cell={}", + cellSize, + blockSize, + maxCacheSize, + table, + CellUtil.toString(cell, false) + ); + } + + if (cellSize > largestCellBytes) { + largestCellRef = cell; + largestCellBytes = cellSize; + } + valueSizeBuckets.add(cell.getValueLength()); + + totalCells++; + if (CellUtil.isDelete(cell)) { + totalDeletes++; + } + totalBytes += cellSize; + } + + /** + * Clone the cell refs so they can be cleaned up by {@link Shipper#shipped()}. + * Doing this lazily here, rather than eagerly in the above two methods can save + * us on some allocations. We might change the largestCell/largestRow multiple times + * between shipped() calls. 
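+   * Once shipped() has run, the blocks backing the referenced cells may be released, so only the cloned copies are safe to retain.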
+ */ + public void shipped(RawCellBuilder cellBuilder) { + if (largestRowRef != null) { + largestRow = CellUtil.cloneRow(largestRowRef); + largestRowRef = null; + } + if (largestCellRef != null) { + largestCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, largestCellRef); + largestCellRef = null; + } + } + + public String getTable() { + return table; + } + + public String getRegion() { + return region; + } + + public String getColumnFamily() { + return columnFamily; + } + + public byte[] getLargestRow() { + return largestRow; + } + + public String getLargestRowAsString() { + return Bytes.toStringBinary(getLargestRow()); + } + + public long getLargestRowBytes() { + return largestRowBytes; + } + + public int getLargestRowCells() { + return largestRowCells; + } + + public Cell getLargestCell() { + return largestCell; + } + + public String getLargestCellAsString() { + return CellUtil.toString(getLargestCell(), false); + } + + public long getLargestCellBytes() { + return largestCellBytes; + } + + public int getCellsLargerThanOneBlock() { + return cellsLargerThanOneBlock; + } + + public int getRowsLargerThanOneBlock() { + return rowsLargerThanOneBlock; + } + + public int getCellsLargerThanMaxCacheSize() { + return cellsLargerThanMaxCacheSize; + } + + public int getTotalDeletes() { + return totalDeletes; + } + + public int getTotalCells() { + return totalCells; + } + + public int getTotalRows() { + return totalRows; + } + + public long getTotalBytes() { + return totalBytes; + } + + public Map getRowSizeBuckets() { + return rowSizeBuckets.toMap(); + } + + public Map getValueSizeBuckets() { + return valueSizeBuckets.toMap(); + } + + @Override + public String toString() { + return ( + "RowStatistics{" + + "largestRow=" + + Bytes.toStringBinary(largestRow) + + ", largestRowBytes=" + + largestRowBytes + + ", largestRowCells=" + + largestRowCells + + ", largestCell=" + + largestCell + + ", largestCellBytes=" + + largestCellBytes + + ", cellsLargerThanOneBlock=" + + cellsLargerThanOneBlock + + ", rowsLargerThanOneBlock=" + + rowsLargerThanOneBlock + + ", cellsLargerThanMaxCacheSize=" + + cellsLargerThanMaxCacheSize + + ", totalDeletes=" + + totalDeletes + + ", totalCells=" + + totalCells + + ", totalRows=" + + totalRows + + ", totalBytes=" + + totalBytes + + ", rowSizeBuckets=" + + getRowSizeBuckets() + + ", valueSizeBuckets=" + + getValueSizeBuckets() + + ", isMajor=" + + isMajor + + '}' + ); + } + + @Override + public String getJsonString() { + JsonObject json = (JsonObject) GSON.toJsonTree(this); + json.add("largestCellParts", buildLargestCellPartsJson()); + json.addProperty("largestRowAsString", getLargestRowAsString()); + json.add("rowSizeBuckets", rowSizeBuckets.toJsonObject()); + json.add("valueSizeBuckets", valueSizeBuckets.toJsonObject()); + return json.toString(); + } + + private JsonObject buildLargestCellPartsJson() { + JsonObject cellJson = new JsonObject(); + Cell cell = getLargestCell(); + cellJson.addProperty( + "rowKey", + Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + ); + cellJson.addProperty( + "family", + Bytes.toStringBinary( + cell.getFamilyArray(), + cell.getFamilyOffset(), + cell.getFamilyLength() + ) + ); + cellJson.addProperty( + "qualifier", + Bytes.toStringBinary( + cell.getQualifierArray(), + cell.getQualifierOffset(), + cell.getQualifierLength() + ) + ); + cellJson.addProperty("timestamp", cell.getTimestamp()); + cellJson.addProperty("type", cell.getType().toString()); + return cellJson; + } +} diff --git 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java new file mode 100644 index 000000000000..9bddf2473cf0 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java @@ -0,0 +1,38 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats; + +@InterfaceAudience.Private +public enum SizeBucket { + KILOBYTES_1(0, 1 * 1024, "[0, 1)"), + KILOBYTES_2(1 * 1024, 2 * 1024, "[1, 2)"), + KILOBYTES_4(2 * 1024, 4 * 1024, "[2, 4)"), + KILOBYTES_8(4 * 1024, 8 * 1024, "[4, 8)"), + KILOBYTES_16(8 * 1024, 16 * 1024, "[8, 16)"), + KILOBYTES_32(16 * 1024, 32 * 1024, "[16, 32)"), + KILOBYTES_64(32 * 1024, 64 * 1024, "[32, 64)"), + KILOBYTES_128(64 * 1024, 128 * 1024, "[64, 128)"), + KILOBYTES_256(128 * 1024, 256 * 1024, "[128, 256)"), + KILOBYTES_512(256 * 1024, 512 * 1024, "[256, 512)"), + KILOBYTES_MAX(512 * 1024, Long.MAX_VALUE, "[512, inf)"); + + private final long minBytes; + private final long maxBytes; + private final String bucket; + + SizeBucket(long minBytes, long maxBytes, String bucket) { + this.minBytes = minBytes; + this.maxBytes = maxBytes; + this.bucket = bucket; + } + + public long minBytes() { + return minBytes; + } + + public long maxBytes() { + return maxBytes; + } + + public String bucket() { + return bucket; + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java new file mode 100644 index 000000000000..70c29053bd17 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java @@ -0,0 +1,89 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hbase.thirdparty.com.google.gson.JsonObject; + +@InterfaceAudience.Private +public class SizeBucketTracker { + + private static final SizeBucket[] SIZE_BUCKET_ARRAY = SizeBucket.values(); + private final Map bucketToCount; + + public SizeBucketTracker() { + SizeBucket[] sizeBucketsArray = SizeBucket.values(); + + bucketToCount = new HashMap<>(sizeBucketsArray.length); + for (SizeBucket sizeBucket : sizeBucketsArray) { + bucketToCount.put(sizeBucket, 0L); + } + } + + public void add(long rowBytes) { + if (rowBytes < 0) { + return; + } + SizeBucket sizeBucket = search(rowBytes); + if (sizeBucket == null) { + return; + } + long val = bucketToCount.get(sizeBucket); + bucketToCount.put(sizeBucket, getSafeIncrementedValue(val)); + } + + public Map toMap() { + Map copy = new HashMap<>(SIZE_BUCKET_ARRAY.length); + for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) { + long val = bucketToCount.get(sizeBucket); + copy.put(sizeBucket.bucket(), val); + } + return copy; + } + + public JsonObject toJsonObject() { + JsonObject bucketJson = new JsonObject(); + for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) { + long val = bucketToCount.get(sizeBucket); + bucketJson.addProperty(sizeBucket.bucket(), val); + } + return bucketJson; + } + + private SizeBucket search(long val) { + /* + Performance tested a few different search implementations + 1. Linear + 2. Binary + 3. 
Search - bucket search order changes over time as more information about the table is gained + + Linear and Binary implementations had roughly similar throughput + - Linear performs slightly better when the sizes are small + - Binary performs slightly better when the sizes are irregularly distributed or skewed high + + Smart implementation had the lowest throughput + - Reassessing the bucket search order is an expensive operation + - Tuning the number bucket search order reassessments is tricky, since it depended on the + - Write patterns to a table -- including how hot/cold the compacting data is + - Number of values per row + - Number of rows per region + - Small number of SizeBucket values means that there is NOT a ton of value ot be gained from reassessing the bucket search order + + Landed on Linear implementation because + - Looping through a small array is quick, especially if most of the values end up exiting out early + - Many tables at HubSpot have small values + - Implementation is clear and requires no tuning + + PR with more testing context: https://git.hubteam.com/HubSpot/HubSpotCoprocessors/pull/243 + */ + for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) { + if (val < sizeBucket.maxBytes()) { + return sizeBucket; + } + } + return val == Long.MAX_VALUE ? SizeBucket.KILOBYTES_MAX : null; + } + + private static long getSafeIncrementedValue(long val) { + return val == Long.MAX_VALUE ? val : val + 1; + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/CombinedRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/CombinedRecorder.java new file mode 100644 index 000000000000..e2310e693bcd --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/CombinedRecorder.java @@ -0,0 +1,34 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder; + +import java.util.Optional; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl; + +@InterfaceAudience.Private +public class CombinedRecorder implements RowStatisticsRecorder { + + private final RowStatisticsRecorder one; + private final RowStatisticsRecorder two; + + public CombinedRecorder(RowStatisticsRecorder one, RowStatisticsRecorder two) { + this.one = one; + this.two = two; + } + + @Override + public void record( + RowStatisticsImpl stats, + boolean isMajor, + Optional fullRegionName + ) { + one.record(stats, isMajor, fullRegionName); + two.record(stats, isMajor, fullRegionName); + } + + public RowStatisticsRecorder getOne() { + return one; + } + + public RowStatisticsRecorder getTwo() { + return two; + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java new file mode 100644 index 000000000000..6fea58ac86b6 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java @@ -0,0 +1,9 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder; + +import java.util.Optional; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl; + +@InterfaceAudience.Private +public interface RowStatisticsRecorder { + void record(RowStatisticsImpl stats, boolean isMajor, Optional fullRegionName); +} diff --git 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/TableRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/TableRecorder.java new file mode 100644 index 000000000000..afcade0664be --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/TableRecorder.java @@ -0,0 +1,218 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder; + +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.ConfigurationUtil.getInt; +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.ConfigurationUtil.getLong; +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.TableUtil.NAMESPACED_TABLE_NAME; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorParams; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionConfiguration; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.DisruptorExceptionHandler; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RingBufferEnvelope; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RingBufferPayload; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsEventHandler; +import org.apache.hadoop.hbase.metrics.Counter; +import org.apache.hadoop.hbase.util.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + +@InterfaceAudience.Private +public class TableRecorder implements RowStatisticsRecorder { + + private static final Logger LOG = + LoggerFactory.getLogger(TableRecorder.class); + private static final int DEFAULT_EVENT_COUNT = 1024; // Must be multiple of 2. 
Each RS has < 100 regions, so a ring buffer size of 1024 is generous + private static final long DISRUPTOR_SHUTDOWN_TIMEOUT_MS = 60_0000L; + private final BufferedMutator bufferedMutator; + private final Counter rowStatisticsDropped; + private final Disruptor disruptor; + private final RingBuffer ringBuffer; + private final AtomicBoolean closed; + + /* + * This constructor is ONLY for testing + * Use TableRecorder#forClusterConnection if you want to instantiate a TableRecorder object + */ + public TableRecorder( + BufferedMutator bufferedMutator, + Disruptor disruptor, + Counter rowStatisticsDropped + ) { + this.bufferedMutator = bufferedMutator; + this.disruptor = disruptor; + this.ringBuffer = disruptor.getRingBuffer(); + this.rowStatisticsDropped = rowStatisticsDropped; + this.closed = new AtomicBoolean(false); + } + + public static TableRecorder forClusterConnection( + Connection clusterConnection, + Counter rowStatisticsDropped, + Counter rowStatisticsPutFailed + ) { + BufferedMutator bufferedMutator = initializeBufferedMutator( + clusterConnection, + rowStatisticsPutFailed + ); + if (bufferedMutator == null) { + return null; + } + + Disruptor disruptor = initializeDisruptor( + bufferedMutator, + rowStatisticsPutFailed + ); + disruptor.start(); + + return new TableRecorder(bufferedMutator, disruptor, rowStatisticsDropped); + } + + @Override + public void record( + RowStatisticsImpl rowStatistics, + boolean isMajor, + Optional fullRegionName + ) { + if (!closed.get()) { + if ( + !ringBuffer.tryPublishEvent((envelope, seqId) -> + envelope.load( + new RingBufferPayload(rowStatistics, isMajor, fullRegionName.get()) + ) + ) + ) { + rowStatisticsDropped.increment(); + LOG.error( + "Failed to load row statistics for region={} into the ring buffer", + rowStatistics.getRegion() + ); + } + } else { + rowStatisticsDropped.increment(); + LOG.error( + "TableRecorder is closed. Will not record row statistics for region={}", + rowStatistics.getRegion() + ); + } + } + + public void close() throws IOException { + if (!closed.compareAndSet(false, true)) { + return; + } + try { + disruptor.shutdown(DISRUPTOR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.warn( + "Disruptor shutdown timed out after {} ms. Forcing halt. 
Some row statistics may be lost", + DISRUPTOR_SHUTDOWN_TIMEOUT_MS + ); + disruptor.halt(); + disruptor.shutdown(); + } + bufferedMutator.close(); + } + + private static BufferedMutator initializeBufferedMutator( + Connection conn, + Counter rowStatisticsPutFailed + ) { + Configuration conf = conn.getConfiguration(); + TableRecorderExceptionListener exceptionListener = new TableRecorderExceptionListener( + rowStatisticsPutFailed + ); + BufferedMutatorParams params = new BufferedMutatorParams(NAMESPACED_TABLE_NAME) + .rpcTimeout(getInt(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 15_000)) // timeout for each RPC in ms + .operationTimeout(getInt(conf, HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30_000)) // call completion time in ms + .setWriteBufferPeriodicFlushTimeoutMs( + getLong( + conf, + ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, + 60_000L + ) + ) // periodic flush interval in ms + .writeBufferSize( + getLong( + conf, + ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY, + ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT + ) + ) // buffer size (in bytes) before a flush + .listener(exceptionListener); + BufferedMutator bufferedMutator = null; + try { + bufferedMutator = conn.getBufferedMutator(params); + } catch (IOException e) { + LOG.error( + "This should NEVER print! ConnectionImplementation#getBufferedMutator(BufferedMutatorParams bmp) does NOT raise IOExceptions", + e + ); + } + return bufferedMutator; + } + + private static Disruptor initializeDisruptor( + BufferedMutator bufferedMutator, + Counter rowStatisticsPutFailures + ) { + Disruptor disruptor = new Disruptor<>( + RingBufferEnvelope::new, + DEFAULT_EVENT_COUNT, + new ThreadFactoryBuilder() + .setNameFormat("rowstats.append-pool-%d") + .setDaemon(true) + .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER) + .build(), + ProducerType.MULTI, + new BlockingWaitStrategy() + ); + disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); + RowStatisticsEventHandler rowStatisticsEventHandler = new RowStatisticsEventHandler( + bufferedMutator, + rowStatisticsPutFailures + ); + disruptor.handleEventsWith( + new RowStatisticsEventHandler[] { rowStatisticsEventHandler } + ); + return disruptor; + } + + protected static class TableRecorderExceptionListener + implements BufferedMutator.ExceptionListener { + + private final Counter rowStatisticsPutFailures; + + TableRecorderExceptionListener(Counter counter) { + this.rowStatisticsPutFailures = counter; + } + + public void onException( + RetriesExhaustedWithDetailsException exception, + BufferedMutator mutator + ) throws RetriesExhaustedWithDetailsException { + long failedPuts = mutator.getWriteBufferSize(); + for (int i = 0; i < failedPuts; i++) { + rowStatisticsPutFailures.increment(); + } + LOG.error( + "Periodic flush of buffered mutator failed. 
Cannot persist {} row statistics stored in buffer", + failedPuts, + exception + ); + } + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/DisruptorExceptionHandler.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/DisruptorExceptionHandler.java new file mode 100644 index 000000000000..f411bf8a1324 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/DisruptorExceptionHandler.java @@ -0,0 +1,37 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer; + +import com.lmax.disruptor.ExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class DisruptorExceptionHandler implements ExceptionHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + DisruptorExceptionHandler.class + ); + + @Override + public void handleEventException(Throwable e, long sequence, RingBufferEnvelope event) { + if (event != null) { + LOG.error( + "Unable to persist event={} with sequence={}", + event.getPayload(), + sequence, + e + ); + } else { + LOG.error("Event with sequence={} was null", sequence, e); + } + } + + @Override + public void handleOnStartException(Throwable e) { + LOG.error("Disruptor onStartException", e); + } + + @Override + public void handleOnShutdownException(Throwable e) { + LOG.error("Disruptor onShutdownException", e); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferEnvelope.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferEnvelope.java new file mode 100644 index 000000000000..89b333e36d6a --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferEnvelope.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer; + +@InterfaceAudience.Private +public final class RingBufferEnvelope { + + private RingBufferPayload payload; + + public void load(RingBufferPayload payload) { + this.payload = payload; + } + + public RingBufferPayload getPayload() { + final RingBufferPayload payload = this.payload; + this.payload = null; + return payload; + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferPayload.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferPayload.java new file mode 100644 index 000000000000..6a904935019b --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RingBufferPayload.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer; + +import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics; + +@InterfaceAudience.Private +public class RingBufferPayload { + + private final RowStatistics rowStatistics; + private final boolean isMajor; + private final byte[] fullRegionName; + + public RingBufferPayload( + RowStatistics rowStatistics, + boolean isMajor, + byte[] fullRegionName + ) { + this.rowStatistics = rowStatistics; + this.isMajor = isMajor; + this.fullRegionName = fullRegionName; + } + + public RowStatistics getRowStatistics() { + return rowStatistics; + } + + public boolean getIsMajor() { + return isMajor; + } + + public byte[] 
getFullRegionName() { + return fullRegionName; + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java new file mode 100644 index 000000000000..62d113a95bb8 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java @@ -0,0 +1,51 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer; + +import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.TableUtil.buildPutForRegion; +import com.lmax.disruptor.EventHandler; +import java.io.IOException; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics; +import org.apache.hadoop.hbase.metrics.Counter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class RowStatisticsEventHandler implements EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger( + RowStatisticsEventHandler.class + ); + private final BufferedMutator bufferedMutator; + private final Counter rowStatisticsPutFailures; + + public RowStatisticsEventHandler( + BufferedMutator bufferedMutator, + Counter rowStatisticsPutFailures + ) { + this.bufferedMutator = bufferedMutator; + this.rowStatisticsPutFailures = rowStatisticsPutFailures; + } + + @Override + public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) + throws Exception { + final RingBufferPayload payload = event.getPayload(); + if (payload != null) { + final RowStatistics rowStatistics = payload.getRowStatistics(); + final boolean isMajor = payload.getIsMajor(); + final byte[] fullRegionName = payload.getFullRegionName(); + Put put = buildPutForRegion(fullRegionName, rowStatistics, isMajor); + try { + bufferedMutator.mutate(put); + } catch (IOException e) { + rowStatisticsPutFailures.increment(); + LOG.error( + "Mutate operation failed. 
Cannot persist row statistics for region {}", + rowStatistics.getRegion(), + e + ); + } + } + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/ConfigurationUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/ConfigurationUtil.java new file mode 100644 index 000000000000..c9189ca4b10e --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/ConfigurationUtil.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils; + +import org.apache.hadoop.conf.Configuration; + +@InterfaceAudience.Private +public class ConfigurationUtil { + + private static final String ROW_STATISTICS_PREFIX = "hubspot.row.statistics."; + + public static int getInt(Configuration conf, String name, int defaultValue) { + return conf.getInt(ROW_STATISTICS_PREFIX + name, defaultValue); + } + + public static long getLong(Configuration conf, String name, long defaultValue) { + return conf.getLong(ROW_STATISTICS_PREFIX + name, defaultValue); + } + + private static void setInt(Configuration conf, String name, int defaultValue) { + conf.setInt(name, conf.getInt(ROW_STATISTICS_PREFIX + name, defaultValue)); + } + + private static void setLong(Configuration conf, String name, long defaultValue) { + conf.setLong(name, conf.getLong(ROW_STATISTICS_PREFIX + name, defaultValue)); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java new file mode 100644 index 000000000000..e52a287cd96a --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java @@ -0,0 +1,47 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.RawCellBuilder; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; + +@InterfaceAudience.Private +public class RowStatisticsUtil { + + public static Cell cloneWithoutValue(RawCellBuilder cellBuilder, Cell cell) { + return cellBuilder + .clear() + .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + .setQualifier( + cell.getQualifierArray(), + cell.getQualifierOffset(), + cell.getQualifierLength() + ) + .setTimestamp(cell.getTimestamp()) + .setType(cell.getType()) + .build(); + } + + public static boolean isInternalTable(RegionCoprocessorEnvironment environment) { + return isInternalTable(environment.getRegionInfo().getTable()); + } + + public static boolean isInternalTable(TableName tableName) { + return ( + !isDefaultNamespace(tableName.getNamespaceAsString()) || + isTestTable(tableName.getNameAsString()) + ); + } + + private static boolean isDefaultNamespace(String namespace) { + return namespace.equals(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR); + } + + private static boolean isTestTable(String table) { + return ( + table.startsWith("hbase-test-table") || table.startsWith("sharded-hbase-test-table") + ); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/TableUtil.java 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/TableUtil.java new file mode 100644 index 000000000000..315b67590636 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/TableUtil.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class TableUtil { + + private static final Logger LOG = LoggerFactory.getLogger(TableUtil.class); + public static final String NAMESPACE = "infra"; + public static final String TABLE_STRING = "row-statistics"; + public static final String VERSIONED_TABLE_STRING = TABLE_STRING + "-1"; + public static final TableName NAMESPACED_TABLE_NAME = TableName.valueOf( + NAMESPACE, + VERSIONED_TABLE_STRING + ); + public static final byte[] CF = Bytes.toBytes("0"); + public static final String TABLE_RECORDER_KEY = "tableRecorder"; + + public static Put buildPutForRegion( + byte[] regionRowKey, + RowStatistics rowStatistics, + boolean isMajor + ) { + Put put = new Put(regionRowKey); + String cq = rowStatistics.getColumnFamily() + (isMajor ? "1" : "0"); + String jsonString = rowStatistics.getJsonString(); + put.addColumn(CF, Bytes.toBytes(cq), Bytes.toBytes(jsonString)); + LOG.debug(jsonString); + return put; + } +} diff --git a/pom.xml b/pom.xml index 539473cdc7ce..4e16f636c979 100644 --- a/pom.xml +++ b/pom.xml @@ -1863,6 +1863,30 @@ junit test + + org.apache.hbase.thirdparty + hbase-shaded-miscellaneous + + + com.lmax + disruptor + + + org.apache.logging.log4j + log4j-1.2-api + + + org.slf4j + slf4j-api + + + org.apache.hbase.thirdparty + hbase-shaded-gson + + + org.apache.hadoop + hadoop-annotations +
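
For anyone trying the example end to end, the sketch below is not part of the patch; it is one way to read back what TableRecorder persists. Per TableUtil.buildPutForRegion, each statistics row is keyed by the full region name and stores the JSON under column family "0", with a qualifier of the source column family name suffixed with "1" for major compactions or "0" for minor ones. The user table name, its "cf" column family, and the registration keys mentioned in the comments are assumptions for illustration, not something the patch itself sets up.

// Not part of the patch: a minimal read-back sketch. It assumes the observer has been
// registered cluster-wide (typically via hbase-site.xml, e.g. by adding
// org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsCompactionObserver
// to both hbase.coprocessor.master.classes and hbase.coprocessor.region.classes), that the
// user table has a column family named "cf" (hypothetical), and that a major compaction has run.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public final class RowStatisticsReadExample {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName userTable = TableName.valueOf("my-table"); // hypothetical user table
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        // TableUtil.NAMESPACED_TABLE_NAME resolves to "infra:row-statistics-1"
        Table statsTable = connection.getTable(TableName.valueOf("infra", "row-statistics-1"))) {
      // The row key is the full region name that RowStatisticsCompactionObserver publishes
      RegionInfo region = admin.getRegions(userTable).get(0);
      Get get = new Get(region.getRegionName());
      Result result = statsTable.get(get);
      // TableUtil.buildPutForRegion stores the JSON under family "0" with qualifier
      // "<columnFamily>1" for major compactions and "<columnFamily>0" for minor ones
      byte[] json = result.getValue(Bytes.toBytes("0"), Bytes.toBytes("cf1"));
      System.out.println(json == null ? "no statistics recorded yet" : Bytes.toString(json));
    }
  }
}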