diff --git a/Makefile b/Makefile index 35f973165..b2191b759 100644 --- a/Makefile +++ b/Makefile @@ -7,14 +7,16 @@ compile: test: unit-test integration-test +test-ts: unit-test integration-test-timeseries + unit-test: mvn test integration-test: - mvn -Pitest,default -Dcom.basho.riak.2i=true -Dcom.basho.riak.yokozuna=true -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify + mvn -Pitest,default -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify integration-test-timeseries: - mvn -Pitest,default -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true -Dcom.basho.riak.timeseries=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify + mvn -Pitest,default -Dcom.basho.riak.timeseries=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify integration-test-security: mvn -Pitest,default -Dcom.basho.riak.security=true -Dcom.basho.riak.security.clientcert=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) test-compile failsafe:integration-test diff --git a/buildbot/Makefile b/buildbot/Makefile index a8720bfac..f924fedef 100644 --- a/buildbot/Makefile +++ b/buildbot/Makefile @@ -47,7 +47,11 @@ test: test-normal test-security test-normal: $(RIAK_ADMIN) security disable - @cd ..; mvn -Pitest,default -Dcom.basho.riak.2i=true -Dcom.basho.riak.yokozuna=true -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true verify + @cd ..; mvn -Pitest,default verify + +test-timeseries: + $(RIAK_ADMIN) security disable + mvn -Pitest,default -Dcom.basho.riak.timeseries=true verify test-security: $(RIAK_ADMIN) security enable diff --git a/src/main/java/com/basho/riak/client/api/commands/CoreFutureAdapter.java b/src/main/java/com/basho/riak/client/api/commands/CoreFutureAdapter.java index ba46b67ca..9753fe109 100644 --- a/src/main/java/com/basho/riak/client/api/commands/CoreFutureAdapter.java +++ b/src/main/java/com/basho/riak/client/api/commands/CoreFutureAdapter.java @@ -34,12 +34,12 @@ public abstract class CoreFutureAdapter extends ListenableFuture implements RiakFutureListener { private final RiakFuture coreFuture; - + public CoreFutureAdapter(RiakFuture coreFuture) { this.coreFuture = coreFuture; } - + @Override public boolean cancel(boolean mayInterruptIfRunning) { @@ -71,7 +71,7 @@ public T2 getNow() return null; } } - + @Override public boolean isCancelled() { @@ -91,9 +91,9 @@ public void await() throws InterruptedException } @Override - public void await(long timeout, TimeUnit unit) throws InterruptedException + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - coreFuture.await(timeout, unit); + return coreFuture.await(timeout, unit); } @Override @@ -113,13 +113,13 @@ public S2 getQueryInfo() { return convertQueryInfo(coreFuture.getQueryInfo()); } - + @Override public void handle(RiakFuture f) { notifyListeners(); } - + protected abstract T2 convertResponse(T coreResponse); protected abstract S2 convertQueryInfo(S coreQueryInfo); } diff --git a/src/main/java/com/basho/riak/client/api/commands/indexes/BigIntIndexQuery.java b/src/main/java/com/basho/riak/client/api/commands/indexes/BigIntIndexQuery.java index 4114facb4..43fb9d185 100644 --- a/src/main/java/com/basho/riak/client/api/commands/indexes/BigIntIndexQuery.java +++ b/src/main/java/com/basho/riak/client/api/commands/indexes/BigIntIndexQuery.java @@ -32,9 +32,9 @@ * *

* A BigIntIndexQuery is used when you are using integers for your 2i keys. The - * parameters are provided as BigInteger values. Use this query if your - * 2i key values exceed that - * which can be stored in a (64 bit) long, + * parameters are provided as BigInteger values. Use this query if your + * 2i key values exceed that + * which can be stored in a (64 bit) long, *

*
  * {@code
@@ -54,7 +54,7 @@ protected IndexConverter getConverter()
     {
         return converter;
     }
-    
+
     protected BigIntIndexQuery(Init builder)
     {
         super(builder);
@@ -78,19 +78,19 @@ public BinaryValue convert(BigInteger input)
             }
         };
     }
-    
+
     @Override
     protected RiakFuture executeAsync(RiakCluster cluster)
     {
         RiakFuture coreFuture =
             executeCoreAsync(cluster);
-        
+
         BigIntQueryFuture future = new BigIntQueryFuture(coreFuture);
         coreFuture.addListener(future);
         return future;
-            
+
     }
-    
+
     protected final class BigIntQueryFuture extends CoreFutureAdapter
     {
         public BigIntQueryFuture(RiakFuture coreFuture)
@@ -110,7 +110,7 @@ protected BigIntIndexQuery convertQueryInfo(SecondaryIndexQueryOperation.Query c
             return BigIntIndexQuery.this;
         }
     }
-    
+
     protected static abstract class Init> extends SecondaryIndexQuery.Init
     {
 
@@ -135,7 +135,7 @@ public T withRegexTermFilter(String filter)
             throw new IllegalArgumentException("Cannot use term filter with _int query");
         }
     }
-    
+
     /**
      * Builder used to construct a BigIntIndexQuery.
      */
@@ -151,7 +151,7 @@ public static class Builder extends Init
          * @param indexName The name of the index in Riak.
          * @param coverContext cover context.
          */
-        public Builder(Namespace namespace, String indexName, byte[] coverContext)
+        public Builder(Namespace namespace, String indexName,  byte[] coverContext)
         {
             super(namespace, indexName, coverContext);
         }
@@ -160,7 +160,7 @@ public Builder(Namespace namespace, String indexName, byte[] coverContext)
          * Construct a Builder for a BigIntIndexQuery with a range.
          * 

* Note that your index name should not include the Riak {@literal _int} or - * {@literal _bin} extension. + * {@literal _bin} extension. *

