diff --git a/reactivesocket-transport-local/build.gradle b/reactivesocket-transport-local/build.gradle
deleted file mode 100644
index 1c943599d..000000000
--- a/reactivesocket-transport-local/build.gradle
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-dependencies {
- compile project(':reactivesocket-transport-tcp')
- compile project(':reactivesocket-core')
-
- testCompile project(':reactivesocket-test')
-}
diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java
deleted file mode 100644
index 86855f9cc..000000000
--- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.reactivesocket.local;
-
-import io.reactivesocket.DuplexConnection;
-import io.reactivesocket.Frame;
-import io.reactivesocket.internal.EmptySubject;
-import io.reactivesocket.rx.Completable;
-import io.reactivesocket.rx.Observable;
-import io.reactivesocket.rx.Observer;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.util.concurrent.CopyOnWriteArrayList;
-
-class LocalClientDuplexConnection implements DuplexConnection {
- private final String name;
-
- private final CopyOnWriteArrayList> subjects;
- private final EmptySubject closeSubject = new EmptySubject();
-
- public LocalClientDuplexConnection(String name) {
- this.name = name;
- this.subjects = new CopyOnWriteArrayList<>();
- }
-
- @Override
- public Observable getInput() {
- return o -> {
- o.onSubscribe(() -> subjects.removeIf(s -> s == o));
- subjects.add(o);
- };
- }
-
- @Override
- public void addOutput(Publisher o, Completable callback) {
- o
- .subscribe(new Subscriber() {
-
- @Override
- public void onSubscribe(Subscription s) {
- s.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Frame frame) {
- try {
- LocalReactiveSocketManager
- .getInstance()
- .getServerConnection(name)
- .write(frame);
- } catch (Throwable t) {
- onError(t);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- callback.error(t);
- }
-
- @Override
- public void onComplete() {
- callback.success();
- }
- });
- }
-
- @Override
- public double availability() {
- return 1.0;
- }
-
- void write(Frame frame) {
- subjects
- .forEach(o -> o.onNext(frame));
- }
-
- @Override
- public Publisher close() {
- return s -> {
- LocalReactiveSocketManager
- .getInstance()
- .removeClientConnection(name);
- closeSubject.subscribe(s);
- };
- }
-
- @Override
- public Publisher onClose() {
- return closeSubject;
- }
-}
diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java
deleted file mode 100644
index 43740f4b1..000000000
--- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.reactivesocket.local;
-
-import io.reactivesocket.*;
-import io.reactivesocket.internal.rx.EmptySubscription;
-import org.reactivestreams.Publisher;
-
-public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector {
- public static final LocalClientReactiveSocketConnector INSTANCE = new LocalClientReactiveSocketConnector();
-
- private LocalClientReactiveSocketConnector() {}
-
- @Override
- public Publisher connect(Config config) {
- return s -> {
- try {
- s.onSubscribe(EmptySubscription.INSTANCE);
- LocalClientDuplexConnection clientConnection = LocalReactiveSocketManager
- .getInstance()
- .getClientConnection(config.getName());
- ReactiveSocket reactiveSocket = DefaultReactiveSocket
- .fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType()));
-
- reactiveSocket.startAndWait();
-
- s.onNext(reactiveSocket);
- s.onComplete();
- } catch (Throwable t) {
- s.onError(t);
- }
- };
- }
-
- public static class Config {
- final String name;
- final String metadataMimeType;
- final String dataMimeType;
-
- public Config(String name, String metadataMimeType, String dataMimeType) {
- this.name = name;
- this.metadataMimeType = metadataMimeType;
- this.dataMimeType = dataMimeType;
- }
-
- public String getName() {
- return name;
- }
-
- public String getMetadataMimeType() {
- return metadataMimeType;
- }
-
- public String getDataMimeType() {
- return dataMimeType;
- }
- }
-}
diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java
deleted file mode 100644
index 60d246266..000000000
--- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.reactivesocket.local;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Created by rroeser on 4/2/16.
- */
-class LocalReactiveSocketManager {
- private static final LocalReactiveSocketManager INSTANCE = new LocalReactiveSocketManager();
-
- private final ConcurrentHashMap serverConnections;
- private final ConcurrentHashMap clientConnections;
-
- private LocalReactiveSocketManager() {
- serverConnections = new ConcurrentHashMap<>();
- clientConnections = new ConcurrentHashMap<>();
- }
-
- public static LocalReactiveSocketManager getInstance() {
- return INSTANCE;
- }
-
- public LocalClientDuplexConnection getClientConnection(String name) {
- return clientConnections.computeIfAbsent(name, LocalClientDuplexConnection::new);
- }
-
- public void removeClientConnection(String name) {
- clientConnections.remove(name);
- }
-
- public LocalServerDuplexConection getServerConnection(String name) {
- return serverConnections.computeIfAbsent(name, LocalServerDuplexConection::new);
- }
-
- public void removeServerDuplexConnection(String name) {
- serverConnections.remove(name);
- }
-
-}
diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java
deleted file mode 100644
index 9a3dde4d0..000000000
--- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.reactivesocket.local;
-
-import io.reactivesocket.DuplexConnection;
-import io.reactivesocket.Frame;
-import io.reactivesocket.internal.EmptySubject;
-import io.reactivesocket.rx.Completable;
-import io.reactivesocket.rx.Observable;
-import io.reactivesocket.rx.Observer;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.util.concurrent.CopyOnWriteArrayList;
-
-class LocalServerDuplexConection implements DuplexConnection {
- private final String name;
-
- private final CopyOnWriteArrayList> subjects;
- private final EmptySubject closeSubject = new EmptySubject();
-
- public LocalServerDuplexConection(String name) {
- this.name = name;
- this.subjects = new CopyOnWriteArrayList<>();
- }
-
- @Override
- public Observable getInput() {
- return o -> {
- o.onSubscribe(() -> subjects.removeIf(s -> s == o));
- subjects.add(o);
- };
- }
-
- @Override
- public void addOutput(Publisher o, Completable callback) {
- o
- .subscribe(new Subscriber() {
-
- @Override
- public void onSubscribe(Subscription s) {
- s.request(Long.MAX_VALUE);
- }
-
- @Override
- public void onNext(Frame frame) {
- try {
- LocalReactiveSocketManager
- .getInstance()
- .getClientConnection(name)
- .write(frame);
- } catch (Throwable t) {
- onError(t);
- }
- }
-
- @Override
- public void onError(Throwable t) {
- callback.error(t);
- }
-
- @Override
- public void onComplete() {
- callback.success();
- }
- });
- }
-
- @Override
- public double availability() {
- return 1.0;
- }
-
- void write(Frame frame) {
- subjects
- .forEach(o -> o.onNext(frame));
- }
-
- @Override
- public Publisher close() {
- return s -> {
- LocalReactiveSocketManager
- .getInstance()
- .removeServerDuplexConnection(name);
- s.onComplete();
- closeSubject.onComplete();
- };
- }
-
- @Override
- public Publisher onClose() {
- return closeSubject;
- }
-}
diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java
deleted file mode 100644
index 58ad1d62b..000000000
--- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.reactivesocket.local;
-
-import io.reactivesocket.*;
-import io.reactivesocket.internal.rx.EmptySubscription;
-import org.reactivestreams.Publisher;
-
-public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector {
- public static final LocalServerReactiveSocketConnector INSTANCE = new LocalServerReactiveSocketConnector();
-
- private LocalServerReactiveSocketConnector() {}
-
- @Override
- public Publisher connect(Config config) {
- return s -> {
- try {
- s.onSubscribe(EmptySubscription.INSTANCE);
- LocalServerDuplexConection clientConnection = LocalReactiveSocketManager
- .getInstance()
- .getServerConnection(config.getName());
- ReactiveSocket reactiveSocket = DefaultReactiveSocket
- .fromServerConnection(clientConnection, config.getConnectionSetupHandler());
-
- reactiveSocket.startAndWait();
- s.onNext(reactiveSocket);
- s.onComplete();
- } catch (Throwable t) {
- s.onError(t);
- }
- };
- }
-
- public static class Config {
- final String name;
- final ConnectionSetupHandler connectionSetupHandler;
-
- public Config(String name, ConnectionSetupHandler connectionSetupHandler) {
- this.name = name;
- this.connectionSetupHandler = connectionSetupHandler;
- }
-
- public ConnectionSetupHandler getConnectionSetupHandler() {
- return connectionSetupHandler;
- }
-
- public String getName() {
- return name;
- }
- }
-}
diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java
deleted file mode 100644
index a62871e8f..000000000
--- a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Copyright 2015 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.reactivesocket.local;
-
-import io.reactivesocket.ConnectionSetupHandler;
-import io.reactivesocket.ConnectionSetupPayload;
-import io.reactivesocket.Payload;
-import io.reactivesocket.ReactiveSocket;
-import io.reactivesocket.RequestHandler;
-import io.reactivesocket.exceptions.SetupException;
-import io.reactivesocket.test.TestUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import rx.Observable;
-import rx.RxReactiveStreams;
-import rx.observers.TestSubscriber;
-
-import java.util.concurrent.TimeUnit;
-
-import static io.reactivesocket.util.Unsafe.toSingleFuture;
-
-public class ClientServerTest {
-
- static ReactiveSocket client;
-
- static ReactiveSocket server;
-
- @BeforeClass
- public static void setup() throws Exception {
- LocalServerReactiveSocketConnector.Config serverConfig = new LocalServerReactiveSocketConnector.Config("test", new ConnectionSetupHandler() {
- @Override
- public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket rs) throws SetupException {
- return new RequestHandler() {
- @Override
- public Publisher handleRequestResponse(Payload payload) {
- return s -> {
- Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
- s.onNext(response);
- s.onComplete();
- };
- }
-
- @Override
- public Publisher handleRequestStream(Payload payload) {
- Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
-
- return RxReactiveStreams
- .toPublisher(Observable
- .range(1, 10)
- .map(i -> response));
- }
-
- @Override
- public Publisher handleSubscription(Payload payload) {
- Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
-
- return RxReactiveStreams
- .toPublisher(Observable
- .range(1, 10)
- .map(i -> response)
- .repeat());
- }
-
- @Override
- public Publisher handleFireAndForget(Payload payload) {
- return Subscriber::onComplete;
- }
-
- @Override
- public Publisher handleChannel(Payload initialPayload, Publisher inputs) {
- return null;
- }
-
- @Override
- public Publisher handleMetadataPush(Payload payload) {
- return null;
- }
- };
- }
- });
-
- server = toSingleFuture(LocalServerReactiveSocketConnector.INSTANCE.connect(serverConfig)).get(5, TimeUnit.SECONDS);
-
- LocalClientReactiveSocketConnector.Config clientConfig = new LocalClientReactiveSocketConnector.Config("test", "text", "text");
- client = toSingleFuture(LocalClientReactiveSocketConnector.INSTANCE.connect(clientConfig)).get(5, TimeUnit.SECONDS);;
- }
-
- @Test
- public void testRequestResponse1() {
- requestResponseN(1500, 1);
- }
-
- @Test
- public void testRequestResponse10() {
- requestResponseN(1500, 10);
- }
-
-
- @Test
- public void testRequestResponse100() {
- requestResponseN(1500, 100);
- }
-
- @Test
- public void testRequestResponse10_000() {
- requestResponseN(60_000, 10_000);
- }
-
-
- @Test
- public void testRequestResponse100_000() {
- requestResponseN(60_000, 10_000);
- }
- @Test
- public void testRequestResponse1_000_000() {
- requestResponseN(60_000, 10_000);
- }
-
- @Test
- public void testRequestStream() {
- TestSubscriber ts = TestSubscriber.create();
-
- RxReactiveStreams
- .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata")))
- .subscribe(ts);
-
-
- ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS);
- ts.assertValueCount(10);
- ts.assertNoErrors();
- ts.assertCompleted();
- }
-
- @Test
- public void testRequestSubscription() throws InterruptedException {
- TestSubscriber ts = TestSubscriber.create();
-
- RxReactiveStreams
- .toObservable(client.requestSubscription(TestUtil.utf8EncodedPayload("hello sub", "metadata sub")))
- .take(10)
- .subscribe(ts);
-
- ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS);
- ts.assertValueCount(10);
- ts.assertNoErrors();
- }
-
-
- public void requestResponseN(int timeout, int count) {
-
- TestSubscriber ts = TestSubscriber.create();
-
- Observable
- .range(1, count)
- .flatMap(i ->
- RxReactiveStreams
- .toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata")))
- .map(payload -> TestUtil.byteToString(payload.getData()))
- )
- .doOnError(Throwable::printStackTrace)
- .subscribe(ts);
-
- ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS);
- ts.assertValueCount(count);
- ts.assertNoErrors();
- ts.assertCompleted();
- }
-
-
-}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 14e459afc..1411e1476 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -7,6 +7,5 @@ include 'reactivesocket-mime-types'
include 'reactivesocket-stats-servo'
include 'reactivesocket-test'
include 'reactivesocket-transport-aeron'
-include 'reactivesocket-transport-local'
include 'reactivesocket-transport-tcp'
include 'reactivesocket-transport-websocket'