From 345edefeb257ee319d8594334348a3c3294f19c3 Mon Sep 17 00:00:00 2001 From: Alex Danilenko Date: Sat, 5 Nov 2022 21:46:52 +0200 Subject: [PATCH] server events --- core/src/main/java/spotty/server/Server.java | 44 ++++--- .../spotty/server/connection/Connection.java | 113 +++++++++++++----- .../spotty/server/event/ServerEvents.java | 41 +++++++ .../server/connection/ConnectionTest.groovy | 85 ++++++++----- 4 files changed, 206 insertions(+), 77 deletions(-) create mode 100644 core/src/main/java/spotty/server/event/ServerEvents.java diff --git a/core/src/main/java/spotty/server/Server.java b/core/src/main/java/spotty/server/Server.java index e501a9c..61dfecf 100644 --- a/core/src/main/java/spotty/server/Server.java +++ b/core/src/main/java/spotty/server/Server.java @@ -6,6 +6,7 @@ import spotty.server.connection.Connection; import spotty.server.connection.socket.SocketFactory; import spotty.server.connection.socket.SpottySocket; +import spotty.server.event.ServerEvents; import spotty.server.handler.request.RequestHandler; import spotty.server.registry.exception.ExceptionHandlerRegistry; import spotty.server.worker.ReactorWorker; @@ -24,20 +25,18 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.security.KeyStore; -import java.util.Iterator; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static java.nio.channels.SelectionKey.OP_ACCEPT; import static java.nio.channels.SelectionKey.OP_CONNECT; import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; -import static spotty.common.utils.ThreadUtils.threadPool; import static spotty.common.validation.Validation.isNotBlank; import static spotty.common.validation.Validation.notBlank; import static spotty.common.validation.Validation.notNull; import static spotty.common.validation.Validation.validate; +import static spotty.server.connection.Connection.Builder.connection; import static spotty.server.connection.state.ConnectionState.CLOSED; import static spotty.server.connection.state.ConnectionState.DATA_REMAINING; import static spotty.server.connection.state.ConnectionState.INITIALIZED; @@ -48,8 +47,6 @@ public final class Server implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(Server.class); - private final ExecutorService SERVER_RUN = Executors.newSingleThreadExecutor(threadPool("spotty-main", false)); - private volatile boolean running = false; private volatile boolean started = false; private volatile boolean enabledHttps = false; @@ -57,11 +54,12 @@ public final class Server implements Closeable { private final AtomicInteger connections = new AtomicInteger(); private final SocketFactory socketFactory = new SocketFactory(); + private final ServerEvents serverEvents = new ServerEvents(); private final int maxRequestBodySize; private final RequestHandler requestHandler; private final ExceptionHandlerRegistry exceptionHandlerRegistry; - private final ReactorWorker reactorWorker ; + private final ReactorWorker reactorWorker; private final InetSocketAddress socketAddress; public Server(int port, int maxRequestBodySize, RequestHandler requestHandler, ExceptionHandlerRegistry exceptionHandlerRegistry, ReactorWorker reactorWorker) { @@ -80,7 +78,9 @@ public synchronized void start() { return; } - SERVER_RUN.execute(this::serverInit); + final Thread main = new Thread(this::serverRun, "spotty-main"); + main.setDaemon(false); + main.start(); } public void enableHttps(String keyStorePath, String keyStorePassword, String trustStorePath, String trustStorePassword) { @@ -130,7 +130,6 @@ public void enableHttps(String keyStorePath, String keyStorePassword, String tru @Override public synchronized void close() { stop(); - SERVER_RUN.shutdownNow(); reactorWorker.close(); } @@ -186,7 +185,7 @@ public String hostUrl() { return sb.toString(); } - private void serverInit() { + private void serverRun() { try (final ServerSocketChannel serverSocket = ServerSocketChannel.open(); final Selector selector = Selector.open()) { // Binding this server on the port @@ -202,14 +201,19 @@ private void serverInit() { final Thread currentThread = Thread.currentThread(); while (running && !currentThread.isInterrupted()) { selector.select(1000); - final Iterator keys = selector.selectedKeys().iterator(); - while (keys.hasNext()) { - final SelectionKey key = keys.next(); - keys.remove(); + final Set keys = selector.selectedKeys(); + serverEvents.add(keys); + keys.clear(); + + SelectionKey key; + while ((key = serverEvents.poll()) != null) { if (!key.isValid()) { final Connection connection = (Connection) key.attachment(); - connection.close(); + if (connection != null) { + connection.close(); + } + key.cancel(); continue; @@ -243,7 +247,15 @@ private void accept(SelectionKey acceptKey) throws IOException { final SpottySocket socket = socketFactory.createSocket(channel); - final Connection connection = new Connection(socket, requestHandler, reactorWorker, exceptionHandlerRegistry, maxRequestBodySize); + final Connection connection = connection() + .socket(socket) + .serverEvents(serverEvents) + .requestHandler(requestHandler) + .reactorWorker(reactorWorker) + .exceptionHandlerRegistry(exceptionHandlerRegistry) + .maxRequestBodySize(maxRequestBodySize) + .build(); + LOG.debug("{} accepted, count={}", connection, connections.incrementAndGet()); connection.whenStateIs(CLOSED, () -> { diff --git a/core/src/main/java/spotty/server/connection/Connection.java b/core/src/main/java/spotty/server/connection/Connection.java index d9b06dc..2044f1f 100644 --- a/core/src/main/java/spotty/server/connection/Connection.java +++ b/core/src/main/java/spotty/server/connection/Connection.java @@ -35,6 +35,7 @@ import spotty.common.utils.ExceptionalRunnable; import spotty.server.connection.socket.SpottySocket; import spotty.server.connection.state.ConnectionState; +import spotty.server.event.ServerEvents; import spotty.server.handler.exception.ExceptionHandler; import spotty.server.handler.request.RequestHandler; import spotty.server.registry.exception.ExceptionHandlerRegistry; @@ -108,36 +109,26 @@ public final class Connection extends StateMachine implements C private SpottySocket socket; private final ReactorWorker reactorWorker; private final ExceptionHandlerRegistry exceptionHandlerRegistry; + private final ServerEvents serverEvents; private final int maxRequestBodySize; private ByteBuffer readBuffer; private RequestHandler requestHandler; + private SelectionKey selectionKey; private ByteBuffer headersByteBuffer; private ByteBuffer bodyByteBuffer; - public Connection(SpottySocket socket, - RequestHandler requestHandler, - ReactorWorker reactorWorker, - ExceptionHandlerRegistry exceptionHandlerRegistry, - int maxRequestBodySize) throws SpottyStreamException { - this(socket, requestHandler, reactorWorker, exceptionHandlerRegistry, maxRequestBodySize, DEFAULT_BUFFER_SIZE); - } - - public Connection(SpottySocket socket, - RequestHandler requestHandler, - ReactorWorker reactorWorker, - ExceptionHandlerRegistry exceptionHandlerRegistry, - int maxRequestBodySize, - int bufferSize) throws SpottyStreamException { + private Connection(Builder builder) throws SpottyStreamException { super(INITIALIZED); - this.socket = notNull("socket", socket); - this.requestHandler = notNull("requestHandler", requestHandler); - this.reactorWorker = notNull("reactorWorker", reactorWorker); - this.exceptionHandlerRegistry = notNull("exceptionHandlerService", exceptionHandlerRegistry); - this.maxRequestBodySize = maxRequestBodySize; + this.socket = notNull("socket", builder.socket); + this.requestHandler = notNull("requestHandler", builder.requestHandler); + this.reactorWorker = notNull("reactorWorker", builder.reactorWorker); + this.exceptionHandlerRegistry = notNull("exceptionHandlerService", builder.exceptionHandlerRegistry); + this.serverEvents = notNull("serverEvents", builder.serverEvents); + this.maxRequestBodySize = builder.maxRequestBodySize; - this.readBuffer = ByteBuffer.allocate(bufferSize); + this.readBuffer = ByteBuffer.allocate(builder.bufferSize); this.stateHandlerGraph .filter( @@ -204,7 +195,11 @@ public void after() { } public SelectionKey register(Selector selector) { - return exceptionHandler(() -> socket.register(selector, OP_CONNECT, this)); + if (selectionKey != null) { + throw new SpottyException("connection has been registered already"); + } + + return selectionKey = exceptionHandler(() -> socket.register(selector, OP_CONNECT, this)); } public void markDataRemaining() { @@ -218,12 +213,15 @@ public void markReadyToRead() { } public void handle() { - do { - exceptionHandler( - handleState, - afterExceptionHandler // if exception respond error to the client - ); - } while (socket.readBufferHasRemaining() && state().isReading()); + exceptionHandler( + handleState, + afterExceptionHandler // if exception respond error to the client + ); + + // if after connection handing socket buffer still has data then run handing in next tick + if (socket.readBufferHasRemaining() && state().isReading()) { + runHandleNextTick(); + } } // optimization to not spawn callback objects each time @@ -583,14 +581,18 @@ private boolean responseWriteCompleted() { changeState(READY_TO_READ); - // has something did not process + // has something did not process, add it to handle in next tick if (readBuffer.position() > 0 || socket.readBufferHasRemaining()) { - handle(); + runHandleNextTick(); } return false; } + private void runHandleNextTick() { + serverEvents.add(selectionKey); + } + private void resetResponse() { this.response.reset(); this.responseHeadersBuffer.reset(); @@ -680,4 +682,57 @@ private T exceptionHandler(ExceptionalCallable runnable, Runnable afterEx return null; } + public static class Builder { + private SpottySocket socket; + private RequestHandler requestHandler; + private ReactorWorker reactorWorker; + private ExceptionHandlerRegistry exceptionHandlerRegistry; + private ServerEvents serverEvents; + private int maxRequestBodySize; + private int bufferSize = DEFAULT_BUFFER_SIZE; + + public static Builder connection() { + return new Builder(); + } + + public Builder socket(SpottySocket socket) { + this.socket = socket; + return this; + } + + public Builder requestHandler(RequestHandler requestHandler) { + this.requestHandler = requestHandler; + return this; + } + + public Builder reactorWorker(ReactorWorker reactorWorker) { + this.reactorWorker = reactorWorker; + return this; + } + + public Builder exceptionHandlerRegistry(ExceptionHandlerRegistry exceptionHandlerRegistry) { + this.exceptionHandlerRegistry = exceptionHandlerRegistry; + return this; + } + + public Builder serverEvents(ServerEvents serverEvents) { + this.serverEvents = serverEvents; + return this; + } + + public Builder maxRequestBodySize(int maxRequestBodySize) { + this.maxRequestBodySize = maxRequestBodySize; + return this; + } + + public Builder bufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public Connection build() { + return new Connection(this); + } + } + } diff --git a/core/src/main/java/spotty/server/event/ServerEvents.java b/core/src/main/java/spotty/server/event/ServerEvents.java new file mode 100644 index 0000000..c299cc9 --- /dev/null +++ b/core/src/main/java/spotty/server/event/ServerEvents.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 - Alex Danilenko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package spotty.server.event; + +import java.nio.channels.SelectionKey; +import java.util.Collection; +import java.util.LinkedList; +import java.util.Queue; + +public final class ServerEvents { + private final Queue queue = new LinkedList<>(); + + public void add(SelectionKey key) { + queue.add(key); + } + + public void add(Collection keys) { + queue.addAll(keys); + } + + public SelectionKey poll() { + return queue.poll(); + } + + public boolean isEmpty() { + return queue.isEmpty(); + } +} \ No newline at end of file diff --git a/core/src/test/groovy/spotty/server/connection/ConnectionTest.groovy b/core/src/test/groovy/spotty/server/connection/ConnectionTest.groovy index 7346948..8249fd3 100644 --- a/core/src/test/groovy/spotty/server/connection/ConnectionTest.groovy +++ b/core/src/test/groovy/spotty/server/connection/ConnectionTest.groovy @@ -9,6 +9,7 @@ import spotty.common.response.ResponseHeadersWriter import spotty.common.response.SpottyResponse import spotty.common.stream.output.SpottyByteArrayOutputStream import spotty.server.connection.socket.SocketFactory +import spotty.server.event.ServerEvents import spotty.server.handler.EchoRequestHandler import spotty.server.handler.request.RequestHandler import spotty.server.registry.exception.ExceptionHandlerRegistry @@ -26,10 +27,10 @@ import static spotty.common.http.HttpStatus.BAD_REQUEST import static spotty.common.http.HttpStatus.INTERNAL_SERVER_ERROR import static spotty.common.http.HttpStatus.MOVED_PERMANENTLY import static spotty.common.http.HttpStatus.TOO_MANY_REQUESTS +import static spotty.server.connection.Connection.Builder.connection import static spotty.server.connection.state.ConnectionState.READY_TO_WRITE class ConnectionTest extends Specification implements WebRequestTestData { - private def socketFactory = new SocketFactory() private def exceptionService = new ExceptionHandlerRegistry() private def reactorWorker = new ReactorWorker(1, 1, 10, SECONDS) @@ -69,7 +70,11 @@ class ConnectionTest extends Specification implements WebRequestTestData { socket.write(fullRequest) socket.flip() - var connection = new Connection(socketFactory.createSocket(socket), delayHandler, reactorWorker, exceptionService, maxBodyLimit) + var connection = connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .requestHandler(delayHandler) + .build() + connection.markReadyToRead() when: @@ -94,7 +99,11 @@ class ConnectionTest extends Specification implements WebRequestTestData { var expectedResponse = data.toString() - var connection = new Connection(socketFactory.createSocket(socket), new EchoRequestHandler(), reactorWorker, exceptionService, maxBodyLimit, fullRequest.length()) + var connection = connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .bufferSize(fullRequest.length()) + .build() + connection.markReadyToRead() when: @@ -118,7 +127,9 @@ class ConnectionTest extends Specification implements WebRequestTestData { socket.configureBlocking(true) when: - new Connection(socketFactory.createSocket(socket), new EchoRequestHandler(), reactorWorker, exceptionService, maxBodyLimit) + connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .build() .markReadyToRead() then: @@ -132,7 +143,10 @@ class ConnectionTest extends Specification implements WebRequestTestData { socket.write("wrong request head line\n") socket.flip() - var connection = new Connection(socketFactory.createSocket(socket), new EchoRequestHandler(), reactorWorker, exceptionService, maxBodyLimit) + var connection = connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .build() + connection.markReadyToRead() when: @@ -166,7 +180,10 @@ class ConnectionTest extends Specification implements WebRequestTestData { socket.write("$HOST: localhost:4000\n\n") socket.flip() - var connection = new Connection(socketFactory.createSocket(socket), new EchoRequestHandler(), reactorWorker, exceptionService, maxBodyLimit) + var connection = connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .build() + connection.markReadyToRead() when: @@ -208,16 +225,14 @@ class ConnectionTest extends Specification implements WebRequestTestData { socket.write(fullRequest) socket.flip() - var connection = new Connection( - socketFactory.createSocket(socket), - { req, res -> + var connection = connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .requestHandler { req, res -> throw new SpottyHttpException(TOO_MANY_REQUESTS, "some message") - }, - reactorWorker, - exceptionService, - maxBodyLimit, - fullRequest.length() - ) + } + .bufferSize(fullRequest.length()) + .build() + connection.markReadyToRead() when: @@ -254,14 +269,12 @@ class ConnectionTest extends Specification implements WebRequestTestData { socket.write(fullRequest) socket.flip() - var connection = new Connection( - socketFactory.createSocket(socket), - { req, res -> res.redirect("https://google.com") }, - reactorWorker, - exceptionService, - maxBodyLimit, - fullRequest.length() - ) + var connection = connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .requestHandler { req, res -> res.redirect("https://google.com") } + .bufferSize(fullRequest.length()) + .build() + connection.markReadyToRead() when: @@ -288,16 +301,15 @@ class ConnectionTest extends Specification implements WebRequestTestData { socket.write(fullRequest) socket.flip() - var connection = new Connection( - socketFactory.createSocket(socket), - { req, res -> + var connection = connectionBuilder() + .socket(socketFactory.createSocket(socket)) + .requestHandler { req, res -> throw new SpottyHttpException(TOO_MANY_REQUESTS, "some message") - }, - reactorWorker, - exceptionService, - maxRequestBodySize, - fullRequest.length() - ) + } + .maxRequestBodySize(maxRequestBodySize) + .bufferSize(fullRequest.length()) + .build() + connection.markReadyToRead() when: @@ -323,4 +335,13 @@ class ConnectionTest extends Specification implements WebRequestTestData { """.stripIndent().trim() } + def connectionBuilder() { + return connection() + .requestHandler(new EchoRequestHandler()) + .serverEvents(new ServerEvents()) + .reactorWorker(reactorWorker) + .exceptionHandlerRegistry(exceptionService) + .maxRequestBodySize(maxBodyLimit) + } + }