Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,21 @@ public long skip(final long len) throws IOException {
}

/**
* Call the delegate's {@code available()} method.
* @return the delegate's available bytes if the current position is less than the
* @return the remaining bytes within the bound if the current position is less than the
* limit, or 0 otherwise.
*/
@Override
public int available() throws IOException {
if (pos >= limit) {
return 0;
}
int available = in.available();
return (int) Math.min(available, limit - pos);
// Do not call the delegate's available() method. Data in a bounded input stream is assumed
// available up to the limit and that is the contract we have with our callers. Regardless
// of what we do here, read() and skip() will behave as expected when EOF is encountered if
// the underlying stream is closed early or otherwise could not provide enough bytes.
// Note: This class is used to supply buffers to compression codecs during WAL tailing and
// successful decompression depends on this behavior.
return (int) (limit - pos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, something like this in a wrapping input stream is what I had in my mind but doing this in BoundedDelegatingInputStream is even more elegant and clean..

nit: Looks like javadoc needs updation..
I think this behavior is very subtle and not obvious, would be great to back it up with a small comment and how tailing reads depend on it..

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(MediumTests.class)
public class TestReplicationCompressedWAL extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class);
  static final int NUM_BATCHES = 20;
  static final int NUM_ROWS_PER_BATCH = 100;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Enable WAL compression on the source cluster before the base class starts it.
    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    TestReplicationBase.setUpBeforeClass();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  @Test
  public void testMultiplePuts() throws Exception {
    runMultiplePutTest();
  }

  /** Writes each batch to the source table and then verifies it arrived at the sink. */
  protected static void runMultiplePutTest() throws IOException, InterruptedException {
    for (int batch = 0; batch < NUM_BATCHES; batch++) {
      putBatch(batch);
      getBatch(batch);
    }
  }

  /**
   * Polls the sink table until every row of the given batch has replicated, failing the
   * test once NB_RETRIES attempts have been exhausted for any row.
   */
  protected static void getBatch(int batch) throws IOException, InterruptedException {
    for (int row = 0; row < NUM_ROWS_PER_BATCH; row++) {
      byte[] key = getRowKey(batch, row);
      Get get = new Get(key);
      for (int attempt = 0; ; attempt++) {
        if (attempt == NB_RETRIES - 1) {
          fail("Waited too much time for replication");
        }
        Result result = htable2.get(get);
        if (!result.isEmpty()) {
          // Row was written with its key as the value, so the two must match.
          assertArrayEquals(key, result.value());
          break;
        }
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  /** Builds a globally unique row key from the batch number and the index within it. */
  protected static byte[] getRowKey(int batch, int count) {
    int ordinal = (batch * NUM_ROWS_PER_BATCH) + count;
    return Bytes.toBytes("row" + ordinal);
  }

  /** Writes one batch of rows to the source table; each row's key doubles as its value. */
  protected static void putBatch(int batch) throws IOException {
    for (int row = 0; row < NUM_ROWS_PER_BATCH; row++) {
      byte[] key = getRowKey(batch, row);
      Put put = new Put(key);
      put.addColumn(famName, key, key);
      htable1.put(put);
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(MediumTests.class)
public class TestReplicationValueCompressedWAL extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationValueCompressedWAL.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicationValueCompressedWAL.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Turn on both WAL dictionary compression and WAL value compression on the source
    // cluster before the base class brings it up.
    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    CONF1.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
    TestReplicationBase.setUpBeforeClass();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  /** Replication should succeed end to end when the source WAL uses value compression. */
  @Test
  public void testMultiplePuts() throws Exception {
    TestReplicationCompressedWAL.runMultiplePutTest();
  }

}