* @param namespace The namespace in Riak to query. * @param indexName The name of the index in Riak. @@ -176,7 +176,7 @@ public Builder(Namespace namespace, String indexName, BigInteger start, BigInteg * Construct a Builder for a BigIntIndexQuery with a single 2i key. *

* Note that your index name should not include the Riak {@literal _int} or - * {@literal _bin} extension. + * {@literal _bin} extension. *

* @param namespace The namespace in Riak to query. * @param indexName The name of the index in Riak. @@ -202,14 +202,14 @@ public BigIntIndexQuery build() return new BigIntIndexQuery(this); } } - + public static class Response extends SecondaryIndexQuery.Response { protected Response(Namespace queryLocation, SecondaryIndexQueryOperation.Response coreResponse, IndexConverter converter) { super(queryLocation, coreResponse, converter); } - + @Override public List getEntries() { diff --git a/src/main/java/com/basho/riak/client/api/commands/indexes/BinIndexQuery.java b/src/main/java/com/basho/riak/client/api/commands/indexes/BinIndexQuery.java index c7e55df4b..86d00702a 100644 --- a/src/main/java/com/basho/riak/client/api/commands/indexes/BinIndexQuery.java +++ b/src/main/java/com/basho/riak/client/api/commands/indexes/BinIndexQuery.java @@ -22,6 +22,7 @@ import com.basho.riak.client.core.operations.SecondaryIndexQueryOperation; import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; +import com.basho.riak.client.core.query.indexes.IndexNames; import com.basho.riak.client.core.util.BinaryValue; import com.basho.riak.client.core.util.DefaultCharset; @@ -120,17 +121,28 @@ protected static abstract class Init> extends SecondaryI public Init(Namespace namespace, String indexName, S start, S end) { - super(namespace, indexName + Type._BIN, start, end); + super(namespace, generateIndexName(indexName), start, end); } public Init(Namespace namespace, String indexName, S match) { - super(namespace, indexName + Type._BIN, match); + super(namespace, generateIndexName(indexName), match); } public Init(Namespace namespace, String indexName, byte[] coverContext) { - super(namespace, indexName + Type._BIN, coverContext); + super(namespace, generateIndexName(indexName), coverContext); + } + + private static String generateIndexName(String baseIndexName) + { + if(IndexNames.BUCKET.equalsIgnoreCase(baseIndexName) || + IndexNames.KEY.equalsIgnoreCase(baseIndexName)) + { + return baseIndexName; + } + + return baseIndexName + Type._BIN; } T withCharacterSet(Charset charset) @@ -278,6 +290,10 @@ public String convert(BinaryValue input) @Override public BinaryValue convert(String input) { + if (input == null ) + { + return null; + } return BinaryValue.create(input, charset); } diff --git a/src/main/java/com/basho/riak/client/api/commands/indexes/BucketIndexQuery.java b/src/main/java/com/basho/riak/client/api/commands/indexes/BucketIndexQuery.java index a887c564e..730209403 100644 --- a/src/main/java/com/basho/riak/client/api/commands/indexes/BucketIndexQuery.java +++ b/src/main/java/com/basho/riak/client/api/commands/indexes/BucketIndexQuery.java @@ -2,6 +2,7 @@ import com.basho.riak.client.core.operations.SecondaryIndexQueryOperation; import com.basho.riak.client.core.query.Namespace; +import com.basho.riak.client.core.query.indexes.IndexNames; import com.basho.riak.client.core.util.BinaryValue; /** @@ -19,18 +20,23 @@ * @author Alex Moore * @since 2.0.7 */ -public class BucketIndexQuery extends RawIndexQuery +public class BucketIndexQuery extends BinIndexQuery { - private BucketIndexQuery(Init builder) + private BucketIndexQuery(Init builder) { super(builder); } - public static class Builder extends SecondaryIndexQuery.Init + public static class Builder extends BinIndexQuery.Init { public Builder(Namespace namespace) { - super(namespace, "$bucket", namespace.getBucketName()); + super(namespace, IndexNames.BUCKET, namespace.getBucketName().toStringUtf8()); + } + + public Builder(Namespace namespace, byte[] coverContext) + { + super(namespace, IndexNames.BUCKET, coverContext); } @Override diff --git a/src/main/java/com/basho/riak/client/api/commands/indexes/IntIndexQuery.java b/src/main/java/com/basho/riak/client/api/commands/indexes/IntIndexQuery.java index 83839a475..2bc867879 100644 --- a/src/main/java/com/basho/riak/client/api/commands/indexes/IntIndexQuery.java +++ b/src/main/java/com/basho/riak/client/api/commands/indexes/IntIndexQuery.java @@ -83,11 +83,11 @@ protected RiakFuture executeAsync(RiakCluster cluster) { RiakFuture coreFuture = executeCoreAsync(cluster); - + IntQueryFuture future = new IntQueryFuture(coreFuture); coreFuture.addListener(future); return future; - + } protected final class IntQueryFuture extends CoreFutureAdapter @@ -96,7 +96,7 @@ public IntQueryFuture(RiakFuture> extends SecondaryIndexQuery.Init { @@ -127,7 +127,7 @@ public Init(Namespace namespace, String indexName, S match) { super(namespace, indexName + Type._INT, match); } - + @Override public T withRegexTermFilter(String filter) { @@ -160,7 +160,7 @@ public Builder(Namespace namespace, String indexName, byte[] coverContext) * Construct a Builder for a IntIndexQuery with a range. *

* Note that your index name should not include the Riak {@literal _int} or - * {@literal _bin} extension. + * {@literal _bin} extension. *

* @param namespace The namespace in Riak to query. * @param indexName The name of the index in Riak. @@ -176,7 +176,7 @@ public Builder(Namespace namespace, String indexName, Long start, Long end) * Construct a Builder for a IntIndexQuery with a single 2i key. *

* Note that your index name should not include the Riak {@literal _int} or - * {@literal _bin} extension. + * {@literal _bin} extension. *

* @param namespace The namespace in Riak to query. * @param indexName The name of the index in Riak. @@ -202,14 +202,14 @@ public IntIndexQuery build() return new IntIndexQuery(this); } } - + public static class Response extends SecondaryIndexQuery.Response { protected Response(Namespace queryLocation, SecondaryIndexQueryOperation.Response coreResponse, IndexConverter converter) { super(queryLocation, coreResponse, converter); } - + @Override public List getEntries() { diff --git a/src/main/java/com/basho/riak/client/api/commands/indexes/KeyIndexQuery.java b/src/main/java/com/basho/riak/client/api/commands/indexes/KeyIndexQuery.java index aa08128cb..bfebfd24a 100644 --- a/src/main/java/com/basho/riak/client/api/commands/indexes/KeyIndexQuery.java +++ b/src/main/java/com/basho/riak/client/api/commands/indexes/KeyIndexQuery.java @@ -2,6 +2,7 @@ import com.basho.riak.client.core.operations.SecondaryIndexQueryOperation; import com.basho.riak.client.core.query.Namespace; +import com.basho.riak.client.core.query.indexes.IndexNames; import com.basho.riak.client.core.util.BinaryValue; import com.basho.riak.client.core.util.DefaultCharset; @@ -39,7 +40,7 @@ public Builder(Namespace namespace, String start, String end) public Builder(Namespace namespace, BinaryValue start, BinaryValue end) { - super(namespace, "$key", start, end); + super(namespace, IndexNames.KEY, start, end); } @Override diff --git a/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java b/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java index 07e264044..b6c7dbd11 100644 --- a/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java +++ b/src/main/java/com/basho/riak/client/api/commands/indexes/SecondaryIndexQuery.java @@ -550,7 +550,7 @@ public abstract static class Response final protected IndexConverter converter; final protected SecondaryIndexQueryOperation.Response coreResponse; final protected Namespace queryLocation; - + protected Response(Namespace queryLocation, SecondaryIndexQueryOperation.Response coreResponse, IndexConverter converter) diff --git a/src/main/java/com/basho/riak/client/api/commands/kv/FullBucketRead.java b/src/main/java/com/basho/riak/client/api/commands/kv/FullBucketRead.java index 4acfa53e0..a2830c3af 100644 --- a/src/main/java/com/basho/riak/client/api/commands/kv/FullBucketRead.java +++ b/src/main/java/com/basho/riak/client/api/commands/kv/FullBucketRead.java @@ -23,6 +23,7 @@ import com.basho.riak.client.core.operations.SecondaryIndexQueryOperation; import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; +import com.basho.riak.client.core.query.indexes.IndexNames; import com.basho.riak.client.core.util.BinaryValue; import com.google.protobuf.ByteString; @@ -109,12 +110,12 @@ private static class BuilderFullBucketRead2i extends SecondaryIndexQuery.Init responses) diff --git a/src/main/java/com/basho/riak/client/core/RiakCluster.java b/src/main/java/com/basho/riak/client/core/RiakCluster.java index 72e1fe249..0b2866ba3 100644 --- a/src/main/java/com/basho/riak/client/core/RiakCluster.java +++ b/src/main/java/com/basho/riak/client/core/RiakCluster.java @@ -47,7 +47,6 @@ public class RiakCluster implements OperationRetrier, NodeStateListener enum State { CREATED, RUNNING, QUEUING, SHUTTING_DOWN, SHUTDOWN } private final Logger logger = LoggerFactory.getLogger(RiakCluster.class); private final int executionAttempts; - private final int operationQueueMaxDepth; private final NodeManager nodeManager; private final AtomicInteger inFlightCount = new AtomicInteger(); private final ScheduledExecutorService executor; @@ -57,7 +56,8 @@ enum State { CREATED, RUNNING, QUEUING, SHUTTING_DOWN, SHUTDOWN } private final LinkedBlockingQueue retryQueue = new LinkedBlockingQueue(); private final boolean queueOperations; - private final LinkedBlockingDeque operationQueue; + private final ConcurrentLinkedDeque operationQueue; + private final RiakNode.Sync operationQueuePermits; private final List stateListeners = Collections.synchronizedList(new LinkedList()); @@ -119,8 +119,8 @@ private RiakCluster(Builder builder) if (this.queueOperations) { - this.operationQueueMaxDepth = builder.operationQueueMaxDepth; - this.operationQueue = new LinkedBlockingDeque(); + this.operationQueue = new ConcurrentLinkedDeque<>(); + this.operationQueuePermits = new RiakNode.Sync(builder.operationQueueMaxDepth); for (RiakNode node : nodeList) { @@ -129,12 +129,12 @@ private RiakCluster(Builder builder) } else { - this.operationQueueMaxDepth = 0; + this.operationQueuePermits = new RiakNode.Sync(0); this.operationQueue = null; } // Pass a *copy* of the list to the NodeManager - nodeManager.init(new ArrayList(nodeList)); + nodeManager.init(new ArrayList<>(nodeList)); state = State.CREATED; } @@ -244,7 +244,7 @@ public RiakFuture execute(FutureOperation operation) // Queue it up, run next from queue if (this.queueOperations) { - executeWithQueueStrategy(operation, null); + executeWithQueueStrategy(operation); } else // Out of connections, retrier will pick it up later. { @@ -260,11 +260,12 @@ private boolean notQueuingOrQueueIsEmpty() return !this.queueOperations || this.operationQueue.size() == 0; } - private void executeWithQueueStrategy(FutureOperation operation, RiakNode previousNode) + private void executeWithQueueStrategy(FutureOperation operation) { - if (operationQueue.size() >= operationQueueMaxDepth) + if (!operationQueuePermits.tryAcquire()) { - logger.warn("No Nodes Available, and Operation Queue at Max Depth"); + logger.warn("Can't execute operation {}, no connections available, and Operation Queue at Max Depth", + System.identityHashCode(operation)); operation.setRetrier(this, 1); operation.setException(new NoNodesAvailableException("No Nodes Available, and Operation Queue at Max Depth")); return; @@ -285,32 +286,39 @@ private void executeWithQueueStrategy(FutureOperation operation, RiakNode previo private boolean executeWithRequeueOnNoConnection(FutureOperation operation) { + logger.debug("Queued operation {} attempting to be executed.", System.identityHashCode(operation)); // Attempt to run boolean gotConnection = this.execute(operation, null); // If we can't get a connection, put it back at the beginning of the queue if (!gotConnection) { + logger.debug("Queued operation {} wasn't executed, no connection available, requeuing operation.", + System.identityHashCode(operation)); operationQueue.offerFirst(operation); } + else + { + operationQueuePermits.release(); + } verifyQueueStatus(); return gotConnection; } - private void verifyQueueStatus() + private synchronized void verifyQueueStatus() { - Integer queueSize = operationQueue.size(); + Integer queueSize = operationQueuePermits.getMaxPermits() - operationQueuePermits.availablePermits(); if (queueSize > 0 && state == State.RUNNING) { state = State.QUEUING; - logger.debug("RiakCluster queuing operations."); + logger.debug("RiakCluster state change: Now Queuing operations."); } else if (queueSize == 0 && (state == State.QUEUING || state == State.SHUTTING_DOWN)) { - logger.debug("RiakCluster cleared operation queue."); + logger.debug("RiakCluster state change: Cleared operation queue."); if (state == State.QUEUING) { state = State.RUNNING; @@ -441,7 +449,7 @@ public void nodeStateChanged(RiakNode node, RiakNode.State state) @Override public void operationFailed(FutureOperation operation, int remainingRetries) { - logger.debug("operation failed; remaining retries: {}", remainingRetries); + logger.debug("operation {} failed; remaining retries: {}", System.identityHashCode(operation), remainingRetries); if (remainingRetries > 0) { retryQueue.add(operation); @@ -456,7 +464,7 @@ public void operationFailed(FutureOperation operation, int remainingRetries) public void operationComplete(FutureOperation operation, int remainingRetries) { inFlightCount.decrementAndGet(); - logger.debug("operation complete; remaining retries: {}", remainingRetries); + logger.debug("operation {} complete; remaining retries: {}", System.identityHashCode(operation), remainingRetries); } private void retryOperation() throws InterruptedException @@ -473,7 +481,14 @@ private void retryOperation() throws InterruptedException private void queueDrainOperation() throws InterruptedException { - FutureOperation operation = operationQueue.take(); + logger.debug("QueueDrainer - Polling for queued operations."); + FutureOperation operation = operationQueue.poll(); + if(operation == null) + { + logger.debug("QueueDrainer - No queued operation available, sleeping."); + Thread.sleep(50); + return; + } boolean connectionSuccess = executeWithRequeueOnNoConnection(operation); @@ -481,6 +496,8 @@ private void queueDrainOperation() throws InterruptedException // Sleep for a bit so we don't spinwait our CPUs to death if (!connectionSuccess) { + logger.debug("QueueDrainer - Pulled queued operation {}, but no connection available, sleeping.", System.identityHashCode(operation)); + // TODO: should this timeout be configurable, or based on an // average command execution time? Thread.sleep(50); diff --git a/src/main/java/com/basho/riak/client/core/RiakFuture.java b/src/main/java/com/basho/riak/client/core/RiakFuture.java index 4ce8e0ab0..548be4d86 100644 --- a/src/main/java/com/basho/riak/client/core/RiakFuture.java +++ b/src/main/java/com/basho/riak/client/core/RiakFuture.java @@ -21,20 +21,20 @@ import java.util.concurrent.TimeoutException; /** - * The result of an asynchronous Riak operation. + * The result of an asynchronous Riak operation. *

* All Riak operations are asynchronous. It means when you execute an operation on - * the cluster it will return immediately with no guarantee that the requested - * operation has been completed at the end of the call. Instead, you will be returned - * a {@code RiakFuture} instance which gives you the information about the result + * the cluster it will return immediately with no guarantee that the requested + * operation has been completed at the end of the call. Instead, you will be returned + * a {@code RiakFuture} instance which gives you the information about the result * or status of the operation. *

*

- * A {@code RiakFuture} is either uncompleted or completed. When an operation begins, a new future - * object is created. The new future is uncompleted initially - it is neither - * succeeded, failed, nor canceled because the operation is not finished yet. - * If the operation is finished either successfully, with failure, or by cancellation, - * the future is marked as completed with more specific information, such as the + * A {@code RiakFuture} is either uncompleted or completed. When an operation begins, a new future + * object is created. The new future is uncompleted initially - it is neither + * succeeded, failed, nor canceled because the operation is not finished yet. + * If the operation is finished either successfully, with failure, or by cancellation, + * the future is marked as completed with more specific information, such as the * cause of the failure. Please note that even failure and cancellation belong to the completed state. *

  *                                      +---------------------------+
@@ -61,7 +61,7 @@
  * then call {@literal getNow()} or {@literal cause()}

* @author Brian Roach * @param the (response) return type - * @param The query info type + * @param The query info type * @since 2.0 */ public interface RiakFuture extends Future @@ -70,14 +70,14 @@ public interface RiakFuture extends Future * Not supported due to limitations of the Riak API. *

* At present time there is no way to cancel an operation sent to Riak. - * This method will never succeed and always return false. + * This method will never succeed and always return false. *

* @param mayInterruptIfRunning * @return always false. */ @Override boolean cancel(boolean mayInterruptIfRunning); - + /** * Waits for this RiakFuture to be completed if necessary and returns the response if available. * @return The response from the operation. @@ -88,13 +88,13 @@ public interface RiakFuture extends Future */ @Override V get() throws InterruptedException, ExecutionException; - + /** * Waits if necessary for at most the given time for the computation * to complete, and then retrieves its result, if available. *

* Note that the timeout value here is how long you are willing to wait - * for this RiakFuture to complete. If you wish to set a timeout on the command + * for this RiakFuture to complete. If you wish to set a timeout on the command * itself, use the timeout() method provided in the command's associated builder. *

* @param timeout the amount of time to wait before returning. @@ -115,12 +115,12 @@ public interface RiakFuture extends Future /** * Waits for this RiakFuture to be completed. *

- * Upon returning, the operation has completed. Checking isSuccess() tells + * Upon returning, the operation has completed. Checking isSuccess() tells * you if it did so successfully. *

* @throws InterruptedException if the current thread was interrupted * while waiting - * @see #isSuccess() + * @see #isSuccess() */ void await() throws InterruptedException; /** @@ -134,33 +134,35 @@ public interface RiakFuture extends Future * If you wish to set a timeout on the command itself, use the timeout() method * provided in the command's associated builder. *

- * + * * @param timeout the amount of time to wait before returning. * @param unit the unit of time. + * @return {@code true} if the future completed and {@code false} + * if the waiting time elapsed before it completed * @throws InterruptedException if the current thread was interrupted * while waiting - * @see #isDone() - * @see #isSuccess() + * @see #isDone() + * @see #isSuccess() */ - void await(long timeout, TimeUnit unit) throws InterruptedException; + boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** * Determine if the operation was successful. *

- * In the case of true, get() will then return the response. If false, + * In the case of true, get() will then return the response. If false, * cause() will then return non-null. * @return true if completed and successful, false otherwise. - * @see #cause() + * @see #cause() */ - + /** - * Return the result without blocking or throwing an exception. - * If the future is not yet done or has failed this will return null. - * As it is possible that a null value is used to mark the future as successful - * you also need to check if the future is really done with {@link #isDone()} + * Return the result without blocking or throwing an exception. + * If the future is not yet done or has failed this will return null. + * As it is possible that a null value is used to mark the future as successful + * you also need to check if the future is really done with {@link #isDone()} * and not rely on the returned null value. * @return The response, or {@literal null} if the future is not yet completed or failed. - * @see #isDone() - * @see #isSuccess() + * @see #isDone() + * @see #isSuccess() */ V getNow(); /** @@ -171,11 +173,11 @@ public interface RiakFuture extends Future /** * Return information about the operation and why it failed. *

- * Note this will return {@literal null} if the operation completed + * Note this will return {@literal null} if the operation completed * successfully. *

* @return The exception thrown during the operation, or null if not set. - * @see #isSuccess() + * @see #isSuccess() */ Throwable cause(); /** diff --git a/src/main/java/com/basho/riak/client/core/RiakNode.java b/src/main/java/com/basho/riak/client/core/RiakNode.java index aab8cc0bd..9d3288793 100644 --- a/src/main/java/com/basho/riak/client/core/RiakNode.java +++ b/src/main/java/com/basho/riak/client/core/RiakNode.java @@ -592,13 +592,14 @@ public boolean execute(FutureOperation operation) inProgressMap.put(channel, operation); ChannelFuture writeFuture = channel.writeAndFlush(operation); writeFuture.addListener(writeListener); - logger.debug("Operation being executed on RiakNode {}:{}", remoteAddress, port); + logger.debug("Operation {} being executed on RiakNode {}:{}", + System.identityHashCode(operation), remoteAddress, port); return true; } else { - logger.debug("Operation not being executed Riaknode {}:{}; no connections available", - remoteAddress, port); + logger.debug("Operation {} not being executed Riaknode {}:{}; no connections available", + System.identityHashCode(operation), remoteAddress, port); return false; } } @@ -857,8 +858,7 @@ private void closeConnection(Channel c) @Override public void onSuccess(Channel channel, final RiakMessage response) { - logger.debug("Operation onSuccess() channel: id:{} {}:{}", channel.hashCode(), - remoteAddress, port); + logger.debug("Operation onSuccess() channel: id:{} {}:{}", channel.hashCode(), remoteAddress, port); consecutiveFailedOperations.set(0); final FutureOperation inProgress = inProgressMap.get(channel); @@ -994,7 +994,7 @@ public long getIdleStart() } } - private class Sync extends Semaphore + static class Sync extends Semaphore { private static final long serialVersionUID = -5118488872281021072L; private volatile int maxPermits; diff --git a/src/main/java/com/basho/riak/client/core/netty/HealthCheckDecoder.java b/src/main/java/com/basho/riak/client/core/netty/HealthCheckDecoder.java index 2263d2454..6ef54f463 100644 --- a/src/main/java/com/basho/riak/client/core/netty/HealthCheckDecoder.java +++ b/src/main/java/com/basho/riak/client/core/netty/HealthCheckDecoder.java @@ -218,9 +218,9 @@ public void await() throws InterruptedException } @Override - public void await(long timeout, TimeUnit unit) throws InterruptedException + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { - latch.await(timeout, unit); + return latch.await(timeout, unit); } @Override diff --git a/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java b/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java index 8b66e6345..4a73b26dc 100644 --- a/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java +++ b/src/main/java/com/basho/riak/client/core/operations/SecondaryIndexQueryOperation.java @@ -18,6 +18,7 @@ import com.basho.riak.client.core.FutureOperation; import com.basho.riak.client.core.RiakMessage; import com.basho.riak.client.core.query.Namespace; +import com.basho.riak.client.core.query.indexes.IndexNames; import com.basho.riak.client.core.util.BinaryValue; import com.basho.riak.protobuf.RiakMessageCodes; import com.basho.riak.protobuf.RiakKvPB; @@ -83,7 +84,7 @@ protected SecondaryIndexQueryOperation.Response convert(List rawResponse * Also, the $key index queries just ignore return_terms altogether. */ - if (pbReq.getReturnTerms() && !query.indexName.toString().equalsIgnoreCase("$key")) + if (pbReq.getReturnTerms() && !query.indexName.toString().equalsIgnoreCase(IndexNames.KEY)) { convertTerms(responseBuilder, pbEntry); } @@ -227,7 +228,7 @@ public Builder(Query query) .setIndex(ByteString.copyFrom(query.indexName.unsafeGetValue())) .setReturnTerms(query.returnKeyAndIndex) .setReturnBody(query.returnBody); - + if (query.indexKey != null) { pbReqBuilder.setKey(ByteString.copyFrom(query.indexKey.unsafeGetValue())) @@ -246,7 +247,7 @@ else if(query.getRangeStart() != null) pbReqBuilder.setCoverContext(ByteString.copyFrom(query.coverageContext)) .setKey(ByteString.EMPTY) - .setIndex(ByteString.copyFromUtf8("$bucket")) + .setIndex(ByteString.copyFromUtf8(IndexNames.BUCKET)) .clearReturnTerms() .setQtype(RiakKvPB.RpbIndexReq.IndexQueryType.eq); } @@ -445,7 +446,7 @@ public static class Builder private Integer timeout; private byte[] coverageContext; private boolean returnBody; - + /** * Constructs a builder for a (2i) Query. * The index name must be the complete name with the _int or _bin suffix. diff --git a/src/main/java/com/basho/riak/client/core/query/indexes/IndexNames.java b/src/main/java/com/basho/riak/client/core/query/indexes/IndexNames.java new file mode 100644 index 000000000..71a36fdc0 --- /dev/null +++ b/src/main/java/com/basho/riak/client/core/query/indexes/IndexNames.java @@ -0,0 +1,17 @@ +package com.basho.riak.client.core.query.indexes; + +/** + * Collection of Built-in Riak Secondary Index Names + */ +public class IndexNames +{ + /* + The $bucket index name, used to fetch all the keys in a certain bucket. + */ + public static final String BUCKET = "$bucket"; + + /* + The $key index name, used to fetch a range of keys in a certain bucket. + */ + public static final String KEY = "$key"; +} diff --git a/src/main/java/com/basho/riak/client/core/query/indexes/IndexType.java b/src/main/java/com/basho/riak/client/core/query/indexes/IndexType.java index ca2a6a2a3..633d728c2 100644 --- a/src/main/java/com/basho/riak/client/core/query/indexes/IndexType.java +++ b/src/main/java/com/basho/riak/client/core/query/indexes/IndexType.java @@ -19,7 +19,7 @@ * Enum that encapsulates the suffix used to determine and index type in Riak. *

* There are two types of Secondary Indexes (2i) in Riak; "Integer" and - * "Binary". The current server API distinguishes between them via a + * "Binary". The current server API distinguishes between them via a * suffix ({@code "_int"} and {@code "_bin"} respectively). *

* @author Brian Roach @@ -31,7 +31,7 @@ public enum IndexType /** * Encapsulates the {@code "_int"} suffix for Riak index names. */ - INT("_int"), + INT("_int"), /** * Encapsulates the {@code "_bin"} suffix for Riak index names. */ @@ -44,14 +44,14 @@ public enum IndexType * Used for the special $key index */ KEY(""); - + private final String suffix; - + private IndexType(String suffix) { this.suffix = suffix; } - + /** * Returns the suffix for this type. * @return a {@code String} containing the suffix for this index type. @@ -60,7 +60,7 @@ public String suffix() { return suffix; } - + /** * Returns the index type from its fully qualified name

There are two * types of Secondary Indexes (2i) in Riak; "Integer" and "Binary". The @@ -76,11 +76,11 @@ public String suffix() */ public static IndexType typeFromFullname(String fullname) { - if (fullname.equalsIgnoreCase("$bucket")) + if (fullname.equalsIgnoreCase(IndexNames.BUCKET)) { return IndexType.BUCKET; } - else if (fullname.equalsIgnoreCase("$key")) + else if (fullname.equalsIgnoreCase(IndexNames.KEY)) { return IndexType.KEY; } diff --git a/src/test/java/com/basho/riak/client/api/ITestClusterLifecycle.java b/src/test/java/com/basho/riak/client/api/ITestClusterLifecycle.java index d671ec38c..7eae085bf 100644 --- a/src/test/java/com/basho/riak/client/api/ITestClusterLifecycle.java +++ b/src/test/java/com/basho/riak/client/api/ITestClusterLifecycle.java @@ -36,7 +36,7 @@ public class ITestClusterLifecycle public ITestClusterLifecycle() { - testLifecycle = Boolean.getBoolean("com.basho.riak.lifecycle"); + testLifecycle = Boolean.parseBoolean(System.getProperty("com.basho.riak.lifecycle", "true")); hostname = System.getProperty("com.basho.riak.host", RiakNode.Builder.DEFAULT_REMOTE_ADDRESS); pbcPort = Integer.getInteger("com.basho.riak.pbcport", RiakNode.Builder.DEFAULT_REMOTE_PORT); diff --git a/src/test/java/com/basho/riak/client/api/commands/ImmediateRiakFuture.java b/src/test/java/com/basho/riak/client/api/commands/ImmediateRiakFuture.java index f431763c1..100a620fc 100644 --- a/src/test/java/com/basho/riak/client/api/commands/ImmediateRiakFuture.java +++ b/src/test/java/com/basho/riak/client/api/commands/ImmediateRiakFuture.java @@ -53,7 +53,7 @@ public V getNow() { return value; } - + @Override public boolean isCancelled() { @@ -85,7 +85,7 @@ public void await() throws InterruptedException } @Override - public void await(long timeout, TimeUnit unit) throws InterruptedException + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. } diff --git a/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java b/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java index 533194097..cce739317 100644 --- a/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java +++ b/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestFullBucketRead.java @@ -10,13 +10,11 @@ import com.basho.riak.client.core.operations.CoveragePlanOperation; import com.basho.riak.client.core.operations.CoveragePlanOperation.Response.CoverageEntry; import com.basho.riak.client.core.operations.itest.ITestAutoCleanupBase; +import com.basho.riak.client.core.operations.itest.ITestBase; import com.basho.riak.client.core.query.RiakObject; import com.basho.riak.client.core.util.BinaryValue; import com.basho.riak.client.core.util.HostAndPort; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,23 +27,24 @@ /** * @author Sergey Galkin */ -public class ITestFullBucketRead extends ITestAutoCleanupBase +public class ITestFullBucketRead extends ITestBase { final static int minPartitions = 5; final static int NUMBER_OF_TEST_VALUES = 100; private final static Logger logger = LoggerFactory.getLogger(ITestFullBucketRead.class); - private CoveragePlan.Response coveragePlan; - private RiakClient client; + private static CoveragePlan.Response coveragePlan; + private static RiakClient client; // TODO: Remove assumption as Riak KV with PEx and Coverage plan will be released @BeforeClass - public static void BeforeClass() { + public static void BeforeClass() throws ExecutionException, InterruptedException + { Assume.assumeTrue(testTimeSeries); + setupData(); } - @Before - public void setupData() throws ExecutionException, InterruptedException + private static void setupData() throws ExecutionException, InterruptedException { String indexName = "creationNo"; String keyBase = "k"; @@ -53,9 +52,17 @@ public void setupData() throws ExecutionException, InterruptedException setupIndexTestData(defaultNamespace(), indexName, keyBase, value); + setupCoveragePlan(); + + // To be sure that settle down across nodes + Thread.sleep(1000); + } + + private static void setupCoveragePlan() throws ExecutionException, InterruptedException + { final CoveragePlan cmd = CoveragePlan.Builder.create(defaultNamespace()) - .withMinPartitions(minPartitions) - .build(); + .withMinPartitions(minPartitions) + .build(); client = new RiakClient(cluster); @@ -64,7 +71,7 @@ public void setupData() throws ExecutionException, InterruptedException if (logger.isInfoEnabled()) { final StringBuilder builder = new StringBuilder("\n\tGot the following list of Coverage Entries:"); - for (CoveragePlanOperation.Response.CoverageEntry ce : coveragePlan) + for (CoverageEntry ce : coveragePlan) { builder.append(String.format("\n\t%s:%d ('%s')", ce.getHost(), @@ -74,9 +81,16 @@ public void setupData() throws ExecutionException, InterruptedException } logger.info(builder.toString()); } + } - // To be sure that settle down across nodes - Thread.sleep(1000); + @AfterClass + public static void cleanupData() throws ExecutionException, InterruptedException + { + resetAndEmptyBucket(bucketName); + if (testBucketType) + { + resetAndEmptyBucket(defaultNamespace()); + } } @Test diff --git a/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestRawIndexQuery.java b/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestRawIndexQuery.java index 11c838ff0..ed3ed7597 100644 --- a/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestRawIndexQuery.java +++ b/src/test/java/com/basho/riak/client/api/commands/indexes/itest/ITestRawIndexQuery.java @@ -22,9 +22,7 @@ import com.basho.riak.client.api.annotations.RiakKey; import com.basho.riak.client.api.annotations.RiakVClock; import com.basho.riak.client.api.cap.VClock; -import com.basho.riak.client.api.commands.indexes.BucketIndexQuery; -import com.basho.riak.client.api.commands.indexes.KeyIndexQuery; -import com.basho.riak.client.api.commands.indexes.RawIndexQuery; +import com.basho.riak.client.api.commands.indexes.*; import com.basho.riak.client.api.commands.indexes.SecondaryIndexQuery.Type; import com.basho.riak.client.api.commands.kv.StoreValue; import com.basho.riak.client.core.RiakFuture; @@ -33,6 +31,7 @@ import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; import com.basho.riak.client.core.query.RiakObject; +import com.basho.riak.client.core.query.indexes.IndexNames; import com.basho.riak.client.core.util.BinaryValue; import org.junit.*; @@ -116,7 +115,7 @@ public void testKeyIndexHack() throws InterruptedException, ExecutionException RiakClient client = new RiakClient(cluster); RawIndexQuery biq = - new RawIndexQuery.Builder(sharedNamespace, "$key", Type._KEY, BinaryValue.create("my_key10"), BinaryValue.create("my_key19")) + new RawIndexQuery.Builder(sharedNamespace, IndexNames.KEY, Type._KEY, BinaryValue.create("my_key10"), BinaryValue.create("my_key19")) .withKeyAndIndex(true).build(); RawIndexQuery.Response iResp = client.execute(biq); assertTrue(iResp.hasEntries()); @@ -145,7 +144,8 @@ public void testBucketIndexHack() throws InterruptedException, ExecutionExceptio RiakClient client = new RiakClient(cluster); RawIndexQuery biq = - new RawIndexQuery.Builder(sharedNamespace, "$bucket", Type._BUCKET, BinaryValue.create(sharedBucket)) + new RawIndexQuery.Builder(sharedNamespace, + IndexNames.BUCKET, Type._BUCKET, BinaryValue.create(sharedBucket)) .withKeyAndIndex(true).build(); RawIndexQuery.Response iResp = client.execute(biq); @@ -162,7 +162,7 @@ public void testBucketIndexQuery() throws InterruptedException, ExecutionExcepti BucketIndexQuery bq = new BucketIndexQuery.Builder(sharedNamespace).build(); - final RawIndexQuery.Response bqResp = client.execute(bq); + final BinIndexQuery.Response bqResp = client.execute(bq); assertTrue(bqResp.hasEntries()); assertEquals(100, bqResp.getEntries().size()); } diff --git a/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java b/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java index 483a4e658..24a957b4a 100644 --- a/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java +++ b/src/test/java/com/basho/riak/client/api/commands/itest/ITestTimeSeries.java @@ -241,7 +241,8 @@ public void test_k_TestStoringDataOutOfOrderResultsInError() throws ExecutionExc { RiakClient client = new RiakClient(cluster); - Row row = new Row(com.basho.riak.client.core.query.timeseries.Cell.newTimestamp(fifteenMinsAgo), new Cell("hash1"), new Cell("user1"), new Cell("cloudy"), new Cell(79.0)); + Row row = new Row(Cell.newTimestamp(fifteenMinsAgo), new Cell("hash1"), new Cell("user1"), + new Cell("cloudy"), new Cell(79.0)); Store store = new Store.Builder(BAD_TABLE_NAME).withRow(row).build(); RiakFuture future = client.executeAsync(store); @@ -255,9 +256,8 @@ public void test_l_TestFetchingSingleRowsWorks() throws ExecutionException, Inte { RiakClient client = new RiakClient(cluster); - final List keyCells = Arrays.asList(new Cell("hash2"), new Cell("user4"), com.basho.riak.client.core - .query.timeseries.Cell - .newTimestamp(fifteenMinsAgo)); + final List keyCells = Arrays.asList(new Cell("hash2"), new Cell("user4"), + Cell.newTimestamp(fifteenMinsAgo)); Fetch fetch = new Fetch.Builder(tableName, keyCells).build(); QueryResult queryResult = client.execute(fetch); @@ -273,8 +273,8 @@ public void test_m_TestFetchingWithNotFoundKeyReturnsNoRows() throws ExecutionEx { RiakClient client = new RiakClient(cluster); - final List keyCells = Arrays.asList(new Cell("nohash"), new Cell("nouser"), com.basho.riak.client.core.query.timeseries.Cell - .newTimestamp(fifteenMinsAgo)); + final List keyCells = Arrays.asList(new Cell("nohash"), new Cell("nouser"), + Cell.newTimestamp(fifteenMinsAgo)); Fetch fetch = new Fetch.Builder(tableName, keyCells).build(); QueryResult queryResult = client.execute(fetch); @@ -284,8 +284,8 @@ public void test_m_TestFetchingWithNotFoundKeyReturnsNoRows() throws ExecutionEx @Test public void test_n_TestDeletingRowRemovesItFromQueries() throws ExecutionException, InterruptedException { - final List keyCells = Arrays.asList(new Cell("hash2"), new Cell("user4"), com.basho.riak.client.core.query.timeseries.Cell - .newTimestamp(fiveMinsAgo)); + final List keyCells = Arrays.asList(new Cell("hash2"), new Cell("user4"), + Cell.newTimestamp(fiveMinsAgo)); RiakClient client = new RiakClient(cluster); @@ -313,7 +313,7 @@ public void test_o_TestDeletingWithNotFoundKeyDoesNotReturnError() throws Execut { RiakClient client = new RiakClient(cluster); - final List keyCells = Arrays.asList(new Cell("nohash"), new Cell("nouser"), com.basho.riak.client.core.query.timeseries.Cell + final List keyCells = Arrays.asList(new Cell("nohash"), new Cell("nouser"), Cell .newTimestamp(fifteenMinsAgo)); Delete delete = new Delete.Builder(tableName, keyCells).build(); diff --git a/src/test/java/com/basho/riak/client/core/RiakClusterFixtureTest.java b/src/test/java/com/basho/riak/client/core/RiakClusterFixtureTest.java index 0250ce06b..542d5248e 100644 --- a/src/test/java/com/basho/riak/client/core/RiakClusterFixtureTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakClusterFixtureTest.java @@ -22,12 +22,15 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.UnknownHostException; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -35,12 +38,14 @@ /** * * @author Brian Roach + * @author Alex Moore */ -public class RiakClusterFixtureTest +public class RiakClusterFixtureTest { + private final Logger logger = LoggerFactory.getLogger(RiakClusterFixtureTest.class); private NetworkTestFixture[] fixtures; - - + + @Before public void setUp() throws IOException { @@ -51,7 +56,7 @@ public void setUp() throws IOException new Thread(fixtures[i]).start(); } } - + @After public void tearDown() throws IOException { @@ -60,12 +65,12 @@ public void tearDown() throws IOException fixtures[i].shutdown(); } } - + @Test(timeout = 10000) public void operationSuccess() throws UnknownHostException, InterruptedException, ExecutionException { List list = new LinkedList(); - + for (int i = 5000; i < 8000; i += 1000) { RiakNode.Builder builder = new RiakNode.Builder() @@ -73,18 +78,18 @@ public void operationSuccess() throws UnknownHostException, InterruptedException .withRemotePort(i + NetworkTestFixture.PB_FULL_WRITE_STAY_OPEN); list.add(builder.build()); } - + RiakCluster cluster = new RiakCluster.Builder(list).build(); cluster.start(); - + Namespace ns = new Namespace(Namespace.DEFAULT_BUCKET_TYPE, "test_bucket"); Location location = new Location(ns, "test_key2"); - - FetchOperation operation = + + FetchOperation operation = new FetchOperation.Builder(location).build(); cluster.execute(operation); - + try { FetchOperation.Response response = operation.get(); @@ -93,19 +98,19 @@ public void operationSuccess() throws UnknownHostException, InterruptedException } catch(InterruptedException ignored) { - + } finally { cluster.shutdown(); } } - + @Test(timeout = 10000) public void operationFail() throws UnknownHostException, ExecutionException, InterruptedException { List list = new LinkedList(); - + for (int i = 5000; i < 8000; i += 1000) { RiakNode.Builder builder = new RiakNode.Builder() @@ -113,20 +118,20 @@ public void operationFail() throws UnknownHostException, ExecutionException, Int .withRemotePort(i + NetworkTestFixture.ACCEPT_THEN_CLOSE); list.add(builder.build()); } - + RiakCluster cluster = new RiakCluster.Builder(list).build(); - + cluster.start(); - + Namespace ns = new Namespace(Namespace.DEFAULT_BUCKET_TYPE, "test_bucket"); Location location = new Location(ns, "test_key2"); - - FetchOperation operation = + + FetchOperation operation = new FetchOperation.Builder(location) .build(); cluster.execute(operation); - + try { operation.await(); @@ -138,12 +143,12 @@ public void operationFail() throws UnknownHostException, ExecutionException, Int cluster.shutdown(); } } - + @Test(timeout = 10000) public void testStateListener() throws UnknownHostException, InterruptedException, ExecutionException { List list = new LinkedList(); - + for (int i = 5000; i < 8000; i += 1000) { RiakNode.Builder builder = new RiakNode.Builder() @@ -151,20 +156,20 @@ public void testStateListener() throws UnknownHostException, InterruptedExceptio .withRemotePort(i + NetworkTestFixture.PB_FULL_WRITE_STAY_OPEN); list.add(builder.build()); } - + RiakCluster cluster = new RiakCluster.Builder(list).build(); - + StateListener listener = new StateListener(); cluster.registerNodeStateListener(listener); - + cluster.start(); - + // Yeah, yeah, fragile ... whatever Thread.sleep(3000); - + cluster.shutdown().get(); - - + + // Upon registering the initial node state of each node should be sent. assertEquals(3, listener.stateCreated); // All three nodes should go through all three states and notify. @@ -185,11 +190,14 @@ public void testOperationQueue() throws Exception .withRemotePort(5000 + NetworkTestFixture.PB_FULL_WRITE_STAY_OPEN); RiakNode goodNode = goodNodeBuilder.build(); + logger.debug("testOperationQueue - Starting cluster..."); // Pass in 0 nodes, cause a queue backup. RiakCluster cluster = new RiakCluster.Builder(list).withOperationQueueMaxDepth(2).build(); cluster.start(); + logger.debug("testOperationQueue - Cluster started"); + Namespace ns = new Namespace(Namespace.DEFAULT_BUCKET_TYPE, "test_bucket"); Location location = new Location(ns, "test_key2"); FetchOperation.Builder opBuilder = new FetchOperation.Builder(location); @@ -199,57 +207,72 @@ public void testOperationQueue() throws Exception FetchOperation operation3 = opBuilder.build(); FetchOperation operation4 = opBuilder.build(); + logger.debug("testOperationQueue - Executing 1st Operation"); RiakFuture future1 = cluster.execute(operation1); + logger.debug("testOperationQueue - Executing 2nd Operation"); RiakFuture future2 = cluster.execute(operation2); + logger.debug("testOperationQueue - Executing 3rd Operation"); RiakFuture future3 = cluster.execute(operation3); try { + logger.debug("testOperationQueue - Waiting on 3rd Operation"); // Verify that the third operation was rejected - operation3.await(); + assertTrue(operation3.await(5, TimeUnit.SECONDS)); assertFalse(operation3.isSuccess()); Throwable cause = operation3.cause(); - assertNotNull(cause != null && cause.getMessage() != null ? cause.getMessage() : "No message set?", cause); + assertNotNull(cause); + logger.debug("testOperationQueue - Adding Node to Cluster"); // Add a node to start processing the queue backlog cluster.addNode(goodNode); + logger.debug("testOperationQueue - Waiting on 1st Operation"); - future1.await(); + assertTrue(future1.await(1, TimeUnit.SECONDS)); // Process the first queue item - assertEquals(future1.get().getObjectList().get(0).getValue().toString(), "This is a value!"); - assertFalse(future1.get().isNotFound()); + assertGoodResponse(future1.get()); + logger.debug("testOperationQueue - Executing 4th Operation"); // Add another to fill it back up RiakFuture future4 = cluster.execute(operation4); + logger.debug("testOperationQueue - Waiting on 2nd Operation"); // Get next item in Queue - future2.await(); + assertTrue(future2.await(1, TimeUnit.SECONDS)); - assertEquals(future2.get().getObjectList().get(0).getValue().toString(), "This is a value!"); - assertFalse(future2.get().isNotFound()); + assertGoodResponse(future2.get()); + logger.debug("testOperationQueue - Waiting on 4th Operation"); // Get last item in Queue - future4.await(); + assertTrue(future4.await(1, TimeUnit.SECONDS)); - assertEquals(future4.get().getObjectList().get(0).getValue().toString(), "This is a value!"); - assertFalse(future4.get().isNotFound()); + assertGoodResponse(future4.get()); } finally { + logger.debug("testOperationQueue - Shutting Down Cluster"); cluster.shutdown(); + logger.debug("testOperationQueue - Cluster Shut Down"); } } + private void assertGoodResponse(FetchOperation.Response response) + throws InterruptedException, ExecutionException + { + assertEquals(response.getObjectList().get(0).getValue().toString(), "This is a value!"); + assertFalse(response.isNotFound()); + } + public static class StateListener implements NodeStateListener { public int stateCreated; public int stateRunning; public int stateShuttingDown; public int stateShutdown; - + @Override public void nodeStateChanged(RiakNode node, RiakNode.State state) { diff --git a/src/test/java/com/basho/riak/client/core/RiakClusterTest.java b/src/test/java/com/basho/riak/client/core/RiakClusterTest.java index 5a3adf419..f659f5437 100644 --- a/src/test/java/com/basho/riak/client/core/RiakClusterTest.java +++ b/src/test/java/com/basho/riak/client/core/RiakClusterTest.java @@ -25,7 +25,7 @@ import java.net.UnknownHostException; import java.util.List; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingQueue; import static org.junit.Assert.*; @@ -88,7 +88,7 @@ else if ("10.0.0.1".equals(n.getRemoteAddress())) } } } - + @Test public void addNodeToCluster() throws UnknownHostException { @@ -96,13 +96,13 @@ public void addNodeToCluster() throws UnknownHostException RiakNode node = mock(RiakNode.class); RiakNode.Builder nodeBuilder = spy(new RiakNode.Builder()); doReturn(node).when(nodeBuilder).build(); - + RiakCluster cluster = new RiakCluster.Builder(nodeBuilder.build()).withNodeManager(nodeManager).build(); cluster.addNode(nodeBuilder.build()); assertEquals(2, cluster.getNodes().size()); verify(nodeManager).addNode(node); } - + @Test public void removeNodeFromCluster() { @@ -111,13 +111,13 @@ public void removeNodeFromCluster() RiakNode.Builder nodeBuilder = spy(new RiakNode.Builder()); doReturn(node).when(nodeBuilder).build(); doReturn(true).when(nodeManager).removeNode(node); - + RiakCluster cluster = new RiakCluster.Builder(nodeBuilder.build()).withNodeManager(nodeManager).build(); assertTrue(cluster.removeNode(node)); verify(nodeManager).removeNode(node); assertEquals(0, cluster.getNodes().size()); } - + @Test public void allNodesShutdownStopsCluster() { @@ -126,13 +126,13 @@ public void allNodesShutdownStopsCluster() RiakNode.Builder nodeBuilder = spy(new RiakNode.Builder()); doReturn(node).when(nodeBuilder).build(); doReturn(true).when(nodeManager).removeNode(node); - + RiakCluster cluster = new RiakCluster.Builder(nodeBuilder.build()).withNodeManager(nodeManager).build(); cluster.nodeStateChanged(node, RiakNode.State.SHUTDOWN); RiakCluster.State state = Whitebox.getInternalState(cluster, "state"); assertEquals(state, RiakCluster.State.SHUTDOWN); } - + @Test @SuppressWarnings("unchecked") public void clusterExecutesOperation() @@ -143,7 +143,7 @@ public void clusterExecutesOperation() RiakNode.Builder nodeBuilder = spy(new RiakNode.Builder()); doReturn(node).when(nodeBuilder).build(); doReturn(true).when(node).execute(operation); - + RiakCluster cluster = new RiakCluster.Builder(nodeBuilder.build()).withNodeManager(nodeManager).build(); Whitebox.setInternalState(cluster, "state", RiakCluster.State.RUNNING); cluster.execute(operation); @@ -151,7 +151,7 @@ public void clusterExecutesOperation() verify(nodeManager).executeOnNode(operation, null); cluster.operationComplete(operation, 2); assertEquals(0, cluster.inFlightCount()); - + cluster.execute(operation); cluster.operationFailed(operation, 1); LinkedBlockingQueue retryQueue = Whitebox.getInternalState(cluster, "retryQueue"); @@ -230,7 +230,7 @@ private void assertQueueStatus(RiakCluster cluster, Integer expectedQueueSize, FutureOperation expectedQueueHead) { boolean queueEnabled = Whitebox.getInternalState(cluster, "queueOperations"); - LinkedBlockingDeque< FutureOperation > operationQueue = Whitebox.getInternalState(cluster, "operationQueue"); + ConcurrentLinkedDeque operationQueue = Whitebox.getInternalState(cluster, "operationQueue"); RiakCluster.State state = Whitebox.getInternalState(cluster, "state"); assertTrue(queueEnabled); assertEquals((long)expectedQueueSize, operationQueue.size()); diff --git a/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java b/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java index 37caecd94..97a467483 100644 --- a/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java +++ b/src/test/java/com/basho/riak/client/core/operations/itest/ITestBase.java @@ -86,7 +86,7 @@ public static void setUp() throws CertificateException, IOException, KeyStoreExc NoSuchAlgorithmException { bucketName = BinaryValue.unsafeCreate("ITestBase".getBytes()); - + /** * Riak security. * @@ -94,7 +94,7 @@ public static void setUp() throws CertificateException, IOException, KeyStoreExc * in the README.md's "Security Tests" section. */ - security = Boolean.parseBoolean(System.getProperty("com.basho.riak.security")); + security = Boolean.parseBoolean(System.getProperty("com.basho.riak.security", "false")); overrideCert = System.getProperty("com.basho.riak.security.cacert"); /** @@ -106,7 +106,7 @@ public static void setUp() throws CertificateException, IOException, KeyStoreExc * riak-admin bucket-type activate yokozuna */ yokozunaBucketType = BinaryValue.create("yokozuna"); - testYokozuna = Boolean.parseBoolean(System.getProperty("com.basho.riak.yokozuna")); + testYokozuna = Boolean.parseBoolean(System.getProperty("com.basho.riak.yokozuna", "true")); /** * Bucket type @@ -116,7 +116,7 @@ public static void setUp() throws CertificateException, IOException, KeyStoreExc * riak-admin bucket-type create plain '{"props":{}}' * riak-admin bucket-type activate plain */ - testBucketType = Boolean.parseBoolean(System.getProperty("com.basho.riak.buckettype")); + testBucketType = Boolean.parseBoolean(System.getProperty("com.basho.riak.buckettype", "true")); bucketType = BinaryValue.unsafeCreate("plain".getBytes()); /** @@ -124,10 +124,10 @@ public static void setUp() throws CertificateException, IOException, KeyStoreExc * * The backend must be 'leveldb' in riak config to us this */ - test2i = Boolean.parseBoolean(System.getProperty("com.basho.riak.2i")); + test2i = Boolean.parseBoolean(System.getProperty("com.basho.riak.2i", "true")); - legacyRiakSearch = Boolean.parseBoolean(System.getProperty("com.basho.riak.riakSearch")); + legacyRiakSearch = Boolean.parseBoolean(System.getProperty("com.basho.riak.riakSearch", "false")); /** @@ -148,8 +148,8 @@ public static void setUp() throws CertificateException, IOException, KeyStoreExc mapReduceBucketType = BinaryValue.create("mr"); - testCrdt = Boolean.parseBoolean(System.getProperty("com.basho.riak.crdt")); - testTimeSeries = Boolean.parseBoolean(System.getProperty("com.basho.riak.timeseries")); + testCrdt = Boolean.parseBoolean(System.getProperty("com.basho.riak.crdt", "true")); + testTimeSeries = Boolean.parseBoolean(System.getProperty("com.basho.riak.timeseries", "false")); /** * Riak PBC host diff --git a/src/test/java/com/basho/riak/client/core/operations/itest/ITestCoveragePlan.java b/src/test/java/com/basho/riak/client/core/operations/itest/ITestCoveragePlan.java index 8dec8995b..84161c26d 100644 --- a/src/test/java/com/basho/riak/client/core/operations/itest/ITestCoveragePlan.java +++ b/src/test/java/com/basho/riak/client/core/operations/itest/ITestCoveragePlan.java @@ -3,6 +3,8 @@ import com.basho.riak.client.api.RiakClient; import com.basho.riak.client.api.commands.buckets.FetchBucketProperties; import com.basho.riak.client.api.commands.indexes.BinIndexQuery; +import com.basho.riak.client.api.commands.indexes.BucketIndexQuery; +import com.basho.riak.client.api.commands.indexes.RawIndexQuery; import com.basho.riak.client.api.commands.kv.CoveragePlan; import com.basho.riak.client.core.RiakCluster; import com.basho.riak.client.core.RiakNode; @@ -148,9 +150,8 @@ public void fetchAllDataByUsingCoverageContext() throws ExecutionException, Inte { for(CoverageEntry ce: response.hostEntries(host)) { - // The only "$bucket" Binary Index may be used for Full Bucket Reads - final BinIndexQuery query = new BinIndexQuery.Builder(defaultNamespace(), "$bucket", ce.getCoverageContext()) - .withCoverageContext(ce.getCoverageContext()) + final BucketIndexQuery query = + new BucketIndexQuery.Builder(defaultNamespace(), ce.getCoverageContext()) .withTimeout(2000) .build(); diff --git a/src/test/java/com/basho/riak/client/core/operations/itest/ITestSecondaryIndexQueryOp.java b/src/test/java/com/basho/riak/client/core/operations/itest/ITestSecondaryIndexQueryOp.java index db799558e..198d70115 100644 --- a/src/test/java/com/basho/riak/client/core/operations/itest/ITestSecondaryIndexQueryOp.java +++ b/src/test/java/com/basho/riak/client/core/operations/itest/ITestSecondaryIndexQueryOp.java @@ -1,5 +1,5 @@ /* - * Copyright 2013 Basho Technologies Inc. + * Copyright 2013 Basho Technologies Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import com.basho.riak.client.core.query.Location; import com.basho.riak.client.core.query.Namespace; import com.basho.riak.client.core.query.RiakObject; +import com.basho.riak.client.core.query.indexes.IndexNames; import com.basho.riak.client.core.query.indexes.LongIntIndex; import com.basho.riak.client.core.query.indexes.StringBinIndex; import com.basho.riak.client.core.util.BinaryValue; @@ -153,7 +154,7 @@ public void testBucketIndexHack() throws InterruptedException, ExecutionExceptio Assume.assumeTrue(test2i); SecondaryIndexQueryOperation.Query query = - new SecondaryIndexQueryOperation.Query.Builder(defaultTypeNamespace, BinaryValue.unsafeCreate("$bucket".getBytes())) + new SecondaryIndexQueryOperation.Query.Builder(defaultTypeNamespace, BinaryValue.unsafeCreate(IndexNames.BUCKET.getBytes())) .withIndexKey(BinaryValue.create(bucketName)) .withReturnKeyAndIndex(true) .build(); @@ -174,7 +175,7 @@ public void testKeyIndexHack() throws InterruptedException, ExecutionException Assume.assumeTrue(test2i); SecondaryIndexQueryOperation.Query query = - new SecondaryIndexQueryOperation.Query.Builder(defaultTypeNamespace, BinaryValue.unsafeCreate("$key".getBytes())) + new SecondaryIndexQueryOperation.Query.Builder(defaultTypeNamespace, BinaryValue.unsafeCreate(IndexNames.KEY.getBytes())) .withRangeStart(BinaryValue.create("my_key10")) .withRangeEnd(BinaryValue.create("my_key19")) .withReturnKeyAndIndex(true) @@ -537,4 +538,4 @@ private void AssertLongObjectsInOrder(SecondaryIndexQueryOperation.Response resp previousKey = currentKey; } } -} \ No newline at end of file +} diff --git a/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestListKeysOperation.java b/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestListKeysOperation.java index 846abb2c2..e74d7d115 100644 --- a/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestListKeysOperation.java +++ b/src/test/java/com/basho/riak/client/core/operations/itest/ts/ITestListKeysOperation.java @@ -34,6 +34,7 @@ public static void InsertData() throws ExecutionException, InterruptedException future.get(); assertTrue(future.isSuccess()); + Thread.sleep(1000); } @Test @@ -52,7 +53,7 @@ public void testSingleFetch() throws ExecutionException, InterruptedException final List rows = result.getRowsCopy(); final List expectedKeys = getKeyHeads(); - assertTrue(expectedKeys.containsAll(rows)); + assertTrue(rows.containsAll(expectedKeys)); } private static List getKeyHeads() diff --git a/src/test/java/com/basho/riak/client/core/query/indexes/IndexTest.java b/src/test/java/com/basho/riak/client/core/query/indexes/IndexTest.java index 6f21b2370..b2288c560 100644 --- a/src/test/java/com/basho/riak/client/core/query/indexes/IndexTest.java +++ b/src/test/java/com/basho/riak/client/core/query/indexes/IndexTest.java @@ -45,7 +45,7 @@ public void correctSuffixes() Assert.assertEquals(IndexType.BUCKET.suffix(), ""); Assert.assertEquals(IndexType.KEY.suffix(), ""); } - + @Test public void testValidIndexTypeExtensions() { @@ -54,11 +54,11 @@ public void testValidIndexTypeExtensions() IndexType indexType1 = IndexType.typeFromFullname("indexname_bin"); Assert.assertTrue(indexType1.equals(IndexType.BIN)); - - IndexType indexType2 = IndexType.typeFromFullname("$bucket"); + + IndexType indexType2 = IndexType.typeFromFullname(IndexNames.BUCKET); Assert.assertTrue(indexType2.equals(IndexType.BUCKET)); - - IndexType indexType3 = IndexType.typeFromFullname("$key"); + + IndexType indexType3 = IndexType.typeFromFullname(IndexNames.KEY); Assert.assertTrue(indexType3.equals(IndexType.KEY)); }