From 95ea70fd0d3b6ff25ea5ee43f89bdb9306d62c4f Mon Sep 17 00:00:00 2001 From: Steve Gury Date: Tue, 12 Jul 2016 16:00:58 -0700 Subject: [PATCH] Remove reactivesocket-local-transport ***Problem*** Nobody is using this code, and I start to be tired of maintaining it. ***Solution*** Remove it, git keep the code in its history if we need it. --- reactivesocket-transport-local/build.gradle | 19 -- .../local/LocalClientDuplexConnection.java | 107 ---------- .../LocalClientReactiveSocketConnector.java | 71 ------- .../local/LocalReactiveSocketManager.java | 54 ----- .../local/LocalServerDuplexConection.java | 108 ---------- .../LocalServerReactiveSocketConnector.java | 64 ------ .../local/ClientServerTest.java | 185 ------------------ settings.gradle | 1 - 8 files changed, 609 deletions(-) delete mode 100644 reactivesocket-transport-local/build.gradle delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java delete mode 100644 reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java delete mode 100644 reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java diff --git a/reactivesocket-transport-local/build.gradle b/reactivesocket-transport-local/build.gradle deleted file mode 100644 index 1c943599d..000000000 --- a/reactivesocket-transport-local/build.gradle +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -dependencies { - compile project(':reactivesocket-transport-tcp') - compile project(':reactivesocket-core') - - testCompile project(':reactivesocket-test') -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java deleted file mode 100644 index 86855f9cc..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientDuplexConnection.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.internal.EmptySubject; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalClientDuplexConnection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - private final EmptySubject closeSubject = new EmptySubject(); - - public LocalClientDuplexConnection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getServerConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public Publisher close() { - return s -> { - LocalReactiveSocketManager - .getInstance() - .removeClientConnection(name); - closeSubject.subscribe(s); - }; - } - - @Override - public Publisher onClose() { - return closeSubject; - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java deleted file mode 100644 index 43740f4b1..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalClientReactiveSocketConnector.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalClientReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalClientReactiveSocketConnector INSTANCE = new LocalClientReactiveSocketConnector(); - - private LocalClientReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalClientDuplexConnection clientConnection = LocalReactiveSocketManager - .getInstance() - .getClientConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType())); - - reactiveSocket.startAndWait(); - - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final String metadataMimeType; - final String dataMimeType; - - public Config(String name, String metadataMimeType, String dataMimeType) { - this.name = name; - this.metadataMimeType = metadataMimeType; - this.dataMimeType = dataMimeType; - } - - public String getName() { - return name; - } - - public String getMetadataMimeType() { - return metadataMimeType; - } - - public String getDataMimeType() { - return dataMimeType; - } - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java deleted file mode 100644 index 60d246266..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalReactiveSocketManager.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * Created by rroeser on 4/2/16. - */ -class LocalReactiveSocketManager { - private static final LocalReactiveSocketManager INSTANCE = new LocalReactiveSocketManager(); - - private final ConcurrentHashMap serverConnections; - private final ConcurrentHashMap clientConnections; - - private LocalReactiveSocketManager() { - serverConnections = new ConcurrentHashMap<>(); - clientConnections = new ConcurrentHashMap<>(); - } - - public static LocalReactiveSocketManager getInstance() { - return INSTANCE; - } - - public LocalClientDuplexConnection getClientConnection(String name) { - return clientConnections.computeIfAbsent(name, LocalClientDuplexConnection::new); - } - - public void removeClientConnection(String name) { - clientConnections.remove(name); - } - - public LocalServerDuplexConection getServerConnection(String name) { - return serverConnections.computeIfAbsent(name, LocalServerDuplexConection::new); - } - - public void removeServerDuplexConnection(String name) { - serverConnections.remove(name); - } - -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java deleted file mode 100644 index 9a3dde4d0..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerDuplexConection.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.internal.EmptySubject; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import io.reactivesocket.rx.Observer; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.CopyOnWriteArrayList; - -class LocalServerDuplexConection implements DuplexConnection { - private final String name; - - private final CopyOnWriteArrayList> subjects; - private final EmptySubject closeSubject = new EmptySubject(); - - public LocalServerDuplexConection(String name) { - this.name = name; - this.subjects = new CopyOnWriteArrayList<>(); - } - - @Override - public Observable getInput() { - return o -> { - o.onSubscribe(() -> subjects.removeIf(s -> s == o)); - subjects.add(o); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - o - .subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Frame frame) { - try { - LocalReactiveSocketManager - .getInstance() - .getClientConnection(name) - .write(frame); - } catch (Throwable t) { - onError(t); - } - } - - @Override - public void onError(Throwable t) { - callback.error(t); - } - - @Override - public void onComplete() { - callback.success(); - } - }); - } - - @Override - public double availability() { - return 1.0; - } - - void write(Frame frame) { - subjects - .forEach(o -> o.onNext(frame)); - } - - @Override - public Publisher close() { - return s -> { - LocalReactiveSocketManager - .getInstance() - .removeServerDuplexConnection(name); - s.onComplete(); - closeSubject.onComplete(); - }; - } - - @Override - public Publisher onClose() { - return closeSubject; - } -} diff --git a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java b/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java deleted file mode 100644 index 58ad1d62b..000000000 --- a/reactivesocket-transport-local/src/main/java/io/reactivesocket/local/LocalServerReactiveSocketConnector.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.*; -import io.reactivesocket.internal.rx.EmptySubscription; -import org.reactivestreams.Publisher; - -public class LocalServerReactiveSocketConnector implements ReactiveSocketConnector { - public static final LocalServerReactiveSocketConnector INSTANCE = new LocalServerReactiveSocketConnector(); - - private LocalServerReactiveSocketConnector() {} - - @Override - public Publisher connect(Config config) { - return s -> { - try { - s.onSubscribe(EmptySubscription.INSTANCE); - LocalServerDuplexConection clientConnection = LocalReactiveSocketManager - .getInstance() - .getServerConnection(config.getName()); - ReactiveSocket reactiveSocket = DefaultReactiveSocket - .fromServerConnection(clientConnection, config.getConnectionSetupHandler()); - - reactiveSocket.startAndWait(); - s.onNext(reactiveSocket); - s.onComplete(); - } catch (Throwable t) { - s.onError(t); - } - }; - } - - public static class Config { - final String name; - final ConnectionSetupHandler connectionSetupHandler; - - public Config(String name, ConnectionSetupHandler connectionSetupHandler) { - this.name = name; - this.connectionSetupHandler = connectionSetupHandler; - } - - public ConnectionSetupHandler getConnectionSetupHandler() { - return connectionSetupHandler; - } - - public String getName() { - return name; - } - } -} diff --git a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java b/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java deleted file mode 100644 index a62871e8f..000000000 --- a/reactivesocket-transport-local/src/test/java/io/reactivesocket/local/ClientServerTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.local; - -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.exceptions.SetupException; -import io.reactivesocket.test.TestUtil; -import org.junit.BeforeClass; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.observers.TestSubscriber; - -import java.util.concurrent.TimeUnit; - -import static io.reactivesocket.util.Unsafe.toSingleFuture; - -public class ClientServerTest { - - static ReactiveSocket client; - - static ReactiveSocket server; - - @BeforeClass - public static void setup() throws Exception { - LocalServerReactiveSocketConnector.Config serverConfig = new LocalServerReactiveSocketConnector.Config("test", new ConnectionSetupHandler() { - @Override - public RequestHandler apply(ConnectionSetupPayload setupPayload, ReactiveSocket rs) throws SetupException { - return new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return s -> { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - s.onNext(response); - s.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response) - .repeat()); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return null; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }; - } - }); - - server = toSingleFuture(LocalServerReactiveSocketConnector.INSTANCE.connect(serverConfig)).get(5, TimeUnit.SECONDS); - - LocalClientReactiveSocketConnector.Config clientConfig = new LocalClientReactiveSocketConnector.Config("test", "text", "text"); - client = toSingleFuture(LocalClientReactiveSocketConnector.INSTANCE.connect(clientConfig)).get(5, TimeUnit.SECONDS);; - } - - @Test - public void testRequestResponse1() { - requestResponseN(1500, 1); - } - - @Test - public void testRequestResponse10() { - requestResponseN(1500, 10); - } - - - @Test - public void testRequestResponse100() { - requestResponseN(1500, 100); - } - - @Test - public void testRequestResponse10_000() { - requestResponseN(60_000, 10_000); - } - - - @Test - public void testRequestResponse100_000() { - requestResponseN(60_000, 10_000); - } - @Test - public void testRequestResponse1_000_000() { - requestResponseN(60_000, 10_000); - } - - @Test - public void testRequestStream() { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .subscribe(ts); - - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - @Test - public void testRequestSubscription() throws InterruptedException { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestSubscription(TestUtil.utf8EncodedPayload("hello sub", "metadata sub"))) - .take(10) - .subscribe(ts); - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - } - - - public void requestResponseN(int timeout, int count) { - - TestSubscriber ts = TestSubscriber.create(); - - Observable - .range(1, count) - .flatMap(i -> - RxReactiveStreams - .toObservable(client.requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .map(payload -> TestUtil.byteToString(payload.getData())) - ) - .doOnError(Throwable::printStackTrace) - .subscribe(ts); - - ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - ts.assertValueCount(count); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - -} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 14e459afc..1411e1476 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,6 +7,5 @@ include 'reactivesocket-mime-types' include 'reactivesocket-stats-servo' include 'reactivesocket-test' include 'reactivesocket-transport-aeron' -include 'reactivesocket-transport-local' include 'reactivesocket-transport-tcp' include 'reactivesocket-transport-websocket'