Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions core/src/main/java/spotty/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import spotty.server.connection.Connection;
import spotty.server.connection.socket.SocketFactory;
import spotty.server.connection.socket.SpottySocket;
import spotty.server.event.ServerEvents;
import spotty.server.handler.request.RequestHandler;
import spotty.server.registry.exception.ExceptionHandlerRegistry;
import spotty.server.worker.ReactorWorker;
Expand All @@ -24,20 +25,18 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.KeyStore;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static java.nio.channels.SelectionKey.OP_ACCEPT;
import static java.nio.channels.SelectionKey.OP_CONNECT;
import static java.nio.channels.SelectionKey.OP_READ;
import static java.nio.channels.SelectionKey.OP_WRITE;
import static spotty.common.utils.ThreadUtils.threadPool;
import static spotty.common.validation.Validation.isNotBlank;
import static spotty.common.validation.Validation.notBlank;
import static spotty.common.validation.Validation.notNull;
import static spotty.common.validation.Validation.validate;
import static spotty.server.connection.Connection.Builder.connection;
import static spotty.server.connection.state.ConnectionState.CLOSED;
import static spotty.server.connection.state.ConnectionState.DATA_REMAINING;
import static spotty.server.connection.state.ConnectionState.INITIALIZED;
Expand All @@ -48,20 +47,19 @@
public final class Server implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(Server.class);

private final ExecutorService SERVER_RUN = Executors.newSingleThreadExecutor(threadPool("spotty-main", false));

private volatile boolean running = false;
private volatile boolean started = false;
private volatile boolean enabledHttps = false;

private final AtomicInteger connections = new AtomicInteger();

private final SocketFactory socketFactory = new SocketFactory();
private final ServerEvents serverEvents = new ServerEvents();

private final int maxRequestBodySize;
private final RequestHandler requestHandler;
private final ExceptionHandlerRegistry exceptionHandlerRegistry;
private final ReactorWorker reactorWorker ;
private final ReactorWorker reactorWorker;
private final InetSocketAddress socketAddress;

public Server(int port, int maxRequestBodySize, RequestHandler requestHandler, ExceptionHandlerRegistry exceptionHandlerRegistry, ReactorWorker reactorWorker) {
Expand All @@ -80,7 +78,9 @@ public synchronized void start() {
return;
}

SERVER_RUN.execute(this::serverInit);
final Thread main = new Thread(this::serverRun, "spotty-main");
main.setDaemon(false);
main.start();
}

