Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ee9b42a
TTB Initial commit
srgg Dec 9, 2015
f43e250
Added proper handling of: TsPutReq (encoding), tsputresp(encoding), r…
srgg Dec 9, 2015
c28724e
RighTTBCodec is forced to be used by default
srgg Dec 9, 2015
01ef430
Netty Logger has been removed
srgg Dec 10, 2015
f43f0fa
TsGetReq Added TsGetReq and TsGetResp
srgg Dec 10, 2015
48dc9bd
Added ToggleTTBEncodingOperation and smart TTB/PB encoding/decodingю
srgg Jan 12, 2016
b46e1b1
Fix broken tests
srgg Jan 15, 2016
83e509f
TTB codec adjusted to RiakTS 1.1 changes
srgg Jan 15, 2016
65f2a92
Deferred encoding operation support was introduces for TS Store and F…
srgg Jan 20, 2016
970735c
Added proper handling of com.basho.riak.pbcport
srgg Jan 21, 2016
a59d9f3
rework preventive toggle the Native/TTB encoding to be toggled on per…
srgg Jan 22, 2016
914c215
getting rid of creation TsPutReq.Builder and TsGetReq.Builder
srgg Jan 25, 2016
fcc01e0
Fix error propogation in case of Channel fails to switch to use Nativ…
srgg Jan 27, 2016
c4e5e5c
Pull in change from another branch by @lehoff
alexmoore Apr 12, 2016
9281f24
Merge branch 'develop' into perf/ttb_encoding_v2
alexmoore Apr 12, 2016
5c09a18
Remove TTB Switch code, update PB dependency
alexmoore Apr 14, 2016
f743ff3
Add new TTB codec boilerplate
alexmoore Apr 18, 2016
6aa4237
Stubbing out Store operation
alexmoore Apr 18, 2016
d3a2d93
Encoding of tsgetreq to byte[] is complete.
lukebakken Apr 18, 2016
c061b25
Encode query and put TTB requests.
lukebakken Apr 18, 2016
e01e3fe
Stub out other TS operations to use TTB
alexmoore Apr 19, 2016
05df991
Remove TTB methods for TsDelReq - not supported yet.
lukebakken Apr 19, 2016
15834e6
Revert Delete to use PB
alexmoore Apr 19, 2016
51d60cc
Decode TTB Get/Query response
lukebakken Apr 19, 2016
519cea2
stub out test file
alexmoore Apr 20, 2016
e568a4c
Move error decoding into RiakMessage for pbuf and ttb messages.
lukebakken Apr 20, 2016
f34401c
Added tests for RiakMessage error decoding.
lukebakken Apr 20, 2016
572a031
Add test for TTB encoding TsGetReq
lukebakken Apr 20, 2016
b84619d
Add test for TTB encoding TsQueryReq
lukebakken Apr 21, 2016
d55c4ef
Add null cell handling to TTB PUT encoding, fix tuple/list mixup, add…
alexmoore Apr 21, 2016
ae6bbfe
fix example request tuple
alexmoore Apr 21, 2016
30a676f
Add a TsPutReq test
lukebakken Apr 21, 2016
e5e650c
Fix unit tests for TTB codec.
lukebakken Apr 21, 2016
92e31ea
Remove the double-header from TTB data.
lukebakken Apr 21, 2016
ac31017
Fix a few encoding/decoding issues
alexmoore Apr 22, 2016
e90065e
Fix codec to parse QueryResponses correctly, add some error handling …
alexmoore Apr 22, 2016
fe88d2b
Fix a ttb row read bug, change a test to match current behavior
alexmoore Apr 23, 2016
e4ef475
Simplify detection of when to read end of list.
lukebakken Apr 25, 2016
f99977e
Merge branch 'develop' into perf/ttb_encoding/v3
alexmoore Apr 25, 2016
8834f07
Add port option + TS testing to makefile
alexmoore Apr 25, 2016
7243f38
Small Makefile change to only set RIAK_PORT if it isn't set in the env.
lukebakken Apr 25, 2016
f497262
Remainder of TODOs for 611
alexmoore Apr 26, 2016
aa0d7a4
Code cleanup
alexmoore Apr 26, 2016
2bf1d8d
Remove unused case branch
alexmoore Apr 26, 2016
2edbd44
No need to use OtpErlangAtom when write_atom() exists.
lukebakken Apr 26, 2016
dd7f951
Add security test option to makefile
alexmoore Apr 28, 2016
75b92f6
Merge branch 'perf/ttb_encoding/v3' of https://github.com/basho/riak-…
alexmoore Apr 28, 2016
5b092d2
Remove PB objects entirely from TTB TS put path
alexmoore Apr 29, 2016
9ccf8e7
Some small optimizations
alexmoore May 3, 2016
1f353b4
Add test for PB error handling while using TTB operation
alexmoore May 4, 2016
a0beb8e
A few PR cleanup items
alexmoore May 4, 2016
3d7cd4d
The final final
alexmoore May 4, 2016
c748fbd
Merge pull request #617 from basho/perf/ttb_encoding/v4
alexmoore May 4, 2016
450a95e
Dont do error checking while writing TTB messages
alexmoore May 5, 2016
8918c5b
Dont convert to PB and back when creating a row
alexmoore May 5, 2016
09ee18f
Adding single atom response handling
alexmoore May 6, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
.PHONY: compile test unit-test integration-test protogen
.PHONY: compile test unit-test integration-test integration-test-timeseries protogen

RIAK_PORT ?= 8087

compile:
mvn clean compile
Expand All @@ -9,7 +11,13 @@ unit-test:
mvn test

integration-test:
mvn -Pitest,default -Dcom.basho.riak.2i=true -Dcom.basho.riak.yokozuna=true -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true verify
mvn -Pitest,default -Dcom.basho.riak.2i=true -Dcom.basho.riak.yokozuna=true -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify

integration-test-timeseries:
mvn -Pitest,default -Dcom.basho.riak.buckettype=true -Dcom.basho.riak.crdt=true -Dcom.basho.riak.lifecycle=true -Dcom.basho.riak.timeseries=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) verify

integration-test-security:
mvn -Pitest,default -Dcom.basho.riak.security=true -Dcom.basho.riak.security.clientcert=true -Dcom.basho.riak.pbcport=$(RIAK_PORT) test-compile failsafe:integration-test

protogen:
mvn -Pprotobuf-generate generate-sources
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -431,5 +431,10 @@
<artifactId>antlr-complete</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>org.erlang.otp</groupId>
<artifactId>jinterface</artifactId>
<version>1.5.6</version>
</dependency>
</dependencies>
</project>
39 changes: 20 additions & 19 deletions src/main/java/com/basho/riak/client/core/DefaultNodeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* The default {@link NodeManager} used by {@link RiakCluster} if none is
* The default {@link NodeManager} used by {@link RiakCluster} if none is
* specified.
*
* This NodeManager round-robins through a list of {@link RiakNode}s and attempts
* to execute the operation passed to it. If a node reports that it is
* health checking it is removed from the list until it sends an update that it
* is again running. If the selected node cannot accept the operation because all
* connections are in use or it unable to make a new connection, the next node in
* the list is tried until either the operation is accepted or all nodes have
* been tried. If no nodes are able to accept the operation its setException()
*
* This NodeManager round-robins through a list of {@link RiakNode}s and attempts
* to execute the operation passed to it. If a node reports that it is
* health checking it is removed from the list until it sends an update that it
* is again running. If the selected node cannot accept the operation because all
* connections are in use or it unable to make a new connection, the next node in
* the list is tried until either the operation is accepted or all nodes have
* been tried. If no nodes are able to accept the operation its setException()
* method is called with a {@link NoNodesAvailableException}.
*
*
* @author Brian Roach <roach at basho dot com>
* @since 2.0
*/
Expand All @@ -47,7 +48,7 @@ public class DefaultNodeManager implements NodeManager, NodeStateListener
private final AtomicInteger index = new AtomicInteger();
private final Logger logger = LoggerFactory.getLogger(DefaultNodeManager.class);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

