From 7e627ffe605f8bc1ef38d13897d6560c9506a19c Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Sun, 17 Nov 2019 14:36:17 -0800 Subject: [PATCH 1/8] Fixing the ChainWalEntryFilter to filter the whole entry if all cells get filtered. Also, adding test cases for WALCellFilter --- .../replication/ChainWALEntryFilter.java | 3 ++ .../TestReplicationWALEntryFilters.java | 44 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index 2bb981119af5..a7816ddd878c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -75,6 +75,9 @@ public Entry filter(Entry entry) { entry = filter.filter(entry); } filterCells(entry); + if (entry.getEdit().isEmpty()) { + return null; + } return entry; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 7faaefb8f3b8..e511f48d9c31 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -155,6 +155,50 @@ public Entry filter(Entry entry) { } }; + public static class FilterSomeCellsWALCellFilter implements WALEntryFilter, WALCellFilter { + @Override + public Entry filter(Entry entry) { + return entry; + } + + @Override + public Cell filterCell(Entry entry, Cell cell) { + if (Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).equals("a")) { + return null; + } else { + return cell; + } + } + } + + public static class FilterAllCellsWALCellFilter implements WALEntryFilter, WALCellFilter { + @Override + public Entry filter(Entry entry) { + return entry; + } + + @Override + public Cell filterCell(Entry entry, Cell cell) { + return null; + } + } + + @Test + public void testChainWALEntryWithCellFilter() { + Entry userEntry = createEntry(null, a, b, c); + ChainWALEntryFilter filterSomeCells = new ChainWALEntryFilter(new FilterSomeCellsWALCellFilter()); + // since WALCellFilter filter cells with rowkey 'a' + assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry)); + + Entry userEntry2 = createEntry(null, b, c, d); + // since there is no cell to get filtered, nothing should get filtered + assertEquals(userEntry2, filterSomeCells.filter(userEntry2)); + + ChainWALEntryFilter filterAllCells = new ChainWALEntryFilter(new FilterAllCellsWALCellFilter()); + // since WALCellFilter filter all cells, whole entry should be filtered + assertEquals(null, filterAllCells.filter(userEntry)); + } + @Test public void testChainWALEntryFilter() { Entry userEntry = createEntry(null, a, b, c); From 9faebc9ea27cb5fa6f9afd34b6ffbc845c35c523 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Sun, 17 Nov 2019 15:48:21 -0800 Subject: [PATCH 2/8] Adding tests for WALCellFilter to cover all the scenarios --- .../hbase/replication/ChainWALEntryFilter.java | 16 ++++++++++++---- .../TestReplicationWALEntryFilters.java | 3 +++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index a7816ddd878c..036cf53a1cdb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -37,6 +37,10 @@ public class ChainWALEntryFilter implements WALEntryFilter { private final WALEntryFilter[] filters; private WALCellFilter[] cellFilters; + // To allow the empty entries to get filtered, we want to this optional flag to decide + // if we want to filter the entries which have no cells or all cells got filtered though WALCellFilter + private boolean filterEmptyEntry = false; + public ChainWALEntryFilter(WALEntryFilter...filters) { this.filters = filters; initCellFilters(); @@ -68,14 +72,14 @@ public void initCellFilters() { @Override public Entry filter(Entry entry) { + if (entry == null) { + return null; + } for (WALEntryFilter filter : filters) { - if (entry == null) { - return null; - } entry = filter.filter(entry); } filterCells(entry); - if (entry.getEdit().isEmpty()) { + if (filterEmptyEntry && entry.getEdit().isEmpty()) { return null; } return entry; @@ -97,4 +101,8 @@ private Cell filterCell(Entry entry, Cell cell) { } return cell; } + + public void setFilterEmptyEntry(boolean filterEmptyEntry) { + this.filterEmptyEntry = filterEmptyEntry; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index e511f48d9c31..c98c9e5ff1d5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -195,6 +195,9 @@ public void testChainWALEntryWithCellFilter() { assertEquals(userEntry2, filterSomeCells.filter(userEntry2)); ChainWALEntryFilter filterAllCells = new ChainWALEntryFilter(new FilterAllCellsWALCellFilter()); + assertEquals(createEntry(null), filterAllCells.filter(userEntry)); + // let's set the filter empty entry flag to true now for the above case + filterAllCells.setFilterEmptyEntry(true); // since WALCellFilter filter all cells, whole entry should be filtered assertEquals(null, filterAllCells.filter(userEntry)); } From 1b16c3a5b02c7a715200bcb56cc51c81fc1a46fc Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Sun, 17 Nov 2019 22:31:17 -0800 Subject: [PATCH 3/8] Adding the filter empty entry into peer configuration --- .../org/apache/hadoop/hbase/HConstants.java | 3 ++ .../replication/BaseReplicationEndpoint.java | 9 +++--- .../replication/ChainWALEntryFilter.java | 28 +++++++++++++------ .../regionserver/ReplicationSource.java | 2 +- .../TestReplicationWALEntryFilters.java | 8 ++++-- 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ce482ed022e7..6f9663e7cb9b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1295,6 +1295,9 @@ public enum OperationStatusCode { /** Configuration key for SplitLog manager timeout */ public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout"; + /** To allow the empty entries to get filtered which have no cells or all cells got filtered though WALCellFilter */ + public static final String HBASE_REPLICATION_WAL_FILTER_EMPTY_ENTRY = "hbase.replication.wal.filteremptyentry"; + /** * Configuration keys for Bucket cache */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index 56576a6cf3e1..a60818b46653 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -41,15 +41,16 @@ public abstract class BaseReplicationEndpoint extends AbstractService public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY = "hbase.replication.source.custom.walentryfilters"; protected Context ctx; + private ReplicationPeer replicationPeer; @Override public void init(Context context) throws IOException { this.ctx = context; if (this.ctx != null){ - ReplicationPeer peer = this.ctx.getReplicationPeer(); - if (peer != null){ - peer.registerPeerConfigListener(this); + replicationPeer = this.ctx.getReplicationPeer(); + if (replicationPeer != null){ + replicationPeer.registerPeerConfigListener(this); } else { LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + " because there's no such peer"); @@ -91,7 +92,7 @@ public WALEntryFilter getWALEntryfilter() { } } } - return filters.isEmpty() ? null : new ChainWALEntryFilter(filters); + return filters.isEmpty() ? null : new ChainWALEntryFilter(filters, this.replicationPeer); } /** Returns a WALEntryFilter for checking the scope. Subclasses can diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index 036cf53a1cdb..0dd8cb6f66f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; +import static org.apache.hadoop.hbase.HConstants.HBASE_REPLICATION_WAL_FILTER_EMPTY_ENTRY; + /** * A {@link WALEntryFilter} which contains multiple filters and applies them * in chain order @@ -36,17 +38,25 @@ public class ChainWALEntryFilter implements WALEntryFilter { private final WALEntryFilter[] filters; private WALCellFilter[] cellFilters; + private ReplicationPeerConfig peerConfig; // To allow the empty entries to get filtered, we want to this optional flag to decide // if we want to filter the entries which have no cells or all cells got filtered though WALCellFilter - private boolean filterEmptyEntry = false; + private String filterEmptyEntry; public ChainWALEntryFilter(WALEntryFilter...filters) { this.filters = filters; initCellFilters(); } - public ChainWALEntryFilter(List filters) { + public ChainWALEntryFilter(List filters, ReplicationPeer replicationPeer) { + if (replicationPeer != null) { + peerConfig = replicationPeer.getPeerConfig(); + if (peerConfig != null) { + filterEmptyEntry = peerConfig.getConfiguration().get(HBASE_REPLICATION_WAL_FILTER_EMPTY_ENTRY); + } + } + ArrayList rawFilters = new ArrayList<>(filters.size()); // flatten the chains for (WALEntryFilter filter : filters) { @@ -72,14 +82,15 @@ public void initCellFilters() { @Override public Entry filter(Entry entry) { - if (entry == null) { - return null; - } + for (WALEntryFilter filter : filters) { + if (entry == null) { + return null; + } entry = filter.filter(entry); } filterCells(entry); - if (filterEmptyEntry && entry.getEdit().isEmpty()) { + if (shouldFilterEmptyEntry() && entry != null && entry.getEdit().isEmpty()) { return null; } return entry; @@ -102,7 +113,8 @@ private Cell filterCell(Entry entry, Cell cell) { return cell; } - public void setFilterEmptyEntry(boolean filterEmptyEntry) { - this.filterEmptyEntry = filterEmptyEntry; + public boolean shouldFilterEmptyEntry() { + return (filterEmptyEntry != null + && filterEmptyEntry.equalsIgnoreCase("true")); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 32739b1b4318..704602a229d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -301,7 +301,7 @@ private void initializeWALEntryFilter(UUID peerClusterId) { filters.add(filterFromEndpoint); } filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); - this.walEntryFilter = new ChainWALEntryFilter(filters); + this.walEntryFilter = new ChainWALEntryFilter(filters, this.replicationPeer); } private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index c98c9e5ff1d5..90487fb4199c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -48,6 +48,7 @@ import org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.mockito.Mockito; @Category({ ReplicationTests.class, SmallTests.class }) public class TestReplicationWALEntryFilters { @@ -195,11 +196,12 @@ public void testChainWALEntryWithCellFilter() { assertEquals(userEntry2, filterSomeCells.filter(userEntry2)); ChainWALEntryFilter filterAllCells = new ChainWALEntryFilter(new FilterAllCellsWALCellFilter()); - assertEquals(createEntry(null), filterAllCells.filter(userEntry)); + ChainWALEntryFilter spyFilterAllCells = Mockito.spy(filterAllCells); + assertEquals(createEntry(null), spyFilterAllCells.filter(userEntry)); // let's set the filter empty entry flag to true now for the above case - filterAllCells.setFilterEmptyEntry(true); + Mockito.doReturn(true).when(spyFilterAllCells).shouldFilterEmptyEntry(); // since WALCellFilter filter all cells, whole entry should be filtered - assertEquals(null, filterAllCells.filter(userEntry)); + assertEquals(null, spyFilterAllCells.filter(userEntry)); } @Test From ffb78d39af4069820e778c2805dcfae6be5d1ad7 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Thu, 21 Nov 2019 22:57:15 -0800 Subject: [PATCH 4/8] Removing the set filter empty entry from the replication peer config and a aub class for more flexibility. --- .../org/apache/hadoop/hbase/HConstants.java | 3 - .../replication/BaseReplicationEndpoint.java | 9 ++- .../replication/ChainWALEntryFilter.java | 36 ++++------- .../CustomChainWALEntryFilter.java | 61 +++++++++++++++++++ .../regionserver/ReplicationSource.java | 2 +- .../TestReplicationWALEntryFilters.java | 12 ++-- 6 files changed, 82 insertions(+), 41 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 6f9663e7cb9b..ce482ed022e7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1295,9 +1295,6 @@ public enum OperationStatusCode { /** Configuration key for SplitLog manager timeout */ public static final String HBASE_SPLITLOG_MANAGER_TIMEOUT = "hbase.splitlog.manager.timeout"; - /** To allow the empty entries to get filtered which have no cells or all cells got filtered though WALCellFilter */ - public static final String HBASE_REPLICATION_WAL_FILTER_EMPTY_ENTRY = "hbase.replication.wal.filteremptyentry"; - /** * Configuration keys for Bucket cache */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index a60818b46653..56576a6cf3e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -41,16 +41,15 @@ public abstract class BaseReplicationEndpoint extends AbstractService public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY = "hbase.replication.source.custom.walentryfilters"; protected Context ctx; - private ReplicationPeer replicationPeer; @Override public void init(Context context) throws IOException { this.ctx = context; if (this.ctx != null){ - replicationPeer = this.ctx.getReplicationPeer(); - if (replicationPeer != null){ - replicationPeer.registerPeerConfigListener(this); + ReplicationPeer peer = this.ctx.getReplicationPeer(); + if (peer != null){ + peer.registerPeerConfigListener(this); } else { LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + " because there's no such peer"); @@ -92,7 +91,7 @@ public WALEntryFilter getWALEntryfilter() { } } } - return filters.isEmpty() ? null : new ChainWALEntryFilter(filters, this.replicationPeer); + return filters.isEmpty() ? null : new ChainWALEntryFilter(filters); } /** Returns a WALEntryFilter for checking the scope. Subclasses can diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java index 0dd8cb6f66f4..ae3c74ad4753 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEntryFilter.java @@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; -import static org.apache.hadoop.hbase.HConstants.HBASE_REPLICATION_WAL_FILTER_EMPTY_ENTRY; - /** * A {@link WALEntryFilter} which contains multiple filters and applies them * in chain order @@ -38,25 +36,13 @@ public class ChainWALEntryFilter implements WALEntryFilter { private final WALEntryFilter[] filters; private WALCellFilter[] cellFilters; - private ReplicationPeerConfig peerConfig; - - // To allow the empty entries to get filtered, we want to this optional flag to decide - // if we want to filter the entries which have no cells or all cells got filtered though WALCellFilter - private String filterEmptyEntry; public ChainWALEntryFilter(WALEntryFilter...filters) { this.filters = filters; initCellFilters(); } - public ChainWALEntryFilter(List filters, ReplicationPeer replicationPeer) { - if (replicationPeer != null) { - peerConfig = replicationPeer.getPeerConfig(); - if (peerConfig != null) { - filterEmptyEntry = peerConfig.getConfiguration().get(HBASE_REPLICATION_WAL_FILTER_EMPTY_ENTRY); - } - } - + public ChainWALEntryFilter(List filters) { ArrayList rawFilters = new ArrayList<>(filters.size()); // flatten the chains for (WALEntryFilter filter : filters) { @@ -82,21 +68,26 @@ public void initCellFilters() { @Override public Entry filter(Entry entry) { + entry = filterEntry(entry); + if (entry == null) { + return null; + } + filterCells(entry); + return entry; + } + + protected Entry filterEntry(Entry entry) { for (WALEntryFilter filter : filters) { if (entry == null) { return null; } entry = filter.filter(entry); } - filterCells(entry); - if (shouldFilterEmptyEntry() && entry != null && entry.getEdit().isEmpty()) { - return null; - } return entry; } - private void filterCells(Entry entry) { + protected void filterCells(Entry entry) { if (entry == null || cellFilters.length == 0) { return; } @@ -112,9 +103,4 @@ private Cell filterCell(Entry entry, Cell cell) { } return cell; } - - public boolean shouldFilterEmptyEntry() { - return (filterEmptyEntry != null - && filterEmptyEntry.equalsIgnoreCase("true")); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java new file mode 100644 index 000000000000..8d8f3ebf8f88 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.wal.WAL; +import java.util.List; + +/** + * A {@link ChainWALEntryFilter} for providing more flexible options + * */ +public class CustomChainWALEntryFilter extends ChainWALEntryFilter { + + private boolean filterEmptyEntry = false; + + public CustomChainWALEntryFilter(final WALEntryFilter... filters) { + super(filters); + } + + public CustomChainWALEntryFilter(final List filters) { + super(filters); + } + + @Override + public WAL.Entry filter(WAL.Entry entry) { + filterEntry(entry); + if (entry == null) { + return null; + } + + filterCells(entry); + if (filterEmptyEntry && entry != null && entry.getEdit().isEmpty()) { + return null; + } + return entry; + } + + /** + * To allow the empty entries to get filtered, we want to set this optional flag to decide + * if we want to filter the entries which have no cells or all cells got filtered + * though {@link WALCellFilter}. + * @param filterEmptyEntry flag + */ + public void setFilterEmptyEntry(final boolean filterEmptyEntry) { + this.filterEmptyEntry = filterEmptyEntry; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 704602a229d2..32739b1b4318 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -301,7 +301,7 @@ private void initializeWALEntryFilter(UUID peerClusterId) { filters.add(filterFromEndpoint); } filters.add(new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint)); - this.walEntryFilter = new ChainWALEntryFilter(filters, this.replicationPeer); + this.walEntryFilter = new ChainWALEntryFilter(filters); } private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 90487fb4199c..a60b4728aabe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -48,7 +48,6 @@ import org.junit.experimental.categories.Category; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.mockito.Mockito; @Category({ ReplicationTests.class, SmallTests.class }) public class TestReplicationWALEntryFilters { @@ -187,7 +186,7 @@ public Cell filterCell(Entry entry, Cell cell) { @Test public void testChainWALEntryWithCellFilter() { Entry userEntry = createEntry(null, a, b, c); - ChainWALEntryFilter filterSomeCells = new ChainWALEntryFilter(new FilterSomeCellsWALCellFilter()); + CustomChainWALEntryFilter filterSomeCells = new CustomChainWALEntryFilter(new FilterSomeCellsWALCellFilter()); // since WALCellFilter filter cells with rowkey 'a' assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry)); @@ -195,13 +194,12 @@ public void testChainWALEntryWithCellFilter() { // since there is no cell to get filtered, nothing should get filtered assertEquals(userEntry2, filterSomeCells.filter(userEntry2)); - ChainWALEntryFilter filterAllCells = new ChainWALEntryFilter(new FilterAllCellsWALCellFilter()); - ChainWALEntryFilter spyFilterAllCells = Mockito.spy(filterAllCells); - assertEquals(createEntry(null), spyFilterAllCells.filter(userEntry)); + CustomChainWALEntryFilter filterAllCells = new CustomChainWALEntryFilter(new FilterAllCellsWALCellFilter()); + assertEquals(createEntry(null), filterAllCells.filter(userEntry)); // let's set the filter empty entry flag to true now for the above case - Mockito.doReturn(true).when(spyFilterAllCells).shouldFilterEmptyEntry(); + filterAllCells.setFilterEmptyEntry(true); // since WALCellFilter filter all cells, whole entry should be filtered - assertEquals(null, spyFilterAllCells.filter(userEntry)); + assertEquals(null, filterAllCells.filter(userEntry)); } @Test From 02fdfec0a146f6f56eed4820089f72d66c2b507b Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Fri, 22 Nov 2019 09:57:21 -0800 Subject: [PATCH 5/8] nit fix --- .../hadoop/hbase/replication/CustomChainWALEntryFilter.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java index 8d8f3ebf8f88..f1f20065d384 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + import java.util.List; /** * A {@link ChainWALEntryFilter} for providing more flexible options - * */ + */ public class CustomChainWALEntryFilter extends ChainWALEntryFilter { private boolean filterEmptyEntry = false; @@ -53,8 +55,10 @@ public WAL.Entry filter(WAL.Entry entry) { * To allow the empty entries to get filtered, we want to set this optional flag to decide * if we want to filter the entries which have no cells or all cells got filtered * though {@link WALCellFilter}. + * * @param filterEmptyEntry flag */ + @VisibleForTesting public void setFilterEmptyEntry(final boolean filterEmptyEntry) { this.filterEmptyEntry = filterEmptyEntry; } From cd3f767edadc1f9cbabcd851130f0482d0a27a77 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Fri, 22 Nov 2019 15:01:06 -0800 Subject: [PATCH 6/8] Adding an InterfaceAudience --- .../hadoop/hbase/replication/CustomChainWALEntryFilter.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java index f1f20065d384..38bcedd191a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.hbase.replication; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.yetus.audience.InterfaceAudience; import java.util.List; /** * A {@link ChainWALEntryFilter} for providing more flexible options */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) public class CustomChainWALEntryFilter extends ChainWALEntryFilter { private boolean filterEmptyEntry = false; From fec27dfc59ed851adb3b457274c7823764c85703 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Mon, 2 Dec 2019 14:01:25 -0800 Subject: [PATCH 7/8] Addressing comments and fixing style checking --- ...ter.java => ChainWALEmptyEntryFilter.java} | 16 ++++------- .../TestReplicationWALEntryFilters.java | 27 ++++++++++++++++--- 2 files changed, 29 insertions(+), 14 deletions(-) rename hbase-server/src/main/java/org/apache/hadoop/hbase/replication/{CustomChainWALEntryFilter.java => ChainWALEmptyEntryFilter.java} (86%) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java similarity index 86% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java index 38bcedd191a1..c623541cf65e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/CustomChainWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java @@ -17,37 +17,31 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.List; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; -import java.util.List; - /** * A {@link ChainWALEntryFilter} for providing more flexible options */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public class CustomChainWALEntryFilter extends ChainWALEntryFilter { +public class ChainWALEmptyEntryFilter extends ChainWALEntryFilter { private boolean filterEmptyEntry = false; - public CustomChainWALEntryFilter(final WALEntryFilter... filters) { + public ChainWALEmptyEntryFilter(final WALEntryFilter... filters) { super(filters); } - public CustomChainWALEntryFilter(final List filters) { + public ChainWALEmptyEntryFilter(final List filters) { super(filters); } @Override public WAL.Entry filter(WAL.Entry entry) { - filterEntry(entry); - if (entry == null) { - return null; - } - - filterCells(entry); + entry = super.filter(entry); if (filterEmptyEntry && entry != null && entry.getEdit().isEmpty()) { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index a60b4728aabe..bae9625eaaca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -163,7 +163,8 @@ public Entry filter(Entry entry) { @Override public Cell filterCell(Entry entry, Cell cell) { - if (Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).equals("a")) { + if (Bytes.toString( + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()).equals("a")) { return null; } else { return cell; @@ -186,7 +187,8 @@ public Cell filterCell(Entry entry, Cell cell) { @Test public void testChainWALEntryWithCellFilter() { Entry userEntry = createEntry(null, a, b, c); - CustomChainWALEntryFilter filterSomeCells = new CustomChainWALEntryFilter(new FilterSomeCellsWALCellFilter()); + ChainWALEntryFilter filterSomeCells = + new ChainWALEntryFilter(new FilterSomeCellsWALCellFilter()); // since WALCellFilter filter cells with rowkey 'a' assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry)); @@ -194,7 +196,26 @@ public void testChainWALEntryWithCellFilter() { // since there is no cell to get filtered, nothing should get filtered assertEquals(userEntry2, filterSomeCells.filter(userEntry2)); - CustomChainWALEntryFilter filterAllCells = new CustomChainWALEntryFilter(new FilterAllCellsWALCellFilter()); + // since we filter all the cells, we should get empty entry + ChainWALEntryFilter filterAllCells = + new ChainWALEntryFilter(new FilterAllCellsWALCellFilter()); + assertEquals(createEntry(null), filterAllCells.filter(userEntry)); + } + + @Test + public void testChainWALEmptyEntryWithCellFilter() { + Entry userEntry = createEntry(null, a, b, c); + ChainWALEmptyEntryFilter filterSomeCells = + new ChainWALEmptyEntryFilter(new FilterSomeCellsWALCellFilter()); + // since WALCellFilter filter cells with rowkey 'a' + assertEquals(createEntry(null, b,c), filterSomeCells.filter(userEntry)); + + Entry userEntry2 = createEntry(null, b, c, d); + // since there is no cell to get filtered, nothing should get filtered + assertEquals(userEntry2, filterSomeCells.filter(userEntry2)); + + ChainWALEmptyEntryFilter filterAllCells = + new ChainWALEmptyEntryFilter(new FilterAllCellsWALCellFilter()); assertEquals(createEntry(null), filterAllCells.filter(userEntry)); // let's set the filter empty entry flag to true now for the above case filterAllCells.setFilterEmptyEntry(true); From 5b28db970bb5fa5719e7ef9d1f8cf4f8ac35de00 Mon Sep 17 00:00:00 2001 From: Sandeep Pal Date: Mon, 2 Dec 2019 23:54:10 -0800 Subject: [PATCH 8/8] fixing check style --- .../hadoop/hbase/replication/ChainWALEmptyEntryFilter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java index c623541cf65e..19fd0c77e7e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ChainWALEmptyEntryFilter.java @@ -20,8 +20,8 @@ import java.util.List; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** * A {@link ChainWALEntryFilter} for providing more flexible options