diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java index 322ac6ae06..ebe8164c3d 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java @@ -74,6 +74,14 @@ public interface ActorSystemManager { * Publish message to all topic subscribers in all regions. */ void publishToAllRegions( String topic, Object message, ActorRef sender ); + /** + * Publish message to all topic subscribers in local region only. + */ + void publishToLocalRegion( String topic, Object message, ActorRef sender ); + /** + * Publish message to all topic subscribers in remote regions only. + */ + void publishToRemoteRegions( String topic, Object message, ActorRef sender ); void leaveCluster(); } diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index ea9ada84bd..27c90cce42 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -152,9 +152,21 @@ public String getCurrentRegion() { @Override public void publishToAllRegions( String topic, Object message, ActorRef sender ) { + publishToLocalRegion(topic, message, sender); + publishToRemoteRegions(topic, message, sender); + } + + @Override + public void publishToLocalRegion( String topic, Object message, ActorRef sender ) { + // send to local subscribers to topic mediator.tell( new DistributedPubSubMediator.Publish( topic, message ), sender ); + } + + @Override + public void publishToRemoteRegions( String topic, Object message, ActorRef sender ) { + // send to each ClusterClient for ( ActorRef clusterClient : clusterClientsByRegion.values() ) { clusterClient.tell( new ClusterClient.Publish( topic, message ), sender ); @@ -423,7 +435,7 @@ private void createClientActors( ActorSystem system ) { } ActorRef clusterClient = system.actorOf( ClusterClient.props( - ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client"); + ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client-"+region); clusterClientsByRegion.put( region, clusterClient ); } diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java index 93b6ddb39c..5a71305f3f 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java @@ -38,14 +38,17 @@ public class UniqueValueActor extends UntypedActor { private final ActorSystemManager actorSystemManager; private final UniqueValuesTable table; + + private final UniqueValuesFig uniqueValuesFig; private int count = 0; @Inject - public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager ) { + public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager, UniqueValuesFig uniqueValuesFig) { - this.table = table; + this.uniqueValuesFig = uniqueValuesFig; + this.table = table; this.actorSystemManager = actorSystemManager; } @@ -86,8 +89,12 @@ public void onReceive(Object message) { getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ), getSender() ); - - actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() ); + + if(uniqueValuesFig.getSkipRemoteRegions()) { + actorSystemManager.publishToLocalRegion( "content", new Reservation( res ), getSelf() ); + } else { + actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() ); + } } catch (Throwable t) { @@ -111,14 +118,24 @@ public void onReceive(Object message) { // cannot reserve, somebody else owns the unique value Response response = new Response( Response.Status.NOT_UNIQUE, con.getConsistentHashKey()); getSender().tell( response, getSender() ); - actorSystemManager.publishToAllRegions( "content", response, getSelf() ); + if(uniqueValuesFig.getSkipRemoteRegions()) { + actorSystemManager.publishToLocalRegion( "content", response, getSelf() ); + } else { + actorSystemManager.publishToAllRegions( "content", response, getSelf() ); + } return; } else if ( owner == null ) { // cannot commit without first reserving Response response = new Response( Response.Status.BAD_REQUEST, con.getConsistentHashKey()); getSender().tell( response, getSender() ); - actorSystemManager.publishToAllRegions( "content", response, getSelf() ); + + if(uniqueValuesFig.getSkipRemoteRegions()) { + actorSystemManager.publishToLocalRegion( "content", response, getSelf() ); + } else { + actorSystemManager.publishToAllRegions( "content", response, getSelf() ); + } + return; } @@ -127,7 +144,11 @@ public void onReceive(Object message) { Response response = new Response( Response.Status.IS_UNIQUE, con.getConsistentHashKey() ); getSender().tell( response, getSender() ); - actorSystemManager.publishToAllRegions( "content", response, getSelf() ); + if(uniqueValuesFig.getSkipRemoteRegions()) { + actorSystemManager.publishToLocalRegion( "content", response, getSelf() ); + } else { + actorSystemManager.publishToAllRegions( "content", response, getSelf() ); + } } catch (Throwable t) { getSender().tell( new Response( Response.Status.ERROR, con.getConsistentHashKey() ), @@ -158,7 +179,11 @@ public void onReceive(Object message) { getSender() ); // unique value record may have already been cleaned up, also clear cache - actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() ); + if(uniqueValuesFig.getSkipRemoteRegions()) { + actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() ); + } else { + actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() ); + } return; } @@ -170,7 +195,11 @@ public void onReceive(Object message) { getSender().tell( new Response( Response.Status.SUCCESS, can.getConsistentHashKey() ), getSender() ); - actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() ); + if(uniqueValuesFig.getSkipRemoteRegions()) { + actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() ); + } else { + actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() ); + } } catch (Throwable t) { getSender().tell( new Response( Response.Status.ERROR, can.getConsistentHashKey() ), diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java index 0134779e26..ab0f4a437a 100644 --- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java +++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java @@ -29,6 +29,8 @@ public interface UniqueValuesFig extends GuicyFig, Serializable { String UNIQUEVALUE_USE_CLUSTER = "collection.uniquevalues.usecluster"; + + String UNIQUEVALUE_SKIP_REMOTE_REGIONS = "collection.uniquevalues.skip.remote.regions"; String UNIQUEVALUE_ACTORS = "collection.uniquevalues.actors"; @@ -41,14 +43,25 @@ public interface UniqueValuesFig extends GuicyFig, Serializable { String UNIQUEVALUE_REQUEST_TIMEOUT = "collection.uniquevalues.request.timeout"; String UNIQUEVALUE_REQUEST_RETRY_COUNT = "collection.uniquevalues.request.retrycount"; + + /** - * Tells Usergrid whether or not to use the Akka Cluster sytem to verify unique values ( more consistent) + * Tells Usergrid whether or not to use the Akka Cluster system to verify unique values ( more consistent) + * Setting this to false by default to avoid extra complications by default. */ @Key(UNIQUEVALUE_USE_CLUSTER) - @Default("true") + @Default("false") boolean getUnqiueValueViaCluster(); + + /** + * Tells Usergrid to restrict UniqueValue related chatter to local Akka Cluster only. Skips remote regions + * Setting this to true by default to avoid extra complications by default. + */ + @Key(UNIQUEVALUE_SKIP_REMOTE_REGIONS) + @Default("true") + boolean getSkipRemoteRegions(); /** * Unique Value cache TTL in seconds.