public void enableHttps(String keyStorePath, String keyStorePassword, String trustStorePath, String trustStorePassword) {
Expand Down Expand Up @@ -130,7 +130,6 @@ public void enableHttps(String keyStorePath, String keyStorePassword, String tru
@Override
public synchronized void close() {
stop();
SERVER_RUN.shutdownNow();
reactorWorker.close();
}

Expand Down Expand Up @@ -186,7 +185,7 @@ public String hostUrl() {
return sb.toString();
}

private void serverInit() {
private void serverRun() {
try (final ServerSocketChannel serverSocket = ServerSocketChannel.open();
final Selector selector = Selector.open()) {
// Binding this server on the port
Expand All @@ -202,14 +201,19 @@ private void serverInit() {
final Thread currentThread = Thread.currentThread();
while (running && !currentThread.isInterrupted()) {
selector.select(1000);
final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
final SelectionKey key = keys.next();
keys.remove();

final Set<SelectionKey> keys = selector.selectedKeys();
serverEvents.add(keys);
keys.clear();

SelectionKey key;
while ((key = serverEvents.poll()) != null) {
if (!key.isValid()) {
final Connection connection = (Connection) key.attachment();
connection.close();
if (connection != null) {
connection.close();
}

key.cancel();

continue;
Expand Down Expand Up @@ -243,7 +247,15 @@ private void accept(SelectionKey acceptKey) throws IOException {

final SpottySocket socket = socketFactory.createSocket(channel);

final Connection connection = new Connection(socket, requestHandler, reactorWorker, exceptionHandlerRegistry, maxRequestBodySize);
final Connection connection = connection()
.socket(socket)
.serverEvents(serverEvents)
.requestHandler(requestHandler)
.reactorWorker(reactorWorker)
.exceptionHandlerRegistry(exceptionHandlerRegistry)
.maxRequestBodySize(maxRequestBodySize)
.build();

LOG.debug("{} accepted, count={}", connection, connections.incrementAndGet());

connection.whenStateIs(CLOSED, () -> {
Expand Down
113 changes: 84 additions & 29 deletions core/src/main/java/spotty/server/connection/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import spotty.common.utils.ExceptionalRunnable;
import spotty.server.connection.socket.SpottySocket;
import spotty.server.connection.state.ConnectionState;
import spotty.server.event.ServerEvents;
import spotty.server.handler.exception.ExceptionHandler;
import spotty.server.handler.request.RequestHandler;
import spotty.server.registry.exception.ExceptionHandlerRegistry;
Expand Down Expand Up @@ -108,36 +109,26 @@ public final class Connection extends StateMachine<ConnectionState> implements C
private SpottySocket socket;
private final ReactorWorker reactorWorker;
private final ExceptionHandlerRegistry exceptionHandlerRegistry;
private final ServerEvents serverEvents;
private final int maxRequestBodySize;
private ByteBuffer readBuffer;
private RequestHandler requestHandler;
private SelectionKey selectionKey;

private ByteBuffer headersByteBuffer;
private ByteBuffer bodyByteBuffer;

public Connection(SpottySocket socket,
RequestHandler requestHandler,
ReactorWorker reactorWorker,
ExceptionHandlerRegistry exceptionHandlerRegistry,
int maxRequestBodySize) throws SpottyStreamException {
this(socket, requestHandler, reactorWorker, exceptionHandlerRegistry, maxRequestBodySize, DEFAULT_BUFFER_SIZE);
}

public Connection(SpottySocket socket,
RequestHandler requestHandler,
ReactorWorker reactorWorker,
ExceptionHandlerRegistry exceptionHandlerRegistry,
int maxRequestBodySize,
int bufferSize) throws SpottyStreamException {
private Connection(Builder builder) throws SpottyStreamException {
super(INITIALIZED);

this.socket = notNull("socket", socket);
this.requestHandler = notNull("requestHandler", requestHandler);
this.reactorWorker = notNull("reactorWorker", reactorWorker);
this.exceptionHandlerRegistry = notNull("exceptionHandlerService", exceptionHandlerRegistry);
this.maxRequestBodySize = maxRequestBodySize;
this.socket = notNull("socket", builder.socket);
this.requestHandler = notNull("requestHandler", builder.requestHandler);
this.reactorWorker = notNull("reactorWorker", builder.reactorWorker);
this.exceptionHandlerRegistry = notNull("exceptionHandlerService", builder.exceptionHandlerRegistry);
this.serverEvents = notNull("serverEvents", builder.serverEvents);
this.maxRequestBodySize = builder.maxRequestBodySize;

this.readBuffer = ByteBuffer.allocate(bufferSize);
this.readBuffer = ByteBuffer.allocate(builder.bufferSize);

this.stateHandlerGraph
.filter(
Expand Down Expand Up @@ -204,7 +195,11 @@ public void after() {
}

public SelectionKey register(Selector selector) {
return exceptionHandler(() -> socket.register(selector, OP_CONNECT, this));
if (selectionKey != null) {
throw new SpottyException("connection has been registered already");
}

return selectionKey = exceptionHandler(() -> socket.register(selector, OP_CONNECT, this));
}

public void markDataRemaining() {
Expand All @@ -218,12 +213,15 @@ public void markReadyToRead() {
}

public void handle() {
do {
exceptionHandler(
handleState,
afterExceptionHandler // if exception respond error to the client
);
} while (socket.readBufferHasRemaining() && state().isReading());
exceptionHandler(
handleState,
afterExceptionHandler // if exception respond error to the client
);

// if after connection handing socket buffer still has data then run handing in next tick
if (socket.readBufferHasRemaining() && state().isReading()) {
runHandleNextTick();
}
}

// optimization to not spawn callback objects each time
Expand Down Expand Up @@ -583,14 +581,18 @@ private boolean responseWriteCompleted() {

changeState(READY_TO_READ);

// has something did not process
// has something did not process, add it to handle in next tick
if (readBuffer.position() > 0 || socket.readBufferHasRemaining()) {
handle();
runHandleNextTick();
}

return false;
}

private void runHandleNextTick() {
serverEvents.add(selectionKey);
}

private void resetResponse() {
this.response.reset();
this.responseHeadersBuffer.reset();
Expand Down Expand Up @@ -680,4 +682,57 @@ private <T> T exceptionHandler(ExceptionalCallable<T> runnable, Runnable afterEx
return null;
}

public static class Builder {
private SpottySocket socket;
private RequestHandler requestHandler;
private ReactorWorker reactorWorker;
private ExceptionHandlerRegistry exceptionHandlerRegistry;
private ServerEvents serverEvents;
private int maxRequestBodySize;
private int bufferSize = DEFAULT_BUFFER_SIZE;

public static Builder connection() {
return new Builder();
}

public Builder socket(SpottySocket socket) {
this.socket = socket;
return this;
}

public Builder requestHandler(RequestHandler requestHandler) {
this.requestHandler = requestHandler;
return this;
}

public Builder reactorWorker(ReactorWorker reactorWorker) {
this.reactorWorker = reactorWorker;
return this;
}

public Builder exceptionHandlerRegistry(ExceptionHandlerRegistry exceptionHandlerRegistry) {
this.exceptionHandlerRegistry = exceptionHandlerRegistry;
return this;
}

public Builder serverEvents(ServerEvents serverEvents) {
this.serverEvents = serverEvents;
return this;
}

public Builder maxRequestBodySize(int maxRequestBodySize) {
this.maxRequestBodySize = maxRequestBodySize;
return this;
}

public Builder bufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}

public Connection build() {
return new Connection(this);
}
}

}
41 changes: 41 additions & 0 deletions core/src/main/java/spotty/server/event/ServerEvents.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2022 - Alex Danilenko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spotty.server.event;

import java.nio.channels.SelectionKey;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;

public final class ServerEvents {
private final Queue<SelectionKey> queue = new LinkedList<>();

public void add(SelectionKey key) {
queue.add(key);
}

public void add(Collection<SelectionKey> keys) {
queue.addAll(keys);
}

public SelectionKey poll() {
return queue.poll();
}

public boolean isEmpty() {
return queue.isEmpty();
}
}
Loading