@Override
public void init(List<RiakNode> nodes)
{
Expand All @@ -73,7 +74,7 @@ public boolean executeOnNode(FutureOperation operation, RiakNode previousNode)
{
int startIndex = index.getAndIncrement();
int currentIndex = startIndex;

do
{
if (healthy.get(Math.abs(currentIndex % healthy.size())).execute(operation))
Expand All @@ -89,15 +90,15 @@ else if (healthy.size() == 1)
{
executed = healthy.get(0).execute(operation);
}

return executed;
}
finally
{
lock.readLock().unlock();
}
}

@Override
public void nodeStateChanged(RiakNode node, State state)
{
Expand All @@ -110,7 +111,7 @@ public void nodeStateChanged(RiakNode node, State state)
if (unhealthy.remove(node))
{
healthy.add(node);
logger.info("NodeManager moved node to healthy list; {}:{}",
logger.info("NodeManager moved node to healthy list; {}:{}",
node.getRemoteAddress(), node.getPort());
}
}
Expand All @@ -126,7 +127,7 @@ public void nodeStateChanged(RiakNode node, State state)
if (healthy.remove(node))
{
unhealthy.add(node);
logger.info("NodeManager moved node to unhealthy list; {}:{}",
logger.info("NodeManager moved node to unhealthy list; {}:{}",
node.getRemoteAddress(), node.getPort());
}
}
Expand Down Expand Up @@ -174,7 +175,7 @@ public void addNode(RiakNode newNode)
{
lock.writeLock().unlock();
}

}

@Override
Expand All @@ -194,12 +195,12 @@ public boolean removeNode(RiakNode node)
{
lock.writeLock().unlock();
}

if (removed)
{
node.removeStateListener(this);
node.shutdown();
logger.info("NodeManager removed and shutdown node; {}:{}",
logger.info("NodeManager removed and shutdown node; {}:{}",
node.getRemoteAddress(), node.getPort());
}
return removed;
Expand Down
54 changes: 40 additions & 14 deletions src/main/java/com/basho/riak/client/core/FutureOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
package com.basho.riak.client.core;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
Expand All @@ -28,6 +25,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Brian Roach <roach at basho dot com>
Expand Down Expand Up @@ -262,7 +261,7 @@ synchronized final void setException(Throwable t)

public synchronized final Object channelMessage()
{
Object message = createChannelMessage();
final Object message = createChannelMessage();
state = State.WRITTEN;
return message;
}
Expand Down Expand Up @@ -309,17 +308,35 @@ public final T get() throws InterruptedException, ExecutionException
{
latch.await();

throwExceptionIfSet();

if (null == converted)
{
tryConvertResponse();
}

return converted;
}

private void throwExceptionIfSet() throws ExecutionException
{
if (exception != null)
{
throw new ExecutionException(exception);
}
else if (null == converted)
}

private void tryConvertResponse() throws ExecutionException
{
try
{
converted = convert(rawResponse);

}

return converted;
catch(IllegalArgumentException ex)
{
exception = ex;
throwExceptionIfSet();
}
}

@Override
Expand All @@ -331,13 +348,12 @@ public final T get(long timeout, TimeUnit unit) throws InterruptedException, Exe
{
throw new TimeoutException();
}
else if (exception != null)
{
throw new ExecutionException(exception);
}
else if (null == converted)

throwExceptionIfSet();

if (null == converted)
{
converted = convert(rawResponse);
tryConvertResponse();
}

return converted;
Expand Down Expand Up @@ -373,6 +389,16 @@ public final void await(long timeout, TimeUnit unit) throws InterruptedException
latch.await(timeout, unit);
}

protected U checkAndGetSingleResponse(List<U> responses)
{
if (responses.size() > 1)
{
LoggerFactory.getLogger(this.getClass()).error("Received {} responses when only one was expected.",
responses.size());
}

return responses.get(0);
}

private void stateCheck(State... allowedStates)
{
Expand Down
